This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch onErrorContinue
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 49bed69ce70ff53ac4d42dce1f3925bda3e3d36c
Author: He-Pin <[email protected]>
AuthorDate: Sat Oct 18 20:31:28 2025 +0800

    feat: Add Flow#onErrorContinue operator.
---
 .../operators/Source-or-Flow/onErrorContinue.md    | 27 ++++++
 docs/src/main/paradox/stream/operators/index.md    |  2 +
 .../java/org/apache/pekko/stream/StreamTest.java   |  5 ++
 .../org/apache/pekko/stream/javadsl/FlowTest.java  | 86 +++++++++++++++++++
 .../apache/pekko/stream/javadsl/SourceTest.java    | 75 ++++++++++++++++
 .../stream/scaladsl/FlowOnErrorContinueSpec.scala  | 99 ++++++++++++++++++++++
 .../org/apache/pekko/stream/javadsl/Flow.scala     | 76 +++++++++++++++++
 .../org/apache/pekko/stream/javadsl/Source.scala   | 76 +++++++++++++++++
 .../org/apache/pekko/stream/javadsl/SubFlow.scala  | 76 +++++++++++++++++
 .../apache/pekko/stream/javadsl/SubSource.scala    | 76 +++++++++++++++++
 .../org/apache/pekko/stream/scaladsl/Flow.scala    | 65 +++++++++++++-
 11 files changed, 661 insertions(+), 2 deletions(-)

diff --git 
a/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorContinue.md 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorContinue.md
new file mode 100644
index 0000000000..66b7e3cfd9
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorContinue.md
@@ -0,0 +1,27 @@
+# onErrorContinue
+
+Continues the stream when an upstream error occurs.
+
+@ref[Error handling](../index.md#error-handling)
+
+## Signature
+
+@apidoc[Source.onErrorContinue](Source) { 
scala="#onErrorContinue(errorConsumer%3A%20Function%5BThrowable%2C%20Unit%5D)%3AFlowOps.this.Repr%5BT%5D"
 java="#onErrorContinue(org.apache.pekko.japi.function.Procedure)" }
+@apidoc[Flow.onErrorContinue](Flow) { 
scala="#onErrorContinue%5BT%20%3C%3A%20Throwable%5D(errorConsumer%3A%20Function%5BThrowable%2C%20Unit%5D)(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D"
 
java="#onErrorContinue(java.lang.Class,org.apache.pekko.japi.function.Procedure)"
 }
+
+## Description
+
+Continues the stream when an upstream error occurs.
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** element is available from the upstream
+
+**backpressures** downstream backpressures
+
+**completes** upstream completes or upstream failed with exception this 
operator can't handle
+
+**Cancels when** downstream cancels
+@@@
\ No newline at end of file
diff --git a/docs/src/main/paradox/stream/operators/index.md 
b/docs/src/main/paradox/stream/operators/index.md
index da367ebad1..4de4334ba9 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -372,6 +372,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 |--|--|--|
 |Source/Flow|<a 
name="maperror"></a>@ref[mapError](Source-or-Flow/mapError.md)|While similar to 
`recover` this operators can be used to transform an error signal to a 
different one *without* logging it as an error in the process.|
 |Source/Flow|<a 
name="onerrorcomplete"></a>@ref[onErrorComplete](Source-or-Flow/onErrorComplete.md)|Allows
 completing the stream when an upstream error occurs.|
+|Source/Flow|<a 
name="onerrorcontinue"></a>@ref[onErrorContinue](Source-or-Flow/onErrorContinue.md)|Continues
 the stream when an upstream error occurs.|
 |Source/Flow|<a 
name="onerrorresume"></a>@ref[onErrorResume](Source-or-Flow/onErrorResume.md)|Allows
 transforming a failure signal into a stream of elements provided by a factory 
function.|
 |RestartSource|<a 
name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap
 the given @apidoc[Source] with a @apidoc[Source] that will restart it when it 
fails using an exponential backoff. Notice that this @apidoc[Source] will not 
restart on completion of the wrapped flow.|
 |RestartFlow|<a 
name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)|Wrap
 the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it 
fails using an exponential backoff. Notice that this @apidoc[Flow] will not 
restart on completion of the wrapped flow.|
@@ -550,6 +551,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 * [none](Sink/none.md)
 * [onComplete](Sink/onComplete.md)
 * [onErrorComplete](Source-or-Flow/onErrorComplete.md)
+* [onErrorContinue](Source-or-Flow/onErrorContinue.md)
 * [onErrorResume](Source-or-Flow/onErrorResume.md)
 * [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
 * [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java
index 6692db44e3..63dc25aab3 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java
@@ -14,6 +14,7 @@
 package org.apache.pekko.stream;
 
 import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.event.LoggingAdapter;
 import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
 import org.scalatestplus.junit.JUnitSuite;
 
@@ -23,4 +24,8 @@ public abstract class StreamTest extends JUnitSuite {
   protected StreamTest(PekkoJUnitActorSystemResource actorSystemResource) {
     system = actorSystemResource.getSystem();
   }
+
+  protected LoggingAdapter logger() {
+    return system.log();
+  }
 }
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index 2137589d6d..19ab1d51f6 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -1345,6 +1345,26 @@ public class FlowTest extends StreamTest {
         .expectComplete();
   }
 
+  @Test
+  public void mustBeAbleToOnErrorContinue() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new RuntimeException("ex");
+              } else {
+                return elem;
+              }
+            })
+        .via(
+            Flow.of(Integer.class)
+                .onErrorContinue(error -> logger().error(error, "Error 
occurred")))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToOnErrorResume() {
     Source.from(Arrays.asList(1, 2))
@@ -1382,6 +1402,28 @@ public class FlowTest extends StreamTest {
         .expectComplete();
   }
 
+  @Test
+  public void mustBeAbleToOnErrorContinueWithDedicatedException() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new IllegalArgumentException("ex");
+              } else {
+                return elem;
+              }
+            })
+        .via(
+            Flow.of(Integer.class)
+                .onErrorContinue(
+                    IllegalArgumentException.class,
+                    error -> logger().error(error, "Error occurred")))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToOnErrorResumeWithDedicatedException() {
     Source.from(Arrays.asList(1, 2))
@@ -1422,6 +1464,28 @@ public class FlowTest extends StreamTest {
         .expectError(ex);
   }
 
+  @Test
+  public void mustBeAbleToFailWhenOnErrorContinueExceptionTypeNotMatch() {
+    final IllegalArgumentException ex = new IllegalArgumentException("ex");
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw ex;
+              } else {
+                return elem;
+              }
+            })
+        .via(
+            Flow.of(Integer.class)
+                .onErrorContinue(
+                    TimeoutException.class, error -> logger().error(error, 
"Error occurred")))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectError(ex);
+  }
+
   @Test
   public void onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch() {
     final IllegalArgumentException ex = new IllegalArgumentException("ex");
@@ -1459,6 +1523,28 @@ public class FlowTest extends StreamTest {
         .expectComplete();
   }
 
+  @Test
+  public void mustBeAbleToOnErrorContinueWithPredicate() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new IllegalArgumentException("Boom");
+              } else {
+                return elem;
+              }
+            })
+        .via(
+            Flow.of(Integer.class)
+                .onErrorContinue(
+                    ex -> ex.getMessage().contains("Boom"),
+                    error -> logger().error(error, "Error occurred")))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToOnErrorResumeWithPredicate() {
     Source.from(Arrays.asList(1, 2))
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index 3e47d66c3a..f8c95ae51f 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -1666,6 +1666,24 @@ public class SourceTest extends StreamTest {
         .expectComplete();
   }
 
+  @Test
+  public void mustBeAbleToOnErrorContinue() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new RuntimeException("ex");
+              } else {
+                return elem;
+              }
+            })
+        .onErrorContinue(e -> logger().error(e, "Error encountered"))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToOnErrorResume() {
     Source.from(Arrays.asList(1, 2))
@@ -1703,6 +1721,25 @@ public class SourceTest extends StreamTest {
         .expectComplete();
   }
 
+  @Test
+  public void mustBeAbleToOnErrorContinueWithDedicatedException() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new IllegalArgumentException("ex");
+              } else {
+                return elem;
+              }
+            })
+        .onErrorContinue(
+            IllegalArgumentException.class, e -> logger().error(e, "Error 
encountered"))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToOnErrorResumeWithDedicatedException() {
     Source.from(Arrays.asList(1, 2))
@@ -1741,6 +1778,25 @@ public class SourceTest extends StreamTest {
         .expectError(ex);
   }
 
+  @Test
+  public void mustBeAbleToFailWhenOnErrorContinueExceptionTypeNotMatch() {
+    final IllegalArgumentException ex = new IllegalArgumentException("ex");
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw ex;
+              } else {
+                return elem;
+              }
+            })
+        .onErrorContinue(TimeoutException.class, e -> logger().error(e, "Error 
encountered"))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectError(ex);
+  }
+
   @Test
   public void onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch() {
     final IllegalArgumentException ex = new IllegalArgumentException("ex");
@@ -1778,6 +1834,25 @@ public class SourceTest extends StreamTest {
         .expectComplete();
   }
 
+  @Test
+  public void mustBeAbleToOnErrorContinueWithPredicate() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new IllegalArgumentException("Boom");
+              } else {
+                return elem;
+              }
+            })
+        .onErrorContinue(
+            ex -> ex.getMessage().contains("Boom"), e -> logger().error(e, 
"Error encountered"))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToOnErrorResumeWithPredicate() {
     Source.from(Arrays.asList(1, 2))
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorContinueSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorContinueSpec.scala
new file mode 100644
index 0000000000..4a35b6db3c
--- /dev/null
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorContinueSpec.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko
+import pekko.stream.testkit.StreamSpec
+import pekko.stream.testkit.scaladsl.TestSink
+
+import scala.concurrent.TimeoutException
+import scala.util.control.NoStackTrace
+
+class FlowOnErrorContinueSpec extends StreamSpec {
+  val ex = new RuntimeException("ex") with NoStackTrace
+
+  "A onErrorContinue" must {
+    "can complete with all exceptions" in {
+      Source(List(1, 2))
+        .map { a =>
+          if (a == 1) throw ex else a
+        }
+        .onErrorContinue[Throwable](log.error(_, "Error occurred"))
+        .runWith(TestSink[Int]())
+        .request(2)
+        .expectNext(2)
+        .expectComplete()
+    }
+
+    "can complete with dedicated exception type" in {
+      Source(List(1, 2))
+        .map { a =>
+          if (a == 2) throw new IllegalArgumentException() else a
+        }
+        .onErrorContinue[IllegalArgumentException](log.error(_, "Error 
occurred"))
+        .runWith(TestSink[Int]())
+        .request(2)
+        .expectNext(1)
+        .expectComplete()
+    }
+
+    "can fail if an unexpected exception occur" in {
+      Source(List(1, 2))
+        .map { a =>
+          if (a == 2) throw new IllegalArgumentException() else a
+        }
+        .onErrorContinue[TimeoutException](log.error(_, "Error occurred"))
+        .runWith(TestSink[Int]())
+        .request(1)
+        .expectNext(1)
+        .request(1)
+        .expectError()
+    }
+
+    "can complete if the pf is applied" in {
+      Source(List(1, 2))
+        .map { a =>
+          if (a == 2) throw new TimeoutException() else a
+        }
+        .onErrorContinue {
+          case _: TimeoutException => true
+          case _                   => false
+        }(log.error(_, "Error occurred"))
+        .runWith(TestSink[Int]())
+        .request(2)
+        .expectNext(1)
+        .expectComplete()
+    }
+
+    "can fail if the pf is not applied" in {
+      Source(List(1, 2))
+        .map { a =>
+          if (a == 2) throw ex else a
+        }
+        .onErrorContinue {
+          case _: TimeoutException => true
+          case _                   => false
+        }(log.error(_, "Error occurred"))
+        .runWith(TestSink[Int]())
+        .request(2)
+        .expectNext(1)
+        .expectError()
+    }
+
+  }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 7dba8b7e06..b8be99a2b9 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -2106,6 +2106,82 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
       case ex: Throwable if predicate.test(ex) => true
     })
 
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): 
javadsl.Flow[In, Out, Mat] =
+    new Flow(delegate.onErrorContinue[Throwable](errorConsumer.apply))
+
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param clazz the class of the failure cause
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue[T <: Throwable](clazz: Class[T],
+      errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Flow[In, 
Out, Mat] =
+    new Flow(delegate.onErrorContinue(clazz.isInstance)(errorConsumer.apply))
+
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param p predicate which determines if the exception should be handled
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue[T <: Throwable](predicate: function.Predicate[_ >: 
Throwable],
+      errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Flow[In, 
Out, Mat] =
+    new Flow(delegate.onErrorContinue(predicate.test)(errorConsumer.apply))
+
   /**
    * Transform a failure signal into a stream of elements provided by a 
factory function.
    * This allows to continue processing with another stream when a failure 
occurs.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 93ed130117..127146a1a9 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2340,6 +2340,82 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
       case ex: Throwable if predicate.test(ex) => true
     })
 
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): 
javadsl.Source[Out, Mat] =
+    new Source(delegate.onErrorContinue[Throwable](errorConsumer.apply))
+
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param clazz the class of the failure cause
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue[T <: Throwable](clazz: Class[T], errorConsumer: 
function.Procedure[_ >: Throwable])
+      : javadsl.Source[Out, Mat] =
+    new Source(delegate.onErrorContinue(clazz.isInstance)(errorConsumer.apply))
+
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param p predicate which determines if the exception should be handled
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue[T <: Throwable](p: function.Predicate[_ >: Throwable],
+      errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Source[Out, 
Mat] =
+    new Source(delegate.onErrorContinue(p.test)(errorConsumer.apply))
+
   /**
    * Transform a failure signal into a Source of elements provided by a 
factory function.
    * This allows to continue processing with another stream when a failure 
occurs.
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 1a03e0921d..8da6eb21c0 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -1369,6 +1369,82 @@ final class SubFlow[In, Out, Mat](
       case ex: Throwable if predicate.test(ex) => true
     })
 
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): 
SubFlow[In, Out, Mat] =
+    new SubFlow(delegate.onErrorContinue[Throwable](errorConsumer.apply))
+
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param clazz the class of the failure cause
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue[T <: Throwable](clazz: Class[T],
+      errorConsumer: function.Procedure[_ >: Throwable]): SubFlow[In, Out, 
Mat] =
+    new 
SubFlow(delegate.onErrorContinue(clazz.isInstance)(errorConsumer.apply))
+
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param p predicate which determines if the exception should be handled
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue[T <: Throwable](p: function.Predicate[_ >: Throwable],
+      errorConsumer: function.Procedure[_ >: Throwable]): SubFlow[In, Out, 
Mat] =
+    new SubFlow(delegate.onErrorContinue(p.test)(errorConsumer.apply))
+
   /**
    * While similar to [[recover]] this operator can be used to transform an 
error signal to a different one *without* logging
    * it as an error in the process. So in that sense it is NOT exactly 
equivalent to `recover(t => throw t2)` since recover
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index 5a2abfec29..5521a2f157 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -1346,6 +1346,82 @@ final class SubSource[Out, Mat](
       case ex: Throwable if predicate.test(ex) => true
     })
 
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): 
SubSource[Out, Mat] =
+    new SubSource(delegate.onErrorContinue[Throwable](errorConsumer.apply))
+
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param clazz the class of the failure cause
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue[T <: Throwable](clazz: Class[T],
+      errorConsumer: function.Procedure[_ >: Throwable]): SubSource[Out, Mat] =
+    new 
SubSource(delegate.onErrorContinue(clazz.isInstance)(errorConsumer.apply))
+
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param p predicate which determines if the exception should be handled
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue[T <: Throwable](p: function.Predicate[_ >: Throwable],
+      errorConsumer: function.Procedure[_ >: Throwable]): SubSource[Out, Mat] =
+    new SubSource(delegate.onErrorContinue(p.test)(errorConsumer.apply))
+
   /**
    * While similar to [[recover]] this operator can be used to transform an 
error signal to a different one *without* logging
    * it as an error in the process. So in that sense it is NOT exactly 
equivalent to `recover(t => throw t2)` since recover
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 6499b39abe..05102dbfa9 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -19,7 +19,6 @@ import scala.collection.immutable
 import scala.concurrent.Future
 import scala.concurrent.duration.FiniteDuration
 import scala.reflect.ClassTag
-
 import org.apache.pekko
 import pekko.Done
 import pekko.NotUsed
@@ -47,12 +46,13 @@ import pekko.stream.stage._
 import pekko.util.ConstantFun
 import pekko.util.OptionVal
 import pekko.util.Timeout
-
 import org.reactivestreams.Processor
 import org.reactivestreams.Publisher
 import org.reactivestreams.Subscriber
 import org.reactivestreams.Subscription
 
+import scala.util.control.NonFatal
+
 /**
  * A `Flow` is a set of stream processing steps that has one open input and 
one open output.
  */
@@ -989,6 +989,67 @@ trait FlowOps[+Out, +Mat] {
         }: PartialFunction[Boolean, Graph[SourceShape[Out], NotUsed]]))
         .withAttributes(DefaultAttributes.onErrorComplete and 
SourceLocation.forLambda(pf)))
 
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue[T <: Throwable](errorConsumer: Throwable => 
Unit)(implicit tag: ClassTag[T]): Repr[Out] = {
+    this.withAttributes(ActorAttributes.supervisionStrategy {
+      case NonFatal(e) if tag.runtimeClass.isInstance(e) =>
+        errorConsumer(e)
+        Supervision.Resume
+      case _ => Supervision.Stop
+    })
+  }
+
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param p predicate to determine which errors to handle
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue(p: Throwable => Boolean)(errorConsumer: Throwable => 
Unit): Repr[Out] = {
+    this.withAttributes(ActorAttributes.supervisionStrategy {
+      case NonFatal(e) if p(e) =>
+        errorConsumer(e)
+        Supervision.Resume
+      case _ => Supervision.Stop
+    })
+  }
+
   /**
    * While similar to [[recover]] this operator can be used to transform an 
error signal to a different one *without* logging
    * it as an error in the process. So in that sense it is NOT exactly 
equivalent to `recover(t => throw t2)` since recover


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to