This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/1.3.x by this push:
new 545d71d05b feat: Add Flow#onErrorContinue operator. (#2322) (#2332)
545d71d05b is described below
commit 545d71d05b102c71b90951bfff1ef8dfbfaae0a3
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Oct 19 23:17:33 2025 +0800
feat: Add Flow#onErrorContinue operator. (#2322) (#2332)
* feat: Add Flow#onErrorContinue operator.
(cherry picked from commit 87b7ae810d8caf55ff7082385f8fa54b90c62611)
---
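As a rough illustration of the semantics in this commit: the Scala implementation in the diff maps the operator onto a supervision decision, `Supervision.Resume` when the error matches and `Supervision.Stop` otherwise. The sketch below models that per-element decision in plain Java, without Pekko, using an in-memory list in place of a stream. `OnErrorContinueModel`, `Result`, and `run` are hypothetical names invented for this illustration and are not part of the Pekko API. The sketch catches only `RuntimeException`, which is narrower than Scala's `NonFatal`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, Pekko-free model of the resume-or-stop decision; not the real operator.
final class OnErrorContinueModel {

  /** Outcome of a modeled run: the elements emitted, plus the failure that stopped it. */
  static final class Result<O> {
    final List<O> emitted;
    final Throwable failure; // null when the run completed normally

    Result(List<O> emitted, Throwable failure) {
      this.emitted = emitted;
      this.failure = failure;
    }
  }

  static <I, O> Result<O> run(
      List<I> input,
      Function<I, O> stage,
      Predicate<Throwable> shouldContinue,
      Consumer<Throwable> errorConsumer) {
    List<O> emitted = new ArrayList<>();
    for (I elem : input) {
      try {
        emitted.add(stage.apply(elem));
      } catch (RuntimeException e) {
        if (shouldContinue.test(e)) {
          // Modeled "Resume": report the error, drop the failing element, keep going.
          errorConsumer.accept(e);
        } else {
          // Modeled "Stop": the run fails with the unhandled error.
          return new Result<>(emitted, e);
        }
      }
    }
    return new Result<>(emitted, null);
  }

  public static void main(String[] args) {
    List<Throwable> seen = new ArrayList<>();
    Function<Integer, Integer> failOnTwo = n -> {
      if (n == 2) {
        throw new IllegalArgumentException("Boom");
      }
      return n;
    };

    // The predicate matches, so element 2 is dropped and the run completes.
    Result<Integer> handled =
        run(List.of(1, 2, 3), failOnTwo, e -> e instanceof IllegalArgumentException, seen::add);
    System.out.println(handled.emitted + " failure=" + handled.failure);

    // The predicate does not match, so the run stops at element 2.
    Result<Integer> unhandled =
        run(List.of(1, 2, 3), failOnTwo, e -> e instanceof IllegalStateException, seen::add);
    System.out.println(unhandled.emitted + " failure=" + unhandled.failure);
  }
}
```

Running `main` prints `[1, 3] failure=null` followed by `[1] failure=java.lang.IllegalArgumentException: Boom`, which lines up with the `expectComplete` and `expectError` cases in the tests added by this commit.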
.../operators/Source-or-Flow/onErrorContinue.md | 27 +++++++
docs/src/main/paradox/stream/operators/index.md | 2 +
.../java/org/apache/pekko/stream/StreamTest.java | 5 ++
.../org/apache/pekko/stream/javadsl/FlowTest.java | 86 ++++++++++++++++++++
.../apache/pekko/stream/javadsl/SourceTest.java | 75 +++++++++++++++++
.../stream/scaladsl/FlowOnErrorContinueSpec.scala | 93 ++++++++++++++++++++++
.../org/apache/pekko/stream/javadsl/Flow.scala | 76 ++++++++++++++++++
.../org/apache/pekko/stream/javadsl/Source.scala | 76 ++++++++++++++++++
.../org/apache/pekko/stream/javadsl/SubFlow.scala | 76 ++++++++++++++++++
.../apache/pekko/stream/javadsl/SubSource.scala | 76 ++++++++++++++++++
.../org/apache/pekko/stream/scaladsl/Flow.scala | 62 +++++++++++++++
11 files changed, 654 insertions(+)
diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorContinue.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorContinue.md
new file mode 100644
index 0000000000..66b7e3cfd9
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorContinue.md
@@ -0,0 +1,27 @@
+# onErrorContinue
+
+Continues the stream when an upstream error occurs.
+
+@ref[Error handling](../index.md#error-handling)
+
+## Signature
+
+@apidoc[Source.onErrorContinue](Source) { scala="#onErrorContinue(errorConsumer%3A%20Function%5BThrowable%2C%20Unit%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorContinue(org.apache.pekko.japi.function.Procedure)" }
+@apidoc[Flow.onErrorContinue](Flow) { scala="#onErrorContinue%5BT%20%3C%3A%20Throwable%5D(errorConsumer%3A%20Function%5BThrowable%2C%20Unit%5D)(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorContinue(java.lang.Class,org.apache.pekko.japi.function.Procedure)" }
+
+## Description
+
+Continues the stream when an upstream error occurs.
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** element is available from the upstream
+
+**backpressures** downstream backpressures
+
+**completes** upstream completes or upstream failed with exception this operator can't handle
+
+**Cancels when** downstream cancels
+@@@
\ No newline at end of file
diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md
index 662cde6f56..dc6feffe26 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -383,6 +383,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
|--|--|--|
|Source/Flow|<a name="maperror"></a>@ref[mapError](Source-or-Flow/mapError.md)|While similar to `recover` this operators can be used to transform an error signal to a different one *without* logging it as an error in the process.|
|Source/Flow|<a name="onerrorcomplete"></a>@ref[onErrorComplete](Source-or-Flow/onErrorComplete.md)|Allows completing the stream when an upstream error occurs.|
+|Source/Flow|<a name="onerrorcontinue"></a>@ref[onErrorContinue](Source-or-Flow/onErrorContinue.md)|Continues the stream when an upstream error occurs.|
|Source/Flow|<a name="onerrorresume"></a>@ref[onErrorResume](Source-or-Flow/onErrorResume.md)|Allows transforming a failure signal into a stream of elements provided by a factory function.|
|RestartSource|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Source] will not restart on completion of the wrapped flow.|
|RestartFlow|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)|Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Flow] will not restart on completion of the wrapped flow.|
@@ -570,6 +571,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [none](Sink/none.md)
* [onComplete](Sink/onComplete.md)
* [onErrorComplete](Source-or-Flow/onErrorComplete.md)
+* [onErrorContinue](Source-or-Flow/onErrorContinue.md)
* [onErrorResume](Source-or-Flow/onErrorResume.md)
* [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
* [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java
index 6692db44e3..63dc25aab3 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java
@@ -14,6 +14,7 @@
package org.apache.pekko.stream;
import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
import org.scalatestplus.junit.JUnitSuite;
@@ -23,4 +24,8 @@ public abstract class StreamTest extends JUnitSuite {
protected StreamTest(PekkoJUnitActorSystemResource actorSystemResource) {
system = actorSystemResource.getSystem();
}
+
+ protected LoggingAdapter logger() {
+ return system.log();
+ }
}
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index 868334d4f8..c2603a8c4d 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -1344,6 +1344,26 @@ public class FlowTest extends StreamTest {
.expectComplete();
}
+ @Test
+ public void mustBeAbleToOnErrorContinue() {
+ Source.from(Arrays.asList(1, 2))
+ .via(
+ Flow.of(Integer.class)
+ .map(
+ elem -> {
+ if (elem == 2) {
+ throw new RuntimeException("ex");
+ } else {
+ return elem;
+ }
+ })
+ .onErrorContinue(error -> logger().error(error, "Error occurred")))
+ .runWith(TestSink.probe(system), system)
+ .request(2)
+ .expectNext(1)
+ .expectComplete();
+ }
+
@Test
public void mustBeAbleToOnErrorResume() {
Source.from(Arrays.asList(1, 2))
@@ -1381,6 +1401,28 @@ public class FlowTest extends StreamTest {
.expectComplete();
}
+ @Test
+ public void mustBeAbleToOnErrorContinueWithDedicatedException() {
+ Source.from(Arrays.asList(1, 2))
+ .via(
+ Flow.of(Integer.class)
+ .map(
+ elem -> {
+ if (elem == 2) {
+ throw new IllegalArgumentException("ex");
+ } else {
+ return elem;
+ }
+ })
+ .onErrorContinue(
+ IllegalArgumentException.class,
+ error -> logger().error(error, "Error occurred")))
+ .runWith(TestSink.probe(system), system)
+ .request(2)
+ .expectNext(1)
+ .expectComplete();
+ }
+
@Test
public void mustBeAbleToOnErrorResumeWithDedicatedException() {
Source.from(Arrays.asList(1, 2))
@@ -1421,6 +1463,28 @@ public class FlowTest extends StreamTest {
.expectError(ex);
}
+ @Test
+ public void mustBeAbleToFailWhenOnErrorContinueExceptionTypeNotMatch() {
+ final IllegalArgumentException ex = new IllegalArgumentException("ex");
+ Source.from(Arrays.asList(1, 2))
+ .map(
+ elem -> {
+ if (elem == 2) {
+ throw ex;
+ } else {
+ return elem;
+ }
+ })
+ .via(
+ Flow.of(Integer.class)
+ .onErrorContinue(
+ TimeoutException.class, error -> logger().error(error, "Error occurred")))
+ .runWith(TestSink.probe(system), system)
+ .request(2)
+ .expectNext(1)
+ .expectError(ex);
+ }
+
@Test
public void onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch() {
final IllegalArgumentException ex = new IllegalArgumentException("ex");
@@ -1458,6 +1522,28 @@ public class FlowTest extends StreamTest {
.expectComplete();
}
+ @Test
+ public void mustBeAbleToOnErrorContinueWithPredicate() {
+ Source.from(Arrays.asList(1, 2))
+ .via(
+ Flow.of(Integer.class)
+ .map(
+ elem -> {
+ if (elem == 2) {
+ throw new IllegalArgumentException("Boom");
+ } else {
+ return elem;
+ }
+ })
+ .onErrorContinue(
+ ex -> ex.getMessage().contains("Boom"),
+ error -> logger().error(error, "Error occurred")))
+ .runWith(TestSink.probe(system), system)
+ .request(2)
+ .expectNext(1)
+ .expectComplete();
+ }
+
@Test
public void mustBeAbleToOnErrorResumeWithPredicate() {
Source.from(Arrays.asList(1, 2))
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index 9e63236fbe..081f657820 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -1665,6 +1665,24 @@ public class SourceTest extends StreamTest {
.expectComplete();
}
+ @Test
+ public void mustBeAbleToOnErrorContinue() {
+ Source.from(Arrays.asList(1, 2))
+ .map(
+ elem -> {
+ if (elem == 2) {
+ throw new RuntimeException("ex");
+ } else {
+ return elem;
+ }
+ })
+ .onErrorContinue(e -> logger().error(e, "Error encountered"))
+ .runWith(TestSink.probe(system), system)
+ .request(2)
+ .expectNext(1)
+ .expectComplete();
+ }
+
@Test
public void mustBeAbleToOnErrorResume() {
Source.from(Arrays.asList(1, 2))
@@ -1702,6 +1720,25 @@ public class SourceTest extends StreamTest {
.expectComplete();
}
+ @Test
+ public void mustBeAbleToOnErrorContinueWithDedicatedException() {
+ Source.from(Arrays.asList(1, 2))
+ .map(
+ elem -> {
+ if (elem == 2) {
+ throw new IllegalArgumentException("ex");
+ } else {
+ return elem;
+ }
+ })
+ .onErrorContinue(
+ IllegalArgumentException.class, e -> logger().error(e, "Error encountered"))
+ .runWith(TestSink.probe(system), system)
+ .request(2)
+ .expectNext(1)
+ .expectComplete();
+ }
+
@Test
public void mustBeAbleToOnErrorResumeWithDedicatedException() {
Source.from(Arrays.asList(1, 2))
@@ -1740,6 +1777,25 @@ public class SourceTest extends StreamTest {
.expectError(ex);
}
+ @Test
+ public void mustBeAbleToFailWhenOnErrorContinueExceptionTypeNotMatch() {
+ final IllegalArgumentException ex = new IllegalArgumentException("ex");
+ Source.from(Arrays.asList(1, 2))
+ .map(
+ elem -> {
+ if (elem == 2) {
+ throw ex;
+ } else {
+ return elem;
+ }
+ })
+ .onErrorContinue(TimeoutException.class, e -> logger().error(e, "Error encountered"))
+ .runWith(TestSink.probe(system), system)
+ .request(2)
+ .expectNext(1)
+ .expectError(ex);
+ }
+
@Test
public void onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch() {
final IllegalArgumentException ex = new IllegalArgumentException("ex");
@@ -1777,6 +1833,25 @@ public class SourceTest extends StreamTest {
.expectComplete();
}
+ @Test
+ public void mustBeAbleToOnErrorContinueWithPredicate() {
+ Source.from(Arrays.asList(1, 2))
+ .map(
+ elem -> {
+ if (elem == 2) {
+ throw new IllegalArgumentException("Boom");
+ } else {
+ return elem;
+ }
+ })
+ .onErrorContinue(
+ ex -> ex.getMessage().contains("Boom"), e -> logger().error(e, "Error encountered"))
+ .runWith(TestSink.probe(system), system)
+ .request(2)
+ .expectNext(1)
+ .expectComplete();
+ }
+
@Test
public void mustBeAbleToOnErrorResumeWithPredicate() {
Source.from(Arrays.asList(1, 2))
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorContinueSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorContinueSpec.scala
new file mode 100644
index 0000000000..85a503e2c3
--- /dev/null
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorContinueSpec.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import scala.concurrent.TimeoutException
+import scala.util.control.NoStackTrace
+
+import org.apache.pekko
+import pekko.stream.testkit.StreamSpec
+import pekko.stream.testkit.scaladsl.TestSink
+
+class FlowOnErrorContinueSpec extends StreamSpec {
+ val ex = new RuntimeException("ex") with NoStackTrace
+
+ "A onErrorContinue" must {
+ "can complete with all exceptions" in {
+ Source(List(1, 2))
+ .map { a =>
+ if (a == 1) throw ex else a
+ }
+ .onErrorContinue[Throwable](log.error(_, "Error occurred"))
+ .runWith(TestSink[Int]())
+ .request(2)
+ .expectNext(2)
+ .expectComplete()
+ }
+
+ "can complete with dedicated exception type" in {
+ Source(List(1, 2))
+ .map { a =>
+ if (a == 2) throw new IllegalArgumentException() else a
+ }
+ .onErrorContinue[IllegalArgumentException](log.error(_, "Error occurred"))
+ .runWith(TestSink[Int]())
+ .request(2)
+ .expectNext(1)
+ .expectComplete()
+ }
+
+ "can fail if an unexpected exception occur" in {
+ Source(List(1, 2))
+ .map { a =>
+ if (a == 2) throw new IllegalArgumentException() else a
+ }
+ .onErrorContinue[TimeoutException](log.error(_, "Error occurred"))
+ .runWith(TestSink[Int]())
+ .request(1)
+ .expectNext(1)
+ .request(1)
+ .expectError()
+ }
+
+ "can complete if the pf is applied" in {
+ Source(List(1, 2))
+ .map { a =>
+ if (a == 2) throw new TimeoutException() else a
+ }
+ .onErrorContinue(classOf[TimeoutException].isInstance(_))(log.error(_, "Error occurred"))
+ .runWith(TestSink[Int]())
+ .request(2)
+ .expectNext(1)
+ .expectComplete()
+ }
+
+ "can fail if the pf is not applied" in {
+ Source(List(1, 2))
+ .map { a =>
+ if (a == 2) throw ex else a
+ }
+ .onErrorContinue(classOf[TimeoutException].isInstance(_))(log.error(_, "Error occurred"))
+ .runWith(TestSink[Int]())
+ .request(2)
+ .expectNext(1)
+ .expectError()
+ }
+
+ }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index c816535a94..6054ebc09a 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -2270,6 +2270,82 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
case ex: Throwable if predicate.test(ex) => true
})
+ /**
+ * Continues the stream when an upstream error occurs.
+ *
+ * When an error is signaled from upstream, the `errorConsumer` function is invoked with the
+ * `Throwable`, and the stream resumes processing subsequent elements. The element that caused
+ * the error is dropped.
+ *
+ * '''Note:''' This operator requires stream operators to support supervision. If supervision
+ * is not supported, this operator will have no effect.
+ *
+ * '''Emits when''' an element is available from upstream
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param errorConsumer function invoked when an error occurs
+ * @since 1.3.0
+ */
+ def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Flow[In, Out, Mat] =
+ new Flow(delegate.onErrorContinue[Throwable](errorConsumer.apply))
+
+ /**
+ * Continues the stream when an upstream error occurs.
+ *
+ * When an error is signaled from upstream, the `errorConsumer` function is invoked with the
+ * `Throwable`, and the stream resumes processing subsequent elements. The element that caused
+ * the error is dropped.
+ *
+ * '''Note:''' This operator requires stream operators to support supervision. If supervision
+ * is not supported, this operator will have no effect.
+ *
+ * '''Emits when''' an element is available from upstream
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param clazz the class of the failure cause
+ * @param errorConsumer function invoked when an error occurs
+ * @since 1.3.0
+ */
+ def onErrorContinue[T <: Throwable](clazz: Class[T],
+ errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Flow[In, Out, Mat] =
+ new Flow(delegate.onErrorContinue(clazz.isInstance(_))(errorConsumer.apply(_)))
+
+ /**
+ * Continues the stream when an upstream error occurs.
+ *
+ * When an error is signaled from upstream, the `errorConsumer` function is invoked with the
+ * `Throwable`, and the stream resumes processing subsequent elements. The element that caused
+ * the error is dropped.
+ *
+ * '''Note:''' This operator requires stream operators to support supervision. If supervision
+ * is not supported, this operator will have no effect.
+ *
+ * '''Emits when''' an element is available from upstream
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param p predicate which determines if the exception should be handled
+ * @param errorConsumer function invoked when an error occurs
+ * @since 1.3.0
+ */
+ def onErrorContinue[T <: Throwable](predicate: function.Predicate[_ >: Throwable],
+ errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Flow[In, Out, Mat] =
+ new Flow(delegate.onErrorContinue(predicate.test(_))(errorConsumer.apply(_)))
+
/**
* Transform a failure signal into a stream of elements provided by a factory function.
* This allows to continue processing with another stream when a failure occurs.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 00e2376d36..70eefdf3d0 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2540,6 +2540,82 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
case ex: Throwable if predicate.test(ex) => true
})
+ /**
+ * Continues the stream when an upstream error occurs.
+ *
+ * When an error is signaled from upstream, the `errorConsumer` function is invoked with the
+ * `Throwable`, and the stream resumes processing subsequent elements. The element that caused
+ * the error is dropped.
+ *
+ * '''Note:''' This operator requires stream operators to support supervision. If supervision
+ * is not supported, this operator will have no effect.
+ *
+ * '''Emits when''' an element is available from upstream
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param errorConsumer function invoked when an error occurs
+ * @since 1.3.0
+ */
+ def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Source[Out, Mat] =
+ new Source(delegate.onErrorContinue[Throwable](errorConsumer.apply))
+
+ /**
+ * Continues the stream when an upstream error occurs.
+ *
+ * When an error is signaled from upstream, the `errorConsumer` function is invoked with the
+ * `Throwable`, and the stream resumes processing subsequent elements. The element that caused
+ * the error is dropped.
+ *
+ * '''Note:''' This operator requires stream operators to support supervision. If supervision
+ * is not supported, this operator will have no effect.
+ *
+ * '''Emits when''' an element is available from upstream
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param clazz the class of the failure cause
+ * @param errorConsumer function invoked when an error occurs
+ * @since 1.3.0
+ */
+ def onErrorContinue[T <: Throwable](clazz: Class[T], errorConsumer: function.Procedure[_ >: Throwable])
+ : javadsl.Source[Out, Mat] =
+ new Source(delegate.onErrorContinue(clazz.isInstance(_))(errorConsumer.apply(_)))
+
+ /**
+ * Continues the stream when an upstream error occurs.
+ *
+ * When an error is signaled from upstream, the `errorConsumer` function is invoked with the
+ * `Throwable`, and the stream resumes processing subsequent elements. The element that caused
+ * the error is dropped.
+ *
+ * '''Note:''' This operator requires stream operators to support supervision. If supervision
+ * is not supported, this operator will have no effect.
+ *
+ * '''Emits when''' an element is available from upstream
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param p predicate which determines if the exception should be handled
+ * @param errorConsumer function invoked when an error occurs
+ * @since 1.3.0
+ */
+ def onErrorContinue[T <: Throwable](p: function.Predicate[_ >: Throwable],
+ errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Source[Out, Mat] =
+ new Source(delegate.onErrorContinue(p.test(_))(errorConsumer.apply(_)))
+
/**
* Transform a failure signal into a Source of elements provided by a factory function.
* This allows to continue processing with another stream when a failure occurs.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index edb3b907b8..72e1f77691 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -1470,6 +1470,82 @@ class SubFlow[In, Out, Mat](
case ex: Throwable if predicate.test(ex) => true
})
+ /**
+ * Continues the stream when an upstream error occurs.
+ *
+ * When an error is signaled from upstream, the `errorConsumer` function is invoked with the
+ * `Throwable`, and the stream resumes processing subsequent elements. The element that caused
+ * the error is dropped.
+ *
+ * '''Note:''' This operator requires stream operators to support supervision. If supervision
+ * is not supported, this operator will have no effect.
+ *
+ * '''Emits when''' an element is available from upstream
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param errorConsumer function invoked when an error occurs
+ * @since 1.3.0
+ */
+ def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): SubFlow[In, Out, Mat] =
+ new SubFlow(delegate.onErrorContinue[Throwable](errorConsumer.apply))
+
+ /**
+ * Continues the stream when an upstream error occurs.
+ *
+ * When an error is signaled from upstream, the `errorConsumer` function is invoked with the
+ * `Throwable`, and the stream resumes processing subsequent elements. The element that caused
+ * the error is dropped.
+ *
+ * '''Note:''' This operator requires stream operators to support supervision. If supervision
+ * is not supported, this operator will have no effect.
+ *
+ * '''Emits when''' an element is available from upstream
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param clazz the class of the failure cause
+ * @param errorConsumer function invoked when an error occurs
+ * @since 1.3.0
+ */
+ def onErrorContinue[T <: Throwable](clazz: Class[T],
+ errorConsumer: function.Procedure[_ >: Throwable]): SubFlow[In, Out, Mat] =
+ new SubFlow(delegate.onErrorContinue(clazz.isInstance(_))(errorConsumer.apply(_)))
+
+ /**
+ * Continues the stream when an upstream error occurs.
+ *
+ * When an error is signaled from upstream, the `errorConsumer` function is invoked with the
+ * `Throwable`, and the stream resumes processing subsequent elements. The element that caused
+ * the error is dropped.
+ *
+ * '''Note:''' This operator requires stream operators to support supervision. If supervision
+ * is not supported, this operator will have no effect.
+ *
+ * '''Emits when''' an element is available from upstream
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param p predicate which determines if the exception should be handled
+ * @param errorConsumer function invoked when an error occurs
+ * @since 1.3.0
+ */
+ def onErrorContinue[T <: Throwable](p: function.Predicate[_ >: Throwable],
+ errorConsumer: function.Procedure[_ >: Throwable]): SubFlow[In, Out, Mat] =
+ new SubFlow(delegate.onErrorContinue(p.test(_))(errorConsumer.apply(_)))
+
/**
* While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index bc77cb5bee..ca02f9c9cf 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -1445,6 +1445,82 @@ class SubSource[Out, Mat](
case ex: Throwable if predicate.test(ex) => true
})
+ /**
+ * Continues the stream when an upstream error occurs.
+ *
+ * When an error is signaled from upstream, the `errorConsumer` function is invoked with the
+ * `Throwable`, and the stream resumes processing subsequent elements. The element that caused
+ * the error is dropped.
+ *
+ * '''Note:''' This operator requires stream operators to support supervision. If supervision
+ * is not supported, this operator will have no effect.
+ *
+ * '''Emits when''' an element is available from upstream
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param errorConsumer function invoked when an error occurs
+ * @since 1.3.0
+ */
+ def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): SubSource[Out, Mat] =
+ new SubSource(delegate.onErrorContinue[Throwable](errorConsumer.apply))
+
+ /**
+ * Continues the stream when an upstream error occurs.
+ *
+ * When an error is signaled from upstream, the `errorConsumer` function is invoked with the
+ * `Throwable`, and the stream resumes processing subsequent elements. The element that caused
+ * the error is dropped.
+ *
+ * '''Note:''' This operator requires stream operators to support supervision. If supervision
+ * is not supported, this operator will have no effect.
+ *
+ * '''Emits when''' an element is available from upstream
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param clazz the class of the failure cause
+ * @param errorConsumer function invoked when an error occurs
+ * @since 1.3.0
+ */
+ def onErrorContinue[T <: Throwable](clazz: Class[T],
+ errorConsumer: function.Procedure[_ >: Throwable]): SubSource[Out, Mat] =
+ new SubSource(delegate.onErrorContinue(clazz.isInstance(_))(errorConsumer.apply(_)))
+
+ /**
+ * Continues the stream when an upstream error occurs.
+ *
+ * When an error is signaled from upstream, the `errorConsumer` function is invoked with the
+ * `Throwable`, and the stream resumes processing subsequent elements. The element that caused
+ * the error is dropped.
+ *
+ * '''Note:''' This operator requires stream operators to support supervision. If supervision
+ * is not supported, this operator will have no effect.
+ *
+ * '''Emits when''' an element is available from upstream
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param p predicate which determines if the exception should be handled
+ * @param errorConsumer function invoked when an error occurs
+ * @since 1.3.0
+ */
+ def onErrorContinue[T <: Throwable](p: function.Predicate[_ >: Throwable],
+ errorConsumer: function.Procedure[_ >: Throwable]): SubSource[Out, Mat] =
+ new SubSource(delegate.onErrorContinue(p.test(_))(errorConsumer.apply(_)))
+
/**
* While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index cf4d921df9..d3d161b7ba 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -20,6 +20,7 @@ import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
+import scala.util.control.NonFatal
import org.apache.pekko
import pekko.Done
@@ -1058,6 +1059,67 @@ trait FlowOps[+Out, +Mat] {
}: PartialFunction[Boolean, Graph[SourceShape[Out], NotUsed]]))
.withAttributes(DefaultAttributes.onErrorComplete and SourceLocation.forLambda(pf)))
+ /**
+ * Continues the stream when an upstream error occurs.
+ *
+ * When an error is signaled from upstream, the `errorConsumer` function is invoked with the
+ * `Throwable`, and the stream resumes processing subsequent elements. The element that caused
+ * the error is dropped.
+ *
+ * '''Note:''' This operator requires stream operators to support supervision. If supervision
+ * is not supported, this operator will have no effect.
+ *
+ * '''Emits when''' an element is available from upstream
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param errorConsumer function invoked when an error occurs
+ * @since 1.3.0
+ */
+ def onErrorContinue[T <: Throwable](errorConsumer: Throwable => Unit)(implicit tag: ClassTag[T]): Repr[Out] = {
+ this.withAttributes(ActorAttributes.supervisionStrategy {
+ case NonFatal(e) if tag.runtimeClass.isInstance(e) =>
+ errorConsumer(e)
+ Supervision.Resume
+ case _ => Supervision.Stop
+ })
+ }
+
+ /**
+ * Continues the stream when an upstream error occurs.
+ *
+ * When an error is signaled from upstream, the `errorConsumer` function is invoked with the
+ * `Throwable`, and the stream resumes processing subsequent elements. The element that caused
+ * the error is dropped.
+ *
+ * '''Note:''' This operator requires stream operators to support supervision. If supervision
+ * is not supported, this operator will have no effect.
+ *
+ * '''Emits when''' an element is available from upstream
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param p predicate to determine which errors to handle
+ * @param errorConsumer function invoked when an error occurs
+ * @since 1.3.0
+ */
+ def onErrorContinue(p: Throwable => Boolean)(errorConsumer: Throwable => Unit): Repr[Out] = {
+ this.withAttributes(ActorAttributes.supervisionStrategy {
+ case NonFatal(e) if p(e) =>
+ errorConsumer(e)
+ Supervision.Resume
+ case _ => Supervision.Stop
+ })
+ }
+
/**
* While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover