This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 1d87923667 feat: Add `isCompleted` method to BoundedSourceQueue (#1374)
1d87923667 is described below
commit 1d87923667aab48fa954ac29bacf56d7ef13addc
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Tue Jun 18 19:19:02 2024 +0800
feat: Add `isCompleted` method to BoundedSourceQueue (#1374)
---
.../stream/scaladsl/BoundedSourceQueueSpec.scala | 6 ++++++
...sourcequeue-iscompleted-classes.backwards.excludes | 19 +++++++++++++++++++
.../org/apache/pekko/stream/BoundedSourceQueue.scala | 7 +++++++
.../apache/pekko/stream/impl/BoundedSourceQueue.scala | 10 ++++++----
4 files changed, 38 insertions(+), 4 deletions(-)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
index 9989c81645..738e84d965 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
@@ -45,6 +45,7 @@ class BoundedSourceQueueSpec extends
StreamSpec("""pekko.loglevel = debug
}
queue.complete()
+ queue.isCompleted shouldBe true
val subIt = Iterator.continually(sub.requestNext())
subIt.zip(elements.iterator).foreach {
@@ -81,6 +82,7 @@ class BoundedSourceQueueSpec extends
StreamSpec("""pekko.loglevel = debug
val queue =
Source.queue[Int](1).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.complete()
+ queue.isCompleted shouldBe true
assertThrows[IllegalStateException](queue.complete())
}
@@ -89,6 +91,7 @@ class BoundedSourceQueueSpec extends
StreamSpec("""pekko.loglevel = debug
val queue =
Source.queue[Int](1).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.fail(ex)
+ queue.isCompleted shouldBe true
assertThrows[IllegalStateException](queue.fail(ex))
}
@@ -98,6 +101,7 @@ class BoundedSourceQueueSpec extends
StreamSpec("""pekko.loglevel = debug
Source.queue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.complete()
+ queue.isCompleted shouldBe true
queue.offer(1) should be(QueueOfferResult.QueueClosed)
sub.expectSubscriptionAndComplete()
}
@@ -108,6 +112,7 @@ class BoundedSourceQueueSpec extends
StreamSpec("""pekko.loglevel = debug
Source.queue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.fail(ex)
+ queue.isCompleted shouldBe true
queue.offer(1) should be(QueueOfferResult.Failure(ex))
sub.request(1)
sub.expectError(ex)
@@ -180,6 +185,7 @@ class BoundedSourceQueueSpec extends
StreamSpec("""pekko.loglevel = debug
// where enqueueing an element concurrently with Done reaching the stage can lead to Enqueued being returned
// but the element dropped (no guarantee of entering stream as documented in BoundedSourceQueue.offer
queue.complete()
+ queue.isCompleted shouldBe true
result.futureValue should be(counter.get())
}
diff --git
a/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1374-boundedsourcequeue-iscompleted-classes.backwards.excludes
b/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1374-boundedsourcequeue-iscompleted-classes.backwards.excludes
new file mode 100644
index 0000000000..b765986028
--- /dev/null
+++
b/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1374-boundedsourcequeue-iscompleted-classes.backwards.excludes
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Add BoundedSourceQueue.isCompleted method
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.BoundedSourceQueue.isCompleted")
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/BoundedSourceQueue.scala
b/stream/src/main/scala/org/apache/pekko/stream/BoundedSourceQueue.scala
index d5b2377e6d..cf55dee581 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/BoundedSourceQueue.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/BoundedSourceQueue.scala
@@ -38,6 +38,13 @@ trait BoundedSourceQueue[T] {
*/
def complete(): Unit
+ /**
+ * Returns true if the stream has been completed, either normally or with failure.
+ *
+ * @since 1.1.0
+ */
+ def isCompleted: Boolean
+
/**
* Completes the stream with a failure.
*/
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/BoundedSourceQueue.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/BoundedSourceQueue.scala
index a904145e8a..5fd37481ea 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/impl/BoundedSourceQueue.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/impl/BoundedSourceQueue.scala
@@ -114,7 +114,7 @@ import pekko.stream.stage.{ GraphStageLogic,
GraphStageWithMaterializedValue, Ou
}
object Mat extends BoundedSourceQueue[T] {
- override def offer(elem: T): QueueOfferResult = state.get() match {
+ final override def offer(elem: T): QueueOfferResult = state.get() match {
case Running | NeedsActivation =>
if (queue.add(elem)) {
// need to query state again because stage might have switched from Running -> NeedsActivation only after
@@ -130,21 +130,23 @@ import pekko.stream.stage.{ GraphStageLogic,
GraphStageWithMaterializedValue, Ou
case Done(result) => result
}
- override def complete(): Unit = {
+ final override def complete(): Unit = {
if (state.get().isInstanceOf[Done])
throw new IllegalStateException("The queue has already been completed.")
if (setDone(Done(QueueOfferResult.QueueClosed)))
Logic.callback.invoke(()) // if this thread won the completion race also schedule an async callback
}
- override def fail(ex: Throwable): Unit = {
+ final override def isCompleted: Boolean = state.get().isInstanceOf[Done]
+
+ final override def fail(ex: Throwable): Unit = {
if (state.get().isInstanceOf[Done])
throw new IllegalStateException("The queue has already been completed.")
if (setDone(Done(QueueOfferResult.Failure(ex))))
Logic.callback.invoke(()) // if this thread won the completion race also schedule an async callback
}
- override def size(): Int = queue.size()
+ final override def size(): Int = queue.size()
}
// some state transition helpers
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]