This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b3818b55a feat:Add onErrorComplete stream operator. (#913)
2b3818b55a is described below

commit 2b3818b55a371e5a38f0f1338a27a23372e51f74
Author: kerr <[email protected]>
AuthorDate: Mon Jan 8 02:30:52 2024 +0800

    feat:Add onErrorComplete stream operator. (#913)
---
 .../operators/Source-or-Flow/onErrorComplete.md    | 29 +++++++
 docs/src/main/paradox/stream/operators/index.md    |  2 +
 .../org/apache/pekko/stream/javadsl/FlowTest.java  | 73 ++++++++++++++++
 .../stream/scaladsl/FlowOnErrorCompleteSpec.scala  | 98 ++++++++++++++++++++++
 .../org/apache/pekko/stream/impl/Stages.scala      |  1 +
 .../org/apache/pekko/stream/javadsl/Flow.scala     | 55 ++++++++++++
 .../org/apache/pekko/stream/javadsl/Source.scala   | 55 ++++++++++++
 .../org/apache/pekko/stream/javadsl/SubFlow.scala  | 54 ++++++++++++
 .../apache/pekko/stream/javadsl/SubSource.scala    | 54 ++++++++++++
 .../org/apache/pekko/stream/scaladsl/Flow.scala    | 42 ++++++++++
 10 files changed, 463 insertions(+)

diff --git 
a/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md
new file mode 100644
index 0000000000..3b9d47d7f6
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md
@@ -0,0 +1,29 @@
+# onErrorComplete
+
+Allows completing the stream when an upstream error occur.
+
+@ref[Error handling](../index.md#error-handling)
+
+## Signature
+
+@apidoc[Source.onErrorComplete](Source) { 
scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D"
 java="#onErrorComplete(java.util.function.Predicate)" }
+@apidoc[Source.onErrorComplete](Source) { 
scala="#onErrorComplete%5BT%20%3C%3A%20Throwable%5D()(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D"
 java="#onErrorComplete(java.lang.Class)" }
+@apidoc[Flow.onErrorComplete](Flow) { 
scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D"
 java="#onErrorComplete(java.util.function.Predicate)" }
+@apidoc[Flow.onErrorComplete](Flow) { 
scala="#onErrorComplete%5BT%20%3C%3A%20Throwable%5D()(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D"
 java="#onErrorComplete(java.lang.Class)" }
+
+## Description
+
+Allows to complete the stream when an upstream error occur.
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** element is available from the upstream
+
+**backpressures** downstream backpressures
+
+**completes** upstream completes or upstream failed with exception this 
operator can handle
+
+**Cancels when** downstream cancels
+@@@
\ No newline at end of file
diff --git a/docs/src/main/paradox/stream/operators/index.md 
b/docs/src/main/paradox/stream/operators/index.md
index 28be441d18..2cc47ea67f 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -363,6 +363,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 | |Operator|Description|
 |--|--|--|
 |Source/Flow|<a 
name="maperror"></a>@ref[mapError](Source-or-Flow/mapError.md)|While similar to 
`recover` this operators can be used to transform an error signal to a 
different one *without* logging it as an error in the process.|
+|Source/Flow|<a 
name="onerrorcomplete"></a>@ref[onErrorComplete](Source-or-Flow/onErrorComplete.md)|Allows
 completing the stream when an upstream error occur.|
 |RestartSource|<a 
name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap
 the given @apidoc[Source] with a @apidoc[Source] that will restart it when it 
fails using an exponential backoff. Notice that this @apidoc[Source] will not 
restart on completion of the wrapped flow.|
 |RestartFlow|<a 
name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)|Wrap
 the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it 
fails using an exponential backoff. Notice that this @apidoc[Flow] will not 
restart on completion of the wrapped flow.|
 |Source/Flow|<a 
name="recover"></a>@ref[recover](Source-or-Flow/recover.md)|Allow sending of 
one last element downstream when a failure has happened upstream.|
@@ -532,6 +533,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 * [never](Source/never.md)
 * [never](Sink/never.md)
 * [onComplete](Sink/onComplete.md)
+* [onErrorComplete](Source-or-Flow/onErrorComplete.md)
 * [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
 * [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)
 * [orElse](Source-or-Flow/orElse.md)
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index f9a8e2306e..f886cfbd0d 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -1165,6 +1165,79 @@ public class FlowTest extends StreamTest {
     future.toCompletableFuture().get(3, TimeUnit.SECONDS);
   }
 
+  @Test
+  public void mustBeAbleToOnErrorComplete() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new RuntimeException("ex");
+              } else {
+                return elem;
+              }
+            })
+        .onErrorComplete()
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
+  @Test
+  public void mustBeAbleToOnErrorCompleteWithDedicatedException() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new IllegalArgumentException("ex");
+              } else {
+                return elem;
+              }
+            })
+        .onErrorComplete(IllegalArgumentException.class)
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
+  @Test
+  public void mustBeAbleToFailWhenExceptionTypeNotMatch() {
+    final IllegalArgumentException ex = new IllegalArgumentException("ex");
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw ex;
+              } else {
+                return elem;
+              }
+            })
+        .onErrorComplete(TimeoutException.class)
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectError(ex);
+  }
+
+  @Test
+  public void mustBeAbleToOnErrorCompleteWithPredicate() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new IllegalArgumentException("Boom");
+              } else {
+                return elem;
+              }
+            })
+        .onErrorComplete(ex -> ex.getMessage().contains("Boom"))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToMapErrorClass() {
     final String head = "foo";
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorCompleteSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorCompleteSpec.scala
new file mode 100644
index 0000000000..338669f270
--- /dev/null
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorCompleteSpec.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko.stream.testkit.StreamSpec
+import org.apache.pekko.stream.testkit.scaladsl.TestSink
+
+import scala.concurrent.TimeoutException
+import scala.util.control.NoStackTrace
+
+class FlowOnErrorCompleteSpec extends StreamSpec {
+  val ex = new RuntimeException("ex") with NoStackTrace
+
+  "A CompleteOn" must {
+    "can complete with all exceptions" in {
+      Source(List(1, 2))
+        .map { a =>
+          if (a == 2) throw ex else a
+        }
+        .onErrorComplete[Throwable]()
+        .runWith(TestSink[Int]())
+        .request(2)
+        .expectNext(1)
+        .expectComplete()
+    }
+
+    "can complete with dedicated exception type" in {
+      Source(List(1, 2))
+        .map { a =>
+          if (a == 2) throw new IllegalArgumentException() else a
+        }
+        .onErrorComplete[IllegalArgumentException]()
+        .runWith(TestSink[Int]())
+        .request(2)
+        .expectNext(1)
+        .expectComplete()
+    }
+
+    "can fail if an unexpected exception occur" in {
+      Source(List(1, 2))
+        .map { a =>
+          if (a == 2) throw new IllegalArgumentException() else a
+        }
+        .onErrorComplete[TimeoutException]()
+        .runWith(TestSink[Int]())
+        .request(1)
+        .expectNext(1)
+        .request(1)
+        .expectError()
+    }
+
+    "can complete if the pf is applied" in {
+      Source(List(1, 2))
+        .map { a =>
+          if (a == 2) throw new TimeoutException() else a
+        }
+        .onErrorComplete {
+          case _: IllegalArgumentException => false
+          case _: TimeoutException         => true
+        }
+        .runWith(TestSink[Int]())
+        .request(2)
+        .expectNext(1)
+        .expectComplete()
+    }
+
+    "can fail if the pf is not applied" in {
+      Source(List(1, 2))
+        .map { a =>
+          if (a == 2) throw ex else a
+        }
+        .onErrorComplete {
+          case _: IllegalArgumentException => false
+          case _: TimeoutException         => true
+        }
+        .runWith(TestSink[Int]())
+        .request(2)
+        .expectNext(1)
+        .expectError()
+    }
+
+  }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index c367fdba69..8ec7409dc3 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -92,6 +92,7 @@ import pekko.stream.Attributes._
     val mergePrioritized = name("mergePrioritized")
     val flattenMerge = name("flattenMerge")
     val recoverWith = name("recoverWith")
+    val onErrorComplete = name("onErrorComplete")
     val broadcast = name("broadcast")
     val wireTap = name("wireTap")
     val balance = name("balance")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 8aaf3a45ee..030f3ebfa4 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -1907,6 +1907,61 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
         case elem if clazz.isInstance(elem) => supplier.get()
       })
 
+  /**
+   * onErrorComplete allows to complete the stream when an upstream error 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or failed with exception is an 
instance of the provided type
+   *
+   * '''Cancels when''' downstream cancels
+   *  @since 1.1.0
+   */
+  def onErrorComplete(): javadsl.Flow[In, Out, Mat] = 
onErrorComplete(classOf[Throwable])
+
+  /**
+   * onErrorComplete allows to complete the stream when an upstream error 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or failed with exception is an 
instance of the provided type
+   *
+   * '''Cancels when''' downstream cancels
+   *  @since 1.1.0
+   */
+  def onErrorComplete(clazz: Class[_ <: Throwable]): javadsl.Flow[In, Out, 
Mat] =
+    onErrorComplete(ex => clazz.isInstance(ex))
+
+  /**
+   * onErrorComplete allows to complete the stream when an upstream error 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or failed with predicate return 
ture
+   *
+   * '''Cancels when''' downstream cancels
+   *  @since 1.1.0
+   */
+  def onErrorComplete(predicate: java.util.function.Predicate[_ >: 
Throwable]): javadsl.Flow[In, Out, Mat] =
+    new Flow(delegate.onErrorComplete {
+      case ex: Throwable if predicate.test(ex) => true
+    })
+
   /**
    * Terminate processing (and cancel the upstream publisher) after the given
    * number of elements. Due to input buffering some elements may have been
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 2e6db54589..36cb1823f9 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2382,6 +2382,61 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
         case elem if clazz.isInstance(elem) => supplier.get()
       }: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]])
 
+  /**
+   * onErrorComplete allows to complete the stream when an upstream error 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or failed with exception is an 
instance of the provided type
+   *
+   * '''Cancels when''' downstream cancels
+   *  @since 1.1.0
+   */
+  def onErrorComplete(): javadsl.Source[Out, Mat] = 
onErrorComplete(classOf[Throwable])
+
+  /**
+   * onErrorComplete allows to complete the stream when an upstream error 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or failed with exception is an 
instance of the provided type
+   *
+   * '''Cancels when''' downstream cancels
+   *  @since 1.1.0
+   */
+  def onErrorComplete(clazz: Class[_ <: Throwable]): javadsl.Source[Out, Mat] =
+    onErrorComplete(ex => clazz.isInstance(ex))
+
+  /**
+   * onErrorComplete allows to complete the stream when an upstream error 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or failed with predicate return 
ture
+   *
+   * '''Cancels when''' downstream cancels
+   *  @since 1.1.0
+   */
+  def onErrorComplete(predicate: java.util.function.Predicate[_ >: 
Throwable]): javadsl.Source[Out, Mat] =
+    new Source(delegate.onErrorComplete {
+      case ex: Throwable if predicate.test(ex) => true
+    })
+
   /**
    * Transform each input element into an `Iterable` of output elements that is
    * then flattened into the output stream.
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 93942830b9..eee009f824 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -1166,6 +1166,60 @@ class SubFlow[In, Out, Mat](
       pf: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]): 
SubFlow[In, Out, Mat] =
     new SubFlow(delegate.recoverWithRetries(attempts, pf))
 
+  /**
+   * onErrorComplete allows to complete the stream when an upstream error 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or failed with exception is an 
instance of the provided type
+   *
+   * '''Cancels when''' downstream cancels
+   *  @since 1.1.0
+   */
+  def onErrorComplete(): SubFlow[In, Out, Mat] = 
onErrorComplete(classOf[Throwable])
+
+  /**
+   * onErrorComplete allows to complete the stream when an upstream error 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or failed with exception is an 
instance of the provided type
+   *
+   * '''Cancels when''' downstream cancels
+   *  @since 1.1.0
+   */
+  def onErrorComplete(clazz: Class[_ <: Throwable]): SubFlow[In, Out, Mat] = 
onErrorComplete(ex => clazz.isInstance(ex))
+
+  /**
+   * onErrorComplete allows to complete the stream when an upstream error 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or failed with predicate return 
ture
+   *
+   * '''Cancels when''' downstream cancels
+   *  @since 1.1.0
+   */
+  def onErrorComplete(predicate: java.util.function.Predicate[_ >: 
Throwable]): SubFlow[In, Out, Mat] =
+    new SubFlow(delegate.onErrorComplete {
+      case ex: Throwable if predicate.test(ex) => true
+    })
+
   /**
    * While similar to [[recover]] this operator can be used to transform an 
error signal to a different one *without* logging
    * it as an error in the process. So in that sense it is NOT exactly 
equivalent to `recover(t => throw t2)` since recover
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index 14c441f3e4..eb7e4d38a6 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -1146,6 +1146,60 @@ class SubSource[Out, Mat](
       pf: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]): 
SubSource[Out, Mat] =
     new SubSource(delegate.recoverWithRetries(attempts, pf))
 
+  /**
+   * onErrorComplete allows to complete the stream when an upstream error 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or failed with exception is an 
instance of the provided type
+   *
+   * '''Cancels when''' downstream cancels
+   *  @since 1.1.0
+   */
+  def onErrorComplete(): SubSource[Out, Mat] = 
onErrorComplete(classOf[Throwable])
+
+  /**
+   * onErrorComplete allows to complete the stream when an upstream error 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or failed with exception is an 
instance of the provided type
+   *
+   * '''Cancels when''' downstream cancels
+   *  @since 1.1.0
+   */
+  def onErrorComplete(clazz: Class[_ <: Throwable]): SubSource[Out, Mat] = 
onErrorComplete(ex => clazz.isInstance(ex))
+
+  /**
+   * onErrorComplete allows to complete the stream when an upstream error 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or failed with predicate return 
ture
+   *
+   * '''Cancels when''' downstream cancels
+   *  @since 1.1.0
+   */
+  def onErrorComplete(predicate: java.util.function.Predicate[_ >: 
Throwable]): SubSource[Out, Mat] =
+    new SubSource(delegate.onErrorComplete {
+      case ex: Throwable if predicate.test(ex) => true
+    })
+
   /**
    * While similar to [[recover]] this operator can be used to transform an 
error signal to a different one *without* logging
    * it as an error in the process. So in that sense it is NOT exactly 
equivalent to `recover(t => throw t2)` since recover
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index e26f48837e..9452ffc972 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -929,6 +929,48 @@ trait FlowOps[+Out, +Mat] {
       pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] 
=
     via(new RecoverWith(attempts, pf))
 
+  /**
+   * onErrorComplete allows to complete the stream when an upstream error 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or failed with exception is an 
instance of the provided type
+   *
+   * '''Cancels when''' downstream cancels
+   * @since 1.1.0
+   */
+  def onErrorComplete[T <: Throwable]()(implicit tag: ClassTag[T]): Repr[Out] 
= onErrorComplete {
+    case ex if tag.runtimeClass.isInstance(ex) => true
+  }
+
+  /**
+   * onErrorComplete allows to complete the stream when an upstream error 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or failed with exception pf can 
handle
+   *
+   * '''Cancels when''' downstream cancels
+   *  @since 1.1.0
+   */
+  def onErrorComplete(pf: PartialFunction[Throwable, Boolean]): Repr[Out] =
+    via(
+      Flow[Out]
+        .recoverWith(pf.andThen({
+          case true => Source.empty[Out]
+        }: PartialFunction[Boolean, Graph[SourceShape[Out], NotUsed]]))
+        .withAttributes(DefaultAttributes.onErrorComplete and 
SourceLocation.forLambda(pf)))
+
   /**
    * While similar to [[recover]] this operator can be used to transform an 
error signal to a different one *without* logging
    * it as an error in the process. So in that sense it is NOT exactly 
equivalent to `recover(t => throw t2)` since recover


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to