This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new fcdd35a897 feat: Add doOnCancel operator. (#2375)
fcdd35a897 is described below
commit fcdd35a897d201cfec7215baa7ff381b37541589
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Oct 26 02:27:12 2025 +0800
feat: Add doOnCancel operator. (#2375)
---
.../stream/operators/Source-or-Flow/doOnCancel.md | 28 +++++
docs/src/main/paradox/stream/operators/index.md | 2 +
.../org/apache/pekko/stream/javadsl/FlowTest.java | 22 ++++
.../pekko/stream/scaladsl/FlowDoOnCancelSpec.scala | 118 +++++++++++++++++++++
.../org/apache/pekko/stream/impl/Stages.scala | 1 +
.../pekko/stream/impl/fusing/DoOnCancel.scala | 59 +++++++++++
.../org/apache/pekko/stream/javadsl/Flow.scala | 21 ++++
.../org/apache/pekko/stream/javadsl/Source.scala | 21 ++++
.../org/apache/pekko/stream/javadsl/SubFlow.scala | 21 ++++
.../apache/pekko/stream/javadsl/SubSource.scala | 21 ++++
.../org/apache/pekko/stream/scaladsl/Flow.scala | 20 ++++
11 files changed, 334 insertions(+)
diff --git
a/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnCancel.md
b/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnCancel.md
new file mode 100644
index 0000000000..87fa44e406
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnCancel.md
@@ -0,0 +1,28 @@
+# doOnCancel
+
+Run the given function when the downstream cancels.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Source.doOnCancel](Source) {
scala="#doOnCancel(f:(Throwable,Boolean)=>Unit):FlowOps.this.Repr[Out]"
java="#doOnCancel(org.apache.pekko.japi.function.Procedure2)" }
+@apidoc[Flow.doOnCancel](Flow) {
scala="#doOnCancel(f:(Throwable,Boolean)=>Unit):FlowOps.this.Repr[Out]"
java="#doOnCancel(org.apache.pekko.japi.function.Procedure2)" }
+
+## Description
+
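+Run the given function when the downstream cancels. The function receives the cause of the cancellation and a boolean that is `true` when the downstream cancelled normally and `false` when the cancellation was caused by a failure. The stream's elements are passed through unchanged.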
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when upstream emits an element
+
+**backpressures** when downstream backpressures
+
+**completes** when upstream completes
+
+**cancels** when downstream cancels
+
+@@@
diff --git a/docs/src/main/paradox/stream/operators/index.md
b/docs/src/main/paradox/stream/operators/index.md
index 2b14389ab8..eb8517a819 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -148,6 +148,7 @@ depending on being backpressured by downstream or not.
|Flow|<a name="contramap"></a>@ref[contramap](Flow/contramap.md)|Transform
this Flow by applying a function to each *incoming* upstream element before it
is passed to the Flow.|
|Source/Flow|<a
name="detach"></a>@ref[detach](Source-or-Flow/detach.md)|Detach upstream demand
from downstream demand without detaching the stream rates.|
|Flow|<a name="dimap"></a>@ref[dimap](Flow/dimap.md)|Transform this Flow by
applying a function `f` to each *incoming* upstream element before it is passed
to the Flow, and a function `g` to each *outgoing* downstream element.|
+|Source/Flow|<a
name="dooncancel"></a>@ref[doOnCancel](Source-or-Flow/doOnCancel.md)|Run the
given function when the downstream cancels.|
|Source/Flow|<a
name="doonfirst"></a>@ref[doOnFirst](Source-or-Flow/doOnFirst.md)|Run the given
function when the first element is received.|
|Source/Flow|<a name="drop"></a>@ref[drop](Source-or-Flow/drop.md)|Drop `n`
elements and then pass any subsequent element downstream.|
|Source/Flow|<a
name="droprepeated"></a>@ref[dropRepeated](Source-or-Flow/dropRepeated.md)|Only
pass on those elements that are distinct from the previous element.|
@@ -445,6 +446,7 @@ For more background see the @ref[Error Handling in
Streams](../stream-error.md)
* [detach](Source-or-Flow/detach.md)
* [dimap](Flow/dimap.md)
* [divertTo](Source-or-Flow/divertTo.md)
+* [doOnCancel](Source-or-Flow/doOnCancel.md)
* [doOnFirst](Source-or-Flow/doOnFirst.md)
* [drop](Source-or-Flow/drop.md)
* [dropRepeated](Source-or-Flow/dropRepeated.md)
diff --git
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index f679ed3e88..ce7197b021 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -135,6 +135,28 @@ public class FlowTest extends StreamTest {
Assert.assertEquals(1, invoked.get());
}
+ @Test
+ public void mustBeAbleToUseDoOnCancel() throws Exception {
+ // Counts callback invocations that reported a normal (non-failure) cancellation.
+ final var normalCancellations = new AtomicInteger(0);
+ // Completed from inside the callback so the test can wait until it has run.
+ final var callbackRan = new CompletableFuture<Done>();
+ final var observedFlow =
+ Flow.of(Integer.class)
+ .doOnCancel(
+ (cause, wasCancelledNormally) -> {
+ if (wasCancelledNormally) {
+ normalCancellations.incrementAndGet();
+ }
+ callbackRan.complete(done());
+ });
+ // take(5) sits downstream of the observed flow; the test expects exactly one normal cancellation.
+ final var streamDone =
+ Source.range(1, 10).via(observedFlow).take(5).runWith(Sink.ignore(), system);
+ callbackRan.get(3, TimeUnit.SECONDS);
+ streamDone.toCompletableFuture().get(3, TimeUnit.SECONDS);
+ Assert.assertEquals(1, normalCancellations.get());
+ }
+
@Test
public void mustBeAbleToUseDoOnFirstOnEmptySource() throws Exception {
final var invoked = new AtomicInteger(0);
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnCancelSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnCancelSpec.scala
new file mode 100644
index 0000000000..5b0eb45ae2
--- /dev/null
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnCancelSpec.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko
+import pekko.Done
+import pekko.stream.testkit._
+
+/**
+ * Checks that `doOnCancel` runs its callback once when downstream cancels, that the boolean argument
+ * distinguishes normal cancellation from cancellation caused by a failure, and that the callback is not
+ * run when the stream simply completes.
+ */
+class FlowDoOnCancelSpec extends StreamSpec("""
+ pekko.stream.materializer.initial-input-buffer-size = 2
+ """) with ScriptedTest {
+
+ "A DoOnCancel" must {
+ "be invoked once on normal cancellation" in {
+ val invoked = new java.util.concurrent.atomic.AtomicInteger(0)
+ val promise = scala.concurrent.Promise[Done]()
+ // promise.success throws if called twice, so a second callback invocation would surface as a failure.
+ val f: (Throwable, Boolean) => Unit = (_, wasCancelledNormally) => {
+ if (wasCancelledNormally) invoked.incrementAndGet()
+ promise.success(Done)
+ }
+ // Sink.cancelled is the downstream here; watchTermination observes the upstream side.
+ Source(1 to 10)
+ .watchTermination()(Keep.right)
+ .via(Flow[Int].doOnCancel(f))
+ .toMat(Sink.cancelled)(Keep.left)
+ .run()
+ .futureValue
+ .shouldBe(Done)
+ promise.future.futureValue.shouldBe(Done)
+ invoked.get() shouldBe 1
+ }
+
+ "be invoked once when downstream cancels after taking an element" in {
+ val invoked = new java.util.concurrent.atomic.AtomicInteger(0)
+ val promise = scala.concurrent.Promise[Done]()
+ val f: (Throwable, Boolean) => Unit = (_, wasCancelledNormally) => {
+ if (wasCancelledNormally) invoked.incrementAndGet()
+ promise.success(Done)
+ }
+ // take(1) is downstream of doOnCancel, so finishing early is seen by the operator as a cancellation.
+ Source(1 to 10)
+ .via(Flow[Int].doOnCancel(f))
+ .take(1)
+ .runWith(Sink.ignore)
+ .futureValue
+ .shouldBe(Done)
+ promise.future.futureValue.shouldBe(Done)
+ invoked.get() shouldBe 1
+ }
+
+ "not be invoked on empty source" in {
+ val invoked = new java.util.concurrent.atomic.AtomicInteger(0)
+ val f: (Throwable, Boolean) => Unit = (_, wasCancelledNormally) => {
+ if (wasCancelledNormally) invoked.incrementAndGet()
+ }
+ // Upstream completes without any downstream cancellation, so the counter must stay at zero.
+ Source.empty
+ .via(Flow[Int].doOnCancel(f))
+ .runWith(Sink.ignore)
+ .futureValue
+ .shouldBe(Done)
+ invoked.get() shouldBe 0
+ }
+
+ "be invoked once on downstream cancellation with no more elements needed" in {
+ import org.apache.pekko.stream.SubscriptionWithCancelException._
+ val invoked = new java.util.concurrent.atomic.AtomicInteger(0)
+ val promise = scala.concurrent.Promise[Done]()
+ // Only counts the invocation when the reported cause is the NoMoreElementsNeeded singleton.
+ val f: (Throwable, Boolean) => Unit = (cause, wasCancelledNormally) => {
+ if (wasCancelledNormally && (cause eq NoMoreElementsNeeded)) invoked.incrementAndGet()
+ promise.success(Done)
+ }
+ // doOnCancel wraps the fallback source passed to orElse; the primary source is non-empty.
+ Source(1 to 10)
+ .orElse(Source.never[Int].via(Flow[Int].doOnCancel(f)))
+ .take(5)
+ .runWith(Sink.ignore)
+ .futureValue
+ .shouldBe(Done)
+ promise.future.futureValue.shouldBe(Done)
+ invoked.get() shouldBe 1
+ }
+
+ "be invoked once on downstream cancellation on failure" in {
+ val invoked = new java.util.concurrent.atomic.AtomicInteger(0)
+ val promise = scala.concurrent.Promise[Done]()
+ // Counts only invocations that report a non-normal (failure) cancellation.
+ val f: (Throwable, Boolean) => Unit = (_, wasCancelledNormally) => {
+ if (!wasCancelledNormally) invoked.incrementAndGet()
+ promise.success(Done)
+ }
+ // The map stage downstream of doOnCancel throws on element 5.
+ Source(1 to 10)
+ .via(Flow[Int].doOnCancel(f))
+ .map {
+ case 5 => throw new RuntimeException("Boom!")
+ case x => x
+ }
+ .runWith(Sink.ignore)
+ .failed
+ .futureValue
+ .shouldBe(an[RuntimeException])
+ promise.future.futureValue.shouldBe(Done)
+ invoked.get() shouldBe 1
+ }
+
+ }
+
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index 6977e71ec5..5e0a97b1f7 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -36,6 +36,7 @@ import pekko.stream.Attributes._
val filter = name("filter")
val filterNot = name("filterNot")
val doOnFirst = name("doOnFirst")
+ val doOnCancel = name("doOnCancel")
val collect = name("collect")
val collectFirst = name("collectFirst")
val collectWhile = name("collectWhile")
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnCancel.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnCancel.scala
new file mode 100644
index 0000000000..e7419ca558
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnCancel.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.Attributes.SourceLocation
+import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler
}
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet,
SubscriptionWithCancelException }
+
+/**
+ * INTERNAL API
+ *
+ * Pass-through stage that invokes `f` when downstream cancels. Each pushed element is forwarded
+ * unchanged and each downstream pull is relayed upstream.
+ *
+ * @param f The function to be invoked when downstream cancels. The first parameter is the cancellation
+ * cause; the boolean parameter indicates whether the cancellation was normal (true) or due to
+ * failure (false). Must not be null.
+ */
+@InternalApi private[pekko] final class DoOnCancel[In](
+ f: (Throwable, Boolean) => Unit) extends GraphStage[FlowShape[In, In]] {
+ require(f != null, "DoOnCancel function must not be null")
+ private val in = Inlet[In]("DoOnCancel.in")
+ private val out = Outlet[In]("DoOnCancel.out")
+ override val shape: FlowShape[In, In] = FlowShape(in, out)
+
+ override def initialAttributes: Attributes = DefaultAttributes.doOnCancel and SourceLocation.forLambda(f)
+
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with InHandler with OutHandler {
+ final override def onPush(): Unit = push(out, grab(in))
+ final override def onPull(): Unit = pull(in)
+ final override def onDownstreamFinish(cause: Throwable): Unit = {
+ import SubscriptionWithCancelException._
+ // Only these two marker causes count as a normal cancellation; any other cause is reported as a failure.
+ val wasCancelledNormally = (cause eq NoMoreElementsNeeded) || (cause eq StageWasCompleted)
+ try {
+ f(cause, wasCancelledNormally)
+ } finally {
+ // Ensure the stage is properly finished even if f throws
+ super.onDownstreamFinish(cause)
+ }
+ }
+
+ setHandlers(in, out, this)
+ }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 117df4e416..e48a130965 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -1138,6 +1138,27 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
*/
def doOnFirst(f: function.Procedure[Out]): javadsl.Flow[In, Out, Mat] = new
Flow(delegate.doOnFirst(f(_)))
+ /**
+ * Invoke the given callback when the downstream cancels.
+ *
+ * The callback is handed the cause of the cancellation as its first argument; its second argument
+ * tells whether the downstream was cancelled normally.
+ *
+ * '''Emits when''' upstream emits an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param f callback to run on cancellation; receives the cancellation cause and a flag indicating
+ * whether the downstream was cancelled normally.
+ * @since 1.3.0
+ */
+ def doOnCancel(f: function.Procedure2[_ >: Throwable, java.lang.Boolean]): javadsl.Flow[In, Out, Mat] = {
+ // Adapt the Java procedure to the Scala function type expected by the scaladsl delegate.
+ val onCancel: (Throwable, Boolean) => Unit =
+ (ex: Throwable, wasCancelledNormally: Boolean) => f(ex, wasCancelledNormally)
+ new Flow(delegate.doOnCancel(onCancel))
+ }
+
/**
* Only pass on those elements that are distinct from the previous element.
*
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index e7806e55d8..de1fcf9554 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -3001,6 +3001,27 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
*/
def doOnFirst(f: function.Procedure[Out]): javadsl.Source[Out, Mat] = new
Source(delegate.doOnFirst(f(_)))
+ /**
+ * Invoke the given callback when the downstream cancels.
+ *
+ * The callback is handed the cause of the cancellation as its first argument; its second argument
+ * tells whether the downstream was cancelled normally.
+ *
+ * '''Emits when''' upstream emits an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param f callback to run on cancellation; receives the cancellation cause and a flag indicating
+ * whether the downstream was cancelled normally.
+ * @since 1.3.0
+ */
+ def doOnCancel(f: function.Procedure2[_ >: Throwable, java.lang.Boolean]): javadsl.Source[Out, Mat] = {
+ // Adapt the Java procedure to the Scala function type expected by the scaladsl delegate.
+ val onCancel: (Throwable, Boolean) => Unit =
+ (ex: Throwable, wasCancelledNormally: Boolean) => f(ex, wasCancelledNormally)
+ new Source(delegate.doOnCancel(onCancel))
+ }
+
/**
* Only pass on those elements that are distinct from the previous element.
*
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 4f6652310e..495879f13a 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -540,6 +540,27 @@ final class SubFlow[In, Out, Mat](
*/
def doOnFirst(f: function.Procedure[Out]): SubFlow[In, Out, Mat] = new
javadsl.SubFlow(delegate.doOnFirst(f(_)))
+ /**
+ * Invoke the given callback when the downstream cancels.
+ *
+ * The callback is handed the cause of the cancellation as its first argument; its second argument
+ * tells whether the downstream was cancelled normally.
+ *
+ * '''Emits when''' upstream emits an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param f callback to run on cancellation; receives the cancellation cause and a flag indicating
+ * whether the downstream was cancelled normally.
+ * @since 1.3.0
+ */
+ def doOnCancel(f: function.Procedure2[_ >: Throwable, java.lang.Boolean]): SubFlow[In, Out, Mat] = {
+ // Adapt the Java procedure to the Scala function type expected by the scaladsl delegate.
+ val onCancel: (Throwable, Boolean) => Unit =
+ (ex: Throwable, wasCancelledNormally: Boolean) => f(ex, wasCancelledNormally)
+ new SubFlow(delegate.doOnCancel(onCancel))
+ }
+
/**
* Only pass on those elements that are distinct from the previous element.
*
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index d41f4c6465..a7d69a055b 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -531,6 +531,27 @@ final class SubSource[Out, Mat](
*/
def doOnFirst(f: function.Procedure[Out]): SubSource[Out, Mat] = new
SubSource(delegate.doOnFirst(f(_)))
+ /**
+ * Run the given function when the downstream cancels.
+ *
+ * The first parameter is the cause of the cancellation, and the second parameter indicates whether
+ * the downstream was cancelled normally.
+ *
+ * '''Emits when''' upstream emits an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param f function to be run on cancellation, the first parameter is the cause of the cancellation,
+ * and the second parameter indicates whether the downstream was cancelled normally.
+ * @since 1.3.0
+ */
+ def doOnCancel(f: function.Procedure2[_ >: Throwable, java.lang.Boolean]): SubSource[Out, Mat] =
+ new SubSource(delegate.doOnCancel((ex: Throwable, wasCancelledNormally: Boolean) => f(ex, wasCancelledNormally)))
+
/**
* Only pass on those elements that are distinct from the previous element.
*
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 329c7d2db3..043bb208ea 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -1582,6 +1582,26 @@ trait FlowOps[+Out, +Mat] {
*/
def doOnFirst(f: Out => Unit): Repr[Out] = via(new DoOnFirst[Out](f))
+ /**
+ * Invoke the given callback when the downstream cancels.
+ *
+ * The callback is handed the cause of the cancellation as its first argument; its second argument
+ * tells whether the downstream was cancelled normally.
+ *
+ * '''Emits when''' upstream emits an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param f callback to run on cancellation; receives the cancellation cause and a flag indicating
+ * whether the downstream was cancelled normally.
+ * @since 1.3.0
+ */
+ def doOnCancel(f: (Throwable, Boolean) => Unit): Repr[Out] = {
+ val cancelObserver = new DoOnCancel[Out](f)
+ via(cancelObserver)
+ }
+
/**
* Only pass on those elements that are distinct from the previous element.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]