This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 569b1263f7 feat: Add more recover operators for java dsl. (#2336)
569b1263f7 is described below
commit 569b1263f7012ebcc1c87f226b62cc7e56307abb
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Oct 20 01:10:56 2025 +0800
feat: Add more recover operators for java dsl. (#2336)
---
.../org/apache/pekko/stream/javadsl/Flow.scala | 73 ++++++++++++++++-
.../org/apache/pekko/stream/javadsl/Source.scala | 75 ++++++++++++++++-
.../org/apache/pekko/stream/javadsl/SubFlow.scala | 95 +++++++++++++++++++++-
.../apache/pekko/stream/javadsl/SubSource.scala | 87 +++++++++++++++++++-
4 files changed, 325 insertions(+), 5 deletions(-)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index b922fad77e..f90bf25690 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -1859,6 +1859,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
/**
* Recover allows to send last element on failure and gracefully complete
the stream
+ *
* Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
* This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
*
@@ -1876,7 +1877,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
new Flow(delegate.recover(pf))
/**
- * Recover allows to send last element on failure and gracefully complete
the stream
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
* Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
* This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
*
@@ -1895,6 +1897,75 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
case elem if clazz.isInstance(elem) => creator.create()
}
+ /**
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
+ * Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
+ * This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
+ *
+ * Throwing an exception inside `recover` _will_ be logged on ERROR level
automatically.
+ *
+ * '''Emits when''' element is available from the upstream or upstream is
failed and pf returns an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes or upstream failed with exception
pf can handle
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def recover(clazz: Class[_ <: Throwable], fallbackValue: Out): javadsl.Flow[In, Out, Mat] =
+ recover {
+ case elem if clazz.isInstance(elem) => fallbackValue
+ }
+
+ /**
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
+ * Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
+ * This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
+ *
+ * Throwing an exception inside `recover` _will_ be logged on ERROR level
automatically.
+ *
+ * '''Emits when''' element is available from the upstream or upstream is
failed and pf returns an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes or upstream failed with exception
pf can handle
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def recover(p: function.Predicate[_ >: Throwable], creator: function.Creator[Out]): javadsl.Flow[In, Out, Mat] =
+ recover {
+ case elem if p.test(elem) => creator.create()
+ }
+
+ /**
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
+ * Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
+ * This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
+ *
+ * Throwing an exception inside `recover` _will_ be logged on ERROR level
automatically.
+ *
+ * '''Emits when''' element is available from the upstream or upstream is
failed and pf returns an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes or upstream failed with exception
pf can handle
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def recover(p: function.Predicate[_ >: Throwable], fallbackValue: Out): javadsl.Flow[In, Out, Mat] =
+ recover {
+ case elem if p.test(elem) => fallbackValue
+ }
+
/**
* While similar to [[recover]] this operator can be used to transform an
error signal to a different one *without* logging
* it as an error in the process. So in that sense it is NOT exactly
equivalent to `recover(t => throw t2)` since recover
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index e5844cd00a..1324dd4cb1 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2095,7 +2095,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
new Source(delegate.wireTap(f(_)))
/**
- * Recover allows to send last element on failure and gracefully complete
the stream
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
* Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
* This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
*
@@ -2113,7 +2114,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
new Source(delegate.recover(pf))
/**
- * Recover allows to send last element on failure and gracefully complete
the stream
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
* Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
* This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
*
@@ -2132,6 +2134,75 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
case elem if clazz.isInstance(elem) => creator.create()
}
+ /**
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
+ * Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
+ * This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
+ *
+ * Throwing an exception inside `recover` _will_ be logged on ERROR level
automatically.
+ *
+ * '''Emits when''' element is available from the upstream or upstream is
failed and pf returns an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes or upstream failed with exception
pf can handle
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def recover(clazz: Class[_ <: Throwable], fallbackValue: Out): javadsl.Source[Out, Mat] =
+ recover {
+ case elem if clazz.isInstance(elem) => fallbackValue
+ }
+
+ /**
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
+ * Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
+ * This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
+ *
+ * Throwing an exception inside `recover` _will_ be logged on ERROR level
automatically.
+ *
+ * '''Emits when''' element is available from the upstream or upstream is
failed and pf returns an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes or upstream failed with exception
pf can handle
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def recover(p: function.Predicate[_ >: Throwable], creator: function.Creator[Out]): javadsl.Source[Out, Mat] =
+ recover {
+ case elem if p.test(elem) => creator.create()
+ }
+
+ /**
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
+ * Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
+ * This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
+ *
+ * Throwing an exception inside `recover` _will_ be logged on ERROR level
automatically.
+ *
+ * '''Emits when''' element is available from the upstream or upstream is
failed and pf returns an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes or upstream failed with exception
pf can handle
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def recover(p: function.Predicate[_ >: Throwable], fallbackValue: Out): javadsl.Source[Out, Mat] =
+ recover {
+ case elem if p.test(elem) => fallbackValue
+ }
+
/**
* While similar to [[recover]] this operator can be used to transform an
error signal to a different one *without* logging
* it as an error in the process. So in that sense it is NOT exactly
equivalent to `recover(t => throw t2)` since recover
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index fe6015ae61..45cb38c527 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -1249,7 +1249,8 @@ final class SubFlow[In, Out, Mat](
new SubFlow(delegate.dropWhile(p.test))
/**
- * Recover allows to send last element on failure and gracefully complete
the stream
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
* Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
* This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
*
@@ -1266,6 +1267,98 @@ final class SubFlow[In, Out, Mat](
def recover(pf: PartialFunction[Throwable, Out]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.recover(pf))
+ /**
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
+ * Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
+ * This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
+ *
+ * Throwing an exception inside `recover` _will_ be logged on ERROR level
automatically.
+ *
+ * '''Emits when''' element is available from the upstream or upstream is
failed and pf returns an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes or upstream failed with exception
pf can handle
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def recover(clazz: Class[_ <: Throwable], creator: function.Creator[Out]): SubFlow[In, Out, Mat] =
+ new SubFlow(delegate.recover {
+ case elem if clazz.isInstance(elem) => creator.create()
+ })
+
+ /**
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
+ * Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
+ * This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
+ *
+ * Throwing an exception inside `recover` _will_ be logged on ERROR level
automatically.
+ *
+ * '''Emits when''' element is available from the upstream or upstream is
failed and pf returns an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes or upstream failed with exception
pf can handle
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def recover(clazz: Class[_ <: Throwable], fallbackValue: Out): SubFlow[In, Out, Mat] =
+ new SubFlow(delegate.recover {
+ case elem if clazz.isInstance(elem) => fallbackValue
+ })
+
+ /**
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
+ * Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
+ * This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
+ *
+ * Throwing an exception inside `recover` _will_ be logged on ERROR level
automatically.
+ *
+ * '''Emits when''' element is available from the upstream or upstream is
failed and pf returns an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes or upstream failed with exception
pf can handle
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def recover(p: function.Predicate[_ >: Throwable], creator: function.Creator[Out]): SubFlow[In, Out, Mat] =
+ new SubFlow(delegate.recover {
+ case elem if p.test(elem) => creator.create()
+ })
+
+ /**
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
+ * Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
+ * This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
+ *
+ * Throwing an exception inside `recover` _will_ be logged on ERROR level
automatically.
+ *
+ * '''Emits when''' element is available from the upstream or upstream is
failed and pf returns an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes or upstream failed with exception
pf can handle
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def recover(p: function.Predicate[_ >: Throwable], fallbackValue: Out): SubFlow[In, Out, Mat] =
+ new SubFlow(delegate.recover {
+ case elem if p.test(elem) => fallbackValue
+ })
+
/**
* RecoverWith allows to switch to alternative Source on flow failure. It
will stay in effect after
* a failure has been recovered so that each time there is a failure it is
fed into the `pf` and a new
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index 1cfc839a4b..f8d45ce854 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -1233,7 +1233,8 @@ final class SubSource[Out, Mat](
new SubSource(delegate.delayWith(() => DelayStrategy.asScala(delayStrategyCreator.create()), overFlowStrategy))
/**
- * Recover allows to send last element on failure and gracefully complete
the stream
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
* Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
* This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
*
@@ -1248,6 +1249,90 @@ final class SubSource[Out, Mat](
def recover(pf: PartialFunction[Throwable, Out]): SubSource[Out, Mat] =
new SubSource(delegate.recover(pf))
+ /**
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
+ * Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
+ * This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
+ *
+ * '''Emits when''' element is available from the upstream or upstream is
failed and pf returns an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes or upstream failed with exception
pf can handle
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def recover(clazz: Class[_ <: Throwable], creator: function.Creator[Out]): SubSource[Out, Mat] =
+ new SubSource(delegate.recover {
+ case elem if clazz.isInstance(elem) => creator.create()
+ })
+
+ /**
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
+ * Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
+ * This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
+ *
+ * '''Emits when''' element is available from the upstream or upstream is
failed and pf returns an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes or upstream failed with exception
pf can handle
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def recover(clazz: Class[_ <: Throwable], fallbackValue: Out): SubSource[Out, Mat] =
+ new SubSource(delegate.recover {
+ case elem if clazz.isInstance(elem) => fallbackValue
+ })
+
+ /**
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
+ * Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
+ * This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
+ *
+ * '''Emits when''' element is available from the upstream or upstream is
failed and pf returns an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes or upstream failed with exception
pf can handle
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def recover(p: function.Predicate[_ >: Throwable], creator: function.Creator[Out]): SubSource[Out, Mat] =
+ new SubSource(delegate.recover {
+ case elem if p.test(elem) => creator.create()
+ })
+
+ /**
+ * Recover allows to send last element on failure and gracefully complete
the stream.
+ *
+ * Since the underlying failure signal onError arrives out-of-band, it might
jump over existing elements.
+ * This operator can recover the failure signal, but not the skipped
elements, which will be dropped.
+ *
+ * '''Emits when''' element is available from the upstream or upstream is
failed and pf returns an element
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes or upstream failed with exception
pf can handle
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def recover(p: function.Predicate[_ >: Throwable], fallbackValue: Out): SubSource[Out, Mat] =
+ new SubSource(delegate.recover {
+ case elem if p.test(elem) => fallbackValue
+ })
+
/**
* RecoverWith allows to switch to alternative Source on flow failure. It
will stay in effect after
* a failure has been recovered so that each time there is a failure it is
fed into the `pf` and a new
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]