This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new fcdd35a897 feat: Add doOnCancel operator. (#2375)
fcdd35a897 is described below

commit fcdd35a897d201cfec7215baa7ff381b37541589
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Oct 26 02:27:12 2025 +0800

    feat: Add doOnCancel operator. (#2375)
---
 .../stream/operators/Source-or-Flow/doOnCancel.md  |  28 +++++
 docs/src/main/paradox/stream/operators/index.md    |   2 +
 .../org/apache/pekko/stream/javadsl/FlowTest.java  |  22 ++++
 .../pekko/stream/scaladsl/FlowDoOnCancelSpec.scala | 118 +++++++++++++++++++++
 .../org/apache/pekko/stream/impl/Stages.scala      |   1 +
 .../pekko/stream/impl/fusing/DoOnCancel.scala      |  59 +++++++++++
 .../org/apache/pekko/stream/javadsl/Flow.scala     |  21 ++++
 .../org/apache/pekko/stream/javadsl/Source.scala   |  21 ++++
 .../org/apache/pekko/stream/javadsl/SubFlow.scala  |  21 ++++
 .../apache/pekko/stream/javadsl/SubSource.scala    |  21 ++++
 .../org/apache/pekko/stream/scaladsl/Flow.scala    |  20 ++++
 11 files changed, 334 insertions(+)

diff --git 
a/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnCancel.md 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnCancel.md
new file mode 100644
index 0000000000..87fa44e406
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnCancel.md
@@ -0,0 +1,28 @@
+# doOnCancel
+
+Run the given function when the downstream cancels.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Source.doOnCancel](Source) { 
scala="#doOnCancel(f:(Throwable,Boolean)=&gt;Unit):FlowOps.this.Repr[Out]" 
java="#doOnCancel(org.apache.pekko.japi.function.Procedure2)" }
+@apidoc[Flow.doOnCancel](Flow) { 
scala="#doOnCancel(f:(Throwable,Boolean)=&gt;Unit):FlowOps.this.Repr[Out]" 
java="#doOnCancel(org.apache.pekko.japi.function.Procedure2)" }
+
+## Description
+
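+Run the given function when the downstream cancels. The function receives the cause of the cancellation and a flag that is `true` when the downstream cancelled normally and `false` when it cancelled because of a failure.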
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when upstream emits an element
+
+**backpressures** when downstream backpressures
+
+**completes** when upstream completes
+
+**cancels** when downstream cancels
+
+@@@
diff --git a/docs/src/main/paradox/stream/operators/index.md 
b/docs/src/main/paradox/stream/operators/index.md
index 2b14389ab8..eb8517a819 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -148,6 +148,7 @@ depending on being backpressured by downstream or not.
 |Flow|<a name="contramap"></a>@ref[contramap](Flow/contramap.md)|Transform 
this Flow by applying a function to each *incoming* upstream element before it 
is passed to the Flow.|
 |Source/Flow|<a 
name="detach"></a>@ref[detach](Source-or-Flow/detach.md)|Detach upstream demand 
from downstream demand without detaching the stream rates.|
 |Flow|<a name="dimap"></a>@ref[dimap](Flow/dimap.md)|Transform this Flow by 
applying a function `f` to each *incoming* upstream element before it is passed 
to the Flow, and a function `g` to each *outgoing* downstream element.|
+|Source/Flow|<a 
name="dooncancel"></a>@ref[doOnCancel](Source-or-Flow/doOnCancel.md)|Run the 
given function when the downstream cancels.|
 |Source/Flow|<a 
name="doonfirst"></a>@ref[doOnFirst](Source-or-Flow/doOnFirst.md)|Run the given 
function when the first element is received.|
 |Source/Flow|<a name="drop"></a>@ref[drop](Source-or-Flow/drop.md)|Drop `n` 
elements and then pass any subsequent element downstream.|
 |Source/Flow|<a 
name="droprepeated"></a>@ref[dropRepeated](Source-or-Flow/dropRepeated.md)|Only 
pass on those elements that are distinct from the previous element.|
@@ -445,6 +446,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 * [detach](Source-or-Flow/detach.md)
 * [dimap](Flow/dimap.md)
 * [divertTo](Source-or-Flow/divertTo.md)
+* [doOnCancel](Source-or-Flow/doOnCancel.md)
 * [doOnFirst](Source-or-Flow/doOnFirst.md)
 * [drop](Source-or-Flow/drop.md)
 * [dropRepeated](Source-or-Flow/dropRepeated.md)
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index f679ed3e88..ce7197b021 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -135,6 +135,28 @@ public class FlowTest extends StreamTest {
     Assert.assertEquals(1, invoked.get());
   }
 
+  @Test
+  public void mustBeAbleToUseDoOnCancel() throws Exception {
+    final var invoked = new AtomicInteger(0);
+    final var promise = new CompletableFuture<Done>();
+    final var future =
+        Source.range(1, 10)
+            .via(
+                Flow.of(Integer.class)
+                    .doOnCancel(
+                        (ex, wasCancelledNormally) -> {
+                          if (wasCancelledNormally) {
+                            invoked.incrementAndGet();
+                          }
+                          promise.complete(done());
+                        }))
+            .take(5)
+            .runWith(Sink.ignore(), system);
+    promise.get(3, TimeUnit.SECONDS);
+    future.toCompletableFuture().get(3, TimeUnit.SECONDS);
+    Assert.assertEquals(1, invoked.get());
+  }
+
   @Test
   public void mustBeAbleToUseDoOnFirstOnEmptySource() throws Exception {
     final var invoked = new AtomicInteger(0);
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnCancelSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnCancelSpec.scala
new file mode 100644
index 0000000000..5b0eb45ae2
--- /dev/null
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnCancelSpec.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko
+import pekko.Done
+import pekko.stream.testkit._
+
+class FlowDoOnCancelSpec extends StreamSpec("""
+    pekko.stream.materializer.initial-input-buffer-size = 2
+  """) with ScriptedTest {
+
+  "A DoOnCancel" must {
+    "be invoked once on normal cancellation" in {
+      val invoked = new java.util.concurrent.atomic.AtomicInteger(0)
+      val promise = scala.concurrent.Promise[Done]()
+      val f: (Throwable, Boolean) => Unit = (_, wasCancelledNormally) => {
+        if (wasCancelledNormally) invoked.incrementAndGet()
+        promise.success(Done)
+      }
+      Source(1 to 10)
+        .watchTermination()(Keep.right)
+        .via(Flow[Int].doOnCancel(f))
+        .toMat(Sink.cancelled)(Keep.left)
+        .run()
+        .futureValue
+        .shouldBe(Done)
+      promise.future.futureValue.shouldBe(Done)
+      invoked.get() shouldBe 1
+    }
+
+    "be invoked once on stream completed" in {
+      val invoked = new java.util.concurrent.atomic.AtomicInteger(0)
+      val promise = scala.concurrent.Promise[Done]()
+      val f: (Throwable, Boolean) => Unit = (_, wasCancelledNormally) => {
+        if (wasCancelledNormally) invoked.incrementAndGet()
+        promise.success(Done)
+      }
+      Source(1 to 10)
+        .via(Flow[Int].doOnCancel(f))
+        .take(1)
+        .runWith(Sink.ignore)
+        .futureValue
+        .shouldBe(Done)
+      promise.future.futureValue.shouldBe(Done)
+      invoked.get() shouldBe 1
+    }
+
+    "Not be invoked on empty source" in {
+      val invoked = new java.util.concurrent.atomic.AtomicInteger(0)
+      val f: (Throwable, Boolean) => Unit = (_, wasCancelledNormally) => {
+        if (wasCancelledNormally) invoked.incrementAndGet()
+      }
+      Source.empty
+        .via(Flow[Int].doOnCancel(f))
+        .runWith(Sink.ignore)
+        .futureValue
+        .shouldBe(Done)
+      invoked.get() shouldBe 0
+    }
+
+    "be invoked once on downstream cancellation with no more elements needed" 
in {
+      import org.apache.pekko.stream.SubscriptionWithCancelException._
+      val invoked = new java.util.concurrent.atomic.AtomicInteger(0)
+      val promise = scala.concurrent.Promise[Done]()
+      val f: (Throwable, Boolean) => Unit = (cause, wasCancelledNormally) => {
+        if (wasCancelledNormally && (cause eq NoMoreElementsNeeded)) 
invoked.incrementAndGet()
+        promise.success(Done)
+      }
+      Source(1 to 10)
+        .orElse(Source.never[Int].via(Flow[Int].doOnCancel(f)))
+        .take(5)
+        .runWith(Sink.ignore)
+        .futureValue
+        .shouldBe(Done)
+      promise.future.futureValue.shouldBe(Done)
+      invoked.get() shouldBe 1
+    }
+
+    "be invoked once on downstream cancellation on failure" in {
+      val invoked = new java.util.concurrent.atomic.AtomicInteger(0)
+      val promise = scala.concurrent.Promise[Done]()
+      val f: (Throwable, Boolean) => Unit = (_, wasCancelledNormally) => {
+        if (!wasCancelledNormally) invoked.incrementAndGet()
+        promise.success(Done)
+      }
+      Source(1 to 10)
+        .via(Flow[Int].doOnCancel(f))
+        .map {
+          case 5 => throw new RuntimeException("Boom!")
+          case x => x
+        }
+        .runWith(Sink.ignore)
+        .failed
+        .futureValue
+        .shouldBe(an[RuntimeException])
+      promise.future.futureValue.shouldBe(Done)
+      invoked.get() shouldBe 1
+    }
+
+  }
+
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index 6977e71ec5..5e0a97b1f7 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -36,6 +36,7 @@ import pekko.stream.Attributes._
     val filter = name("filter")
     val filterNot = name("filterNot")
     val doOnFirst = name("doOnFirst")
+    val doOnCancel = name("doOnCancel")
     val collect = name("collect")
     val collectFirst = name("collectFirst")
     val collectWhile = name("collectWhile")
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnCancel.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnCancel.scala
new file mode 100644
index 0000000000..e7419ca558
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnCancel.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.Attributes.SourceLocation
+import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler 
}
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet, 
SubscriptionWithCancelException }
+
+/**
+ * INTERNAL API
+ *
+ * @param f The function to be invoked when downstream cancels. The boolean parameter is `true` when the
+ *          cancellation cause is `NoMoreElementsNeeded` or `StageWasCompleted` (normal), and `false` otherwise (failure).
+ */
+@InternalApi private[pekko] final class DoOnCancel[In](
+    f: (Throwable, Boolean) => Unit) extends GraphStage[FlowShape[In, In]] {
+  require(f != null, "DoOnCancel function must not be null")
+  private val in = Inlet[In]("DoOnCancel.in")
+  private val out = Outlet[In]("DoOnCancel.out")
+  override val shape: FlowShape[In, In] = FlowShape(in, out)
+
+  override def initialAttributes: Attributes = DefaultAttributes.doOnCancel 
and SourceLocation.forLambda(f)
+
+  override def createLogic(inheritedAttributes: 
org.apache.pekko.stream.Attributes) =
+    new GraphStageLogic(shape) with InHandler with OutHandler {
+      final override def onPush(): Unit = push(out, grab(in))
+      final override def onPull(): Unit = pull(in)
+      final override def onDownstreamFinish(cause: Throwable): Unit = {
+        import SubscriptionWithCancelException._
+        val wasCancelledNormally = (cause eq NoMoreElementsNeeded) || (cause 
eq StageWasCompleted)
+        try {
+          f(cause, wasCancelledNormally)
+        } finally {
+          // Ensure the stage is properly finished even if f throws
+          super.onDownstreamFinish(cause)
+        }
+      }
+
+      setHandlers(in, out, this)
+    }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 117df4e416..e48a130965 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -1138,6 +1138,27 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    */
   def doOnFirst(f: function.Procedure[Out]): javadsl.Flow[In, Out, Mat] = new 
Flow(delegate.doOnFirst(f(_)))
 
+  /**
+   * Run the given function when the downstream cancels.
+   *
+   * The first parameter is the cause of the cancellation, and the second parameter indicates whether
+   * the downstream was cancelled normally (`true`) or because of a failure (`false`).
+   *
+   * '''Emits when''' upstream emits an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param f function to be run on cancellation, the first parameter is the 
cause of the cancellation,
+   *          and the second parameter indicates whether the downstream was 
cancelled normally.
+   * @since 1.3.0
+   */
+  def doOnCancel(f: function.Procedure2[_ >: Throwable, java.lang.Boolean]): 
javadsl.Flow[In, Out, Mat] =
+    new Flow(delegate.doOnCancel((ex: Throwable, wasCancelledNormally: 
Boolean) => f(ex, wasCancelledNormally)))
+
   /**
    * Only pass on those elements that are distinct from the previous element.
    *
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index e7806e55d8..de1fcf9554 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -3001,6 +3001,27 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    */
   def doOnFirst(f: function.Procedure[Out]): javadsl.Source[Out, Mat] = new 
Source(delegate.doOnFirst(f(_)))
 
+  /**
+   * Run the given function when the downstream cancels.
+   *
+   * The first parameter is the cause of the cancellation, and the second parameter indicates whether
+   * the downstream was cancelled normally (`true`) or because of a failure (`false`).
+   *
+   * '''Emits when''' upstream emits an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param f function to be run on cancellation, the first parameter is the 
cause of the cancellation,
+   *          and the second parameter indicates whether the downstream was 
cancelled normally.
+   * @since 1.3.0
+   */
+  def doOnCancel(f: function.Procedure2[_ >: Throwable, java.lang.Boolean]): 
javadsl.Source[Out, Mat] =
+    new Source(delegate.doOnCancel((ex: Throwable, wasCancelledNormally: 
Boolean) => f(ex, wasCancelledNormally)))
+
   /**
    * Only pass on those elements that are distinct from the previous element.
    *
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 4f6652310e..495879f13a 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -540,6 +540,27 @@ final class SubFlow[In, Out, Mat](
    */
   def doOnFirst(f: function.Procedure[Out]): SubFlow[In, Out, Mat] = new 
javadsl.SubFlow(delegate.doOnFirst(f(_)))
 
+  /**
+   * Run the given function when the downstream cancels.
+   *
+   * The first parameter is the cause of the cancellation, and the second parameter indicates whether
+   * the downstream was cancelled normally (`true`) or because of a failure (`false`).
+   *
+   * '''Emits when''' upstream emits an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param f function to be run on cancellation, the first parameter is the 
cause of the cancellation,
+   *          and the second parameter indicates whether the downstream was 
cancelled normally.
+   * @since 1.3.0
+   */
+  def doOnCancel(f: function.Procedure2[_ >: Throwable, java.lang.Boolean]): 
SubFlow[In, Out, Mat] =
+    new SubFlow(delegate.doOnCancel((ex: Throwable, wasCancelledNormally: 
Boolean) => f(ex, wasCancelledNormally)))
+
   /**
    * Only pass on those elements that are distinct from the previous element.
    *
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index d41f4c6465..a7d69a055b 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -531,6 +531,27 @@ final class SubSource[Out, Mat](
    */
   def doOnFirst(f: function.Procedure[Out]): SubSource[Out, Mat] = new 
SubSource(delegate.doOnFirst(f(_)))
 
+  /**
+   * Run the given function when the downstream cancels.
+   *
+   * The first parameter is the cause of the cancellation, and the second parameter indicates whether
+   * the downstream was cancelled normally (`true`) or because of a failure (`false`).
+   *
+   * '''Emits when''' upstream emits an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param f function to be run on cancellation, the first parameter is the 
cause of the cancellation,
+   *          and the second parameter indicates whether the downstream was 
cancelled normally.
+   * @since 1.3.0
+   */
+  def doOnCancel(f: function.Procedure2[_ >: Throwable, java.lang.Boolean]): 
SubSource[Out, Mat] =
+    new SubSource(delegate.doOnCancel((cause, wasCancelledNormally) => 
f(cause, wasCancelledNormally)))
+
   /**
    * Only pass on those elements that are distinct from the previous element.
    *
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 329c7d2db3..043bb208ea 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -1582,6 +1582,26 @@ trait FlowOps[+Out, +Mat] {
    */
   def doOnFirst(f: Out => Unit): Repr[Out] = via(new DoOnFirst[Out](f))
 
+  /**
+   * Run the given function when the downstream cancels.
+   *
+   * The first parameter is the cause of the cancellation, and the second parameter indicates whether
+   * the downstream was cancelled normally (`true`) or because of a failure (`false`).
+   *
+   * '''Emits when''' upstream emits an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param f function to be run on cancellation, the first parameter is the 
cause of the cancellation,
+   *          and the second parameter indicates whether the downstream was 
cancelled normally.
+   * @since 1.3.0
+   */
+  def doOnCancel(f: (Throwable, Boolean) => Unit): Repr[Out] = via(new 
DoOnCancel[Out](f))
+
   /**
    * Only pass on those elements that are distinct from the previous element.
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to