This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d87923667 feat: Add `isCompleted` method to BoundedSourceQueue (#1374)
1d87923667 is described below

commit 1d87923667aab48fa954ac29bacf56d7ef13addc
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Tue Jun 18 19:19:02 2024 +0800

    feat: Add `isCompleted` method to BoundedSourceQueue (#1374)
---
 .../stream/scaladsl/BoundedSourceQueueSpec.scala      |  6 ++++++
 ...sourcequeue-iscompleted-classes.backwards.excludes | 19 +++++++++++++++++++
 .../org/apache/pekko/stream/BoundedSourceQueue.scala  |  7 +++++++
 .../apache/pekko/stream/impl/BoundedSourceQueue.scala | 10 ++++++----
 4 files changed, 38 insertions(+), 4 deletions(-)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
index 9989c81645..738e84d965 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
@@ -45,6 +45,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
       }
 
       queue.complete()
+      queue.isCompleted shouldBe true
 
       val subIt = Iterator.continually(sub.requestNext())
       subIt.zip(elements.iterator).foreach {
@@ -81,6 +82,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
       val queue =
         Source.queue[Int](1).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
       queue.complete()
+      queue.isCompleted shouldBe true
       assertThrows[IllegalStateException](queue.complete())
     }
 
@@ -89,6 +91,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
       val queue =
         Source.queue[Int](1).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
       queue.fail(ex)
+      queue.isCompleted shouldBe true
       assertThrows[IllegalStateException](queue.fail(ex))
     }
 
@@ -98,6 +101,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
         Source.queue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
 
       queue.complete()
+      queue.isCompleted shouldBe true
       queue.offer(1) should be(QueueOfferResult.QueueClosed)
       sub.expectSubscriptionAndComplete()
     }
@@ -108,6 +112,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
         Source.queue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
 
       queue.fail(ex)
+      queue.isCompleted shouldBe true
       queue.offer(1) should be(QueueOfferResult.Failure(ex))
       sub.request(1)
       sub.expectError(ex)
@@ -180,6 +185,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
       // where enqueueing an element concurrently with Done reaching the stage can lead to Enqueued being returned
       // but the element dropped (no guarantee of entering stream as documented in BoundedSourceQueue.offer
       queue.complete()
+      queue.isCompleted shouldBe true
 
       result.futureValue should be(counter.get())
     }
diff --git a/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1374-boundedsourcequeue-iscompleted-classes.backwards.excludes b/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1374-boundedsourcequeue-iscompleted-classes.backwards.excludes
new file mode 100644
index 0000000000..b765986028
--- /dev/null
+++ b/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1374-boundedsourcequeue-iscompleted-classes.backwards.excludes
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Add BoundedSourceQueue.isCompleted method
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.BoundedSourceQueue.isCompleted")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/BoundedSourceQueue.scala b/stream/src/main/scala/org/apache/pekko/stream/BoundedSourceQueue.scala
index d5b2377e6d..cf55dee581 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/BoundedSourceQueue.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/BoundedSourceQueue.scala
@@ -38,6 +38,13 @@ trait BoundedSourceQueue[T] {
    */
   def complete(): Unit
 
+  /**
+   * Returns true if the stream has been completed, either normally or with failure.
+   *
+   * @since 1.1.0
+   */
+  def isCompleted: Boolean
+
   /**
    * Completes the stream with a failure.
    */
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/BoundedSourceQueue.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/BoundedSourceQueue.scala
index a904145e8a..5fd37481ea 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/BoundedSourceQueue.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/BoundedSourceQueue.scala
@@ -114,7 +114,7 @@ import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, Ou
     }
 
     object Mat extends BoundedSourceQueue[T] {
-      override def offer(elem: T): QueueOfferResult = state.get() match {
+      final override def offer(elem: T): QueueOfferResult = state.get() match {
         case Running | NeedsActivation =>
           if (queue.add(elem)) {
             // need to query state again because stage might have switched from Running -> NeedsActivation only after
@@ -130,21 +130,23 @@ import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, Ou
         case Done(result) => result
       }
 
-      override def complete(): Unit = {
+      final override def complete(): Unit = {
         if (state.get().isInstanceOf[Done])
           throw new IllegalStateException("The queue has already been completed.")
         if (setDone(Done(QueueOfferResult.QueueClosed)))
           Logic.callback.invoke(()) // if this thread won the completion race also schedule an async callback
       }
 
-      override def fail(ex: Throwable): Unit = {
+      final override def isCompleted: Boolean = state.get().isInstanceOf[Done]
+
+      final override def fail(ex: Throwable): Unit = {
         if (state.get().isInstanceOf[Done])
           throw new IllegalStateException("The queue has already been completed.")
         if (setDone(Done(QueueOfferResult.Failure(ex))))
           Logic.callback.invoke(()) // if this thread won the completion race also schedule an async callback
       }
 
-      override def size(): Int = queue.size()
+      final override def size(): Int = queue.size()
     }
 
     // some state transition helpers


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to