This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch onErrorContinue in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 49bed69ce70ff53ac4d42dce1f3925bda3e3d36c Author: He-Pin <[email protected]> AuthorDate: Sat Oct 18 20:31:28 2025 +0800 feat: Add Flow#onErrorContinue operator. --- .../operators/Source-or-Flow/onErrorContinue.md | 27 ++++++ docs/src/main/paradox/stream/operators/index.md | 2 + .../java/org/apache/pekko/stream/StreamTest.java | 5 ++ .../org/apache/pekko/stream/javadsl/FlowTest.java | 86 +++++++++++++++++++ .../apache/pekko/stream/javadsl/SourceTest.java | 75 ++++++++++++++++ .../stream/scaladsl/FlowOnErrorContinueSpec.scala | 99 ++++++++++++++++++++++ .../org/apache/pekko/stream/javadsl/Flow.scala | 76 +++++++++++++++++ .../org/apache/pekko/stream/javadsl/Source.scala | 76 +++++++++++++++++ .../org/apache/pekko/stream/javadsl/SubFlow.scala | 76 +++++++++++++++++ .../apache/pekko/stream/javadsl/SubSource.scala | 76 +++++++++++++++++ .../org/apache/pekko/stream/scaladsl/Flow.scala | 65 +++++++++++++- 11 files changed, 661 insertions(+), 2 deletions(-) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorContinue.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorContinue.md new file mode 100644 index 0000000000..66b7e3cfd9 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorContinue.md @@ -0,0 +1,27 @@ +# onErrorContinue + +Continues the stream when an upstream error occurs. + +@ref[Error handling](../index.md#error-handling) + +## Signature + +@apidoc[Source.onErrorContinue](Source) { scala="#onErrorContinue(errorConsumer%3A%20Function%5BThrowable%2C%20Unit%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorContinue(org.apache.pekko.japi.function.Procedure)" } +@apidoc[Flow.onErrorContinue](Flow) { scala="#onErrorContinue%5BT%20%3C%3A%20Throwable%5D(errorConsumer%3A%20Function%5BThrowable%2C%20Unit%5D)(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorContinue(java.lang.Class,org.apache.pekko.japi.function.Procedure)" } + +## Description + +Continues the stream when an upstream error occurs. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** element is available from the upstream + +**backpressures** downstream backpressures + +**completes** upstream completes or upstream failed with exception this operator can't handle + +**Cancels when** downstream cancels +@@@ \ No newline at end of file diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index da367ebad1..4de4334ba9 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -372,6 +372,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) |--|--|--| |Source/Flow|<a name="maperror"></a>@ref[mapError](Source-or-Flow/mapError.md)|While similar to `recover` this operators can be used to transform an error signal to a different one *without* logging it as an error in the process.| |Source/Flow|<a name="onerrorcomplete"></a>@ref[onErrorComplete](Source-or-Flow/onErrorComplete.md)|Allows completing the stream when an upstream error occurs.| +|Source/Flow|<a name="onerrorcontinue"></a>@ref[onErrorContinue](Source-or-Flow/onErrorContinue.md)|Continues the stream when an upstream error occurs.| |Source/Flow|<a name="onerrorresume"></a>@ref[onErrorResume](Source-or-Flow/onErrorResume.md)|Allows transforming a failure signal into a stream of elements provided by a factory function.| |RestartSource|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Source] will not restart on completion of the wrapped flow.| |RestartFlow|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)|Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Flow] will not restart on completion of the wrapped flow.| @@ -550,6 +551,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [none](Sink/none.md) * [onComplete](Sink/onComplete.md) * [onErrorComplete](Source-or-Flow/onErrorComplete.md) +* [onErrorContinue](Source-or-Flow/onErrorContinue.md) * [onErrorResume](Source-or-Flow/onErrorResume.md) * [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md) * [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java index 6692db44e3..63dc25aab3 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java @@ -14,6 +14,7 @@ package org.apache.pekko.stream; import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.event.LoggingAdapter; import org.apache.pekko.testkit.PekkoJUnitActorSystemResource; import org.scalatestplus.junit.JUnitSuite; @@ -23,4 +24,8 @@ public abstract class StreamTest extends JUnitSuite { protected StreamTest(PekkoJUnitActorSystemResource actorSystemResource) { system = actorSystemResource.getSystem(); } + + protected LoggingAdapter logger() { + return system.log(); + } } diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index 2137589d6d..19ab1d51f6 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -1345,6 +1345,26 @@ public class FlowTest extends StreamTest { .expectComplete(); } + @Test + public void mustBeAbleToOnErrorContinue() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new RuntimeException("ex"); + } else { + return elem; + } + }) + .via( + Flow.of(Integer.class) + .onErrorContinue(error -> logger().error(error, "Error occurred"))) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + @Test public void mustBeAbleToOnErrorResume() { Source.from(Arrays.asList(1, 2)) @@ -1382,6 +1402,28 @@ public class FlowTest extends StreamTest { .expectComplete(); } + @Test + public void mustBeAbleToOnErrorContinueWithDedicatedException() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("ex"); + } else { + return elem; + } + }) + .via( + Flow.of(Integer.class) + .onErrorContinue( + IllegalArgumentException.class, + error -> logger().error(error, "Error occurred"))) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + @Test public void mustBeAbleToOnErrorResumeWithDedicatedException() { Source.from(Arrays.asList(1, 2)) @@ -1422,6 +1464,28 @@ public class FlowTest extends StreamTest { .expectError(ex); } + @Test + public void mustBeAbleToFailWhenOnErrorContinueExceptionTypeNotMatch() { + final IllegalArgumentException ex = new IllegalArgumentException("ex"); + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw ex; + } else { + return elem; + } + }) + .via( + Flow.of(Integer.class) + .onErrorContinue( + TimeoutException.class, error -> logger().error(error, "Error occurred"))) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectError(ex); + } + @Test public void onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch() { final IllegalArgumentException ex = new IllegalArgumentException("ex"); @@ -1459,6 +1523,28 @@ public class FlowTest extends StreamTest { .expectComplete(); } + @Test + public void mustBeAbleToOnErrorContinueWithPredicate() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("Boom"); + } else { + return elem; + } + }) + .via( + Flow.of(Integer.class) + .onErrorContinue( + ex -> ex.getMessage().contains("Boom"), + error -> logger().error(error, "Error occurred"))) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + @Test public void mustBeAbleToOnErrorResumeWithPredicate() { Source.from(Arrays.asList(1, 2)) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index 3e47d66c3a..f8c95ae51f 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -1666,6 +1666,24 @@ public class SourceTest extends StreamTest { .expectComplete(); } + @Test + public void mustBeAbleToOnErrorContinue() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new RuntimeException("ex"); + } else { + return elem; + } + }) + .onErrorContinue(e -> logger().error(e, "Error encountered")) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + @Test public void mustBeAbleToOnErrorResume() { Source.from(Arrays.asList(1, 2)) @@ -1703,6 +1721,25 @@ public class SourceTest extends StreamTest { .expectComplete(); } + @Test + public void mustBeAbleToOnErrorContinueWithDedicatedException() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("ex"); + } else { + return elem; + } + }) + .onErrorContinue( + IllegalArgumentException.class, e -> logger().error(e, "Error encountered")) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + @Test public void mustBeAbleToOnErrorResumeWithDedicatedException() { Source.from(Arrays.asList(1, 2)) @@ -1741,6 +1778,25 @@ public class SourceTest extends StreamTest { .expectError(ex); } + @Test + public void mustBeAbleToFailWhenOnErrorContinueExceptionTypeNotMatch() { + final IllegalArgumentException ex = new IllegalArgumentException("ex"); + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw ex; + } else { + return elem; + } + }) + .onErrorContinue(TimeoutException.class, e -> logger().error(e, "Error encountered")) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectError(ex); + } + @Test public void onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch() { final IllegalArgumentException ex = new IllegalArgumentException("ex"); @@ -1778,6 +1834,25 @@ public class SourceTest extends StreamTest { .expectComplete(); } + @Test + public void mustBeAbleToOnErrorContinueWithPredicate() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("Boom"); + } else { + return elem; + } + }) + .onErrorContinue( + ex -> ex.getMessage().contains("Boom"), e -> logger().error(e, "Error encountered")) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + @Test public void mustBeAbleToOnErrorResumeWithPredicate() { Source.from(Arrays.asList(1, 2)) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorContinueSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorContinueSpec.scala new file mode 100644 index 0000000000..4a35b6db3c --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorContinueSpec.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.scaladsl + +import org.apache.pekko +import pekko.stream.testkit.StreamSpec +import pekko.stream.testkit.scaladsl.TestSink + +import scala.concurrent.TimeoutException +import scala.util.control.NoStackTrace + +class FlowOnErrorContinueSpec extends StreamSpec { + val ex = new RuntimeException("ex") with NoStackTrace + + "A onErrorContinue" must { + "can complete with all exceptions" in { + Source(List(1, 2)) + .map { a => + if (a == 1) throw ex else a + } + .onErrorContinue[Throwable](log.error(_, "Error occurred")) + .runWith(TestSink[Int]()) + .request(2) + .expectNext(2) + .expectComplete() + } + + "can complete with dedicated exception type" in { + Source(List(1, 2)) + .map { a => + if (a == 2) throw new IllegalArgumentException() else a + } + .onErrorContinue[IllegalArgumentException](log.error(_, "Error occurred")) + .runWith(TestSink[Int]()) + .request(2) + .expectNext(1) + .expectComplete() + } + + "can fail if an unexpected exception occur" in { + Source(List(1, 2)) + .map { a => + if (a == 2) throw new IllegalArgumentException() else a + } + .onErrorContinue[TimeoutException](log.error(_, "Error occurred")) + .runWith(TestSink[Int]()) + .request(1) + .expectNext(1) + .request(1) + .expectError() + } + + "can complete if the pf is applied" in { + Source(List(1, 2)) + .map { a => + if (a == 2) throw new TimeoutException() else a + } + .onErrorContinue { + case _: TimeoutException => true + case _ => false + }(log.error(_, "Error occurred")) + .runWith(TestSink[Int]()) + .request(2) + .expectNext(1) + .expectComplete() + } + + "can fail if the pf is not applied" in { + Source(List(1, 2)) + .map { a => + if (a == 2) throw ex else a + } + .onErrorContinue { + case _: TimeoutException => true + case _ => false + }(log.error(_, "Error occurred")) + .runWith(TestSink[Int]()) + .request(2) + .expectNext(1) + .expectError() + } + + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 7dba8b7e06..b8be99a2b9 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -2106,6 +2106,82 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr case ex: Throwable if predicate.test(ex) => true }) + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.onErrorContinue[Throwable](errorConsumer.apply)) + + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param clazz the class of the failure cause + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue[T <: Throwable](clazz: Class[T], + errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.onErrorContinue(clazz.isInstance)(errorConsumer.apply)) + + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param p predicate which determines if the exception should be handled + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue[T <: Throwable](predicate: function.Predicate[_ >: Throwable], + errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.onErrorContinue(predicate.test)(errorConsumer.apply)) + /** * Transform a failure signal into a stream of elements provided by a factory function. * This allows to continue processing with another stream when a failure occurs. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 93ed130117..127146a1a9 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -2340,6 +2340,82 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ case ex: Throwable if predicate.test(ex) => true }) + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Source[Out, Mat] = + new Source(delegate.onErrorContinue[Throwable](errorConsumer.apply)) + + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param clazz the class of the failure cause + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue[T <: Throwable](clazz: Class[T], errorConsumer: function.Procedure[_ >: Throwable]) + : javadsl.Source[Out, Mat] = + new Source(delegate.onErrorContinue(clazz.isInstance)(errorConsumer.apply)) + + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param p predicate which determines if the exception should be handled + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue[T <: Throwable](p: function.Predicate[_ >: Throwable], + errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Source[Out, Mat] = + new Source(delegate.onErrorContinue(p.test)(errorConsumer.apply)) + /** * Transform a failure signal into a Source of elements provided by a factory function. * This allows to continue processing with another stream when a failure occurs. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 1a03e0921d..8da6eb21c0 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -1369,6 +1369,82 @@ final class SubFlow[In, Out, Mat]( case ex: Throwable if predicate.test(ex) => true }) + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): SubFlow[In, Out, Mat] = + new SubFlow(delegate.onErrorContinue[Throwable](errorConsumer.apply)) + + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param clazz the class of the failure cause + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue[T <: Throwable](clazz: Class[T], + errorConsumer: function.Procedure[_ >: Throwable]): SubFlow[In, Out, Mat] = + new SubFlow(delegate.onErrorContinue(clazz.isInstance)(errorConsumer.apply)) + + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param p predicate which determines if the exception should be handled + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue[T <: Throwable](p: function.Predicate[_ >: Throwable], + errorConsumer: function.Procedure[_ >: Throwable]): SubFlow[In, Out, Mat] = + new SubFlow(delegate.onErrorContinue(p.test)(errorConsumer.apply)) + /** * While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 5a2abfec29..5521a2f157 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -1346,6 +1346,82 @@ final class SubSource[Out, Mat]( case ex: Throwable if predicate.test(ex) => true }) + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): SubSource[Out, Mat] = + new SubSource(delegate.onErrorContinue[Throwable](errorConsumer.apply)) + + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param clazz the class of the failure cause + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue[T <: Throwable](clazz: Class[T], + errorConsumer: function.Procedure[_ >: Throwable]): SubSource[Out, Mat] = + new SubSource(delegate.onErrorContinue(clazz.isInstance)(errorConsumer.apply)) + + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param p predicate which determines if the exception should be handled + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue[T <: Throwable](p: function.Predicate[_ >: Throwable], + errorConsumer: function.Procedure[_ >: Throwable]): SubSource[Out, Mat] = + new SubSource(delegate.onErrorContinue(p.test)(errorConsumer.apply)) + /** * While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 6499b39abe..05102dbfa9 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -19,7 +19,6 @@ import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag - import org.apache.pekko import pekko.Done import pekko.NotUsed @@ -47,12 +46,13 @@ import pekko.stream.stage._ import pekko.util.ConstantFun import pekko.util.OptionVal import pekko.util.Timeout - import org.reactivestreams.Processor import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import org.reactivestreams.Subscription +import scala.util.control.NonFatal + /** * A `Flow` is a set of stream processing steps that has one open input and one open output. */ @@ -989,6 +989,67 @@ trait FlowOps[+Out, +Mat] { }: PartialFunction[Boolean, Graph[SourceShape[Out], NotUsed]])) .withAttributes(DefaultAttributes.onErrorComplete and SourceLocation.forLambda(pf))) + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue[T <: Throwable](errorConsumer: Throwable => Unit)(implicit tag: ClassTag[T]): Repr[Out] = { + this.withAttributes(ActorAttributes.supervisionStrategy { + case NonFatal(e) if tag.runtimeClass.isInstance(e) => + errorConsumer(e) + Supervision.Resume + case _ => Supervision.Stop + }) + } + + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param p predicate to determine which errors to handle + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue(p: Throwable => Boolean)(errorConsumer: Throwable => Unit): Repr[Out] = { + this.withAttributes(ActorAttributes.supervisionStrategy { + case NonFatal(e) if p(e) => + errorConsumer(e) + Supervision.Resume + case _ => Supervision.Stop + }) + } + /** * While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
