This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 569b1263f7 feat: Add more recover operators  for java dsl. (#2336)
569b1263f7 is described below

commit 569b1263f7012ebcc1c87f226b62cc7e56307abb
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Oct 20 01:10:56 2025 +0800

    feat: Add more recover operators  for java dsl. (#2336)
---
 .../org/apache/pekko/stream/javadsl/Flow.scala     | 73 ++++++++++++++++-
 .../org/apache/pekko/stream/javadsl/Source.scala   | 75 ++++++++++++++++-
 .../org/apache/pekko/stream/javadsl/SubFlow.scala  | 95 +++++++++++++++++++++-
 .../apache/pekko/stream/javadsl/SubSource.scala    | 87 +++++++++++++++++++-
 4 files changed, 325 insertions(+), 5 deletions(-)

diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index b922fad77e..f90bf25690 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -1859,6 +1859,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
 
   /**
    * Recover allows to send last element on failure and gracefully complete 
the stream
+   *
    * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
    * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
    *
@@ -1876,7 +1877,8 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
     new Flow(delegate.recover(pf))
 
   /**
-   * Recover allows to send last element on failure and gracefully complete 
the stream
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
    * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
    * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
    *
@@ -1895,6 +1897,75 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
       case elem if clazz.isInstance(elem) => creator.create()
     }
 
+  /**
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level 
automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and pf returns an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(clazz: Class[_ <: Throwable], fallbackValue: Out): 
javadsl.Flow[In, Out, Mat] =
+    recover {
+      case elem if clazz.isInstance(elem) => fallbackValue
+    }
+
+  /**
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level 
automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and pf returns an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(p: function.Predicate[_ >: Throwable], creator: 
function.Creator[Out]): javadsl.Flow[In, Out, Mat] =
+    recover {
+      case elem if p.test(elem) => creator.create()
+    }
+
+  /**
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level 
automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and pf returns an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(p: function.Predicate[_ >: Throwable], fallbackValue: Out): 
javadsl.Flow[In, Out, Mat] =
+    recover {
+      case elem if p.test(elem) => fallbackValue
+    }
+
   /**
    * While similar to [[recover]] this operator can be used to transform an 
error signal to a different one *without* logging
    * it as an error in the process. So in that sense it is NOT exactly 
equivalent to `recover(t => throw t2)` since recover
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index e5844cd00a..1324dd4cb1 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2095,7 +2095,8 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
     new Source(delegate.wireTap(f(_)))
 
   /**
-   * Recover allows to send last element on failure and gracefully complete 
the stream
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
    * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
    * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
    *
@@ -2113,7 +2114,8 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
     new Source(delegate.recover(pf))
 
   /**
-   * Recover allows to send last element on failure and gracefully complete 
the stream
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
    * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
    * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
    *
@@ -2132,6 +2134,75 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
       case elem if clazz.isInstance(elem) => creator.create()
     }
 
+  /**
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level 
automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and pf returns an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(clazz: Class[_ <: Throwable], fallbackValue: Out): 
javadsl.Source[Out, Mat] =
+    recover {
+      case elem if clazz.isInstance(elem) => fallbackValue
+    }
+
+  /**
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level 
automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and pf returns an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(p: function.Predicate[_ >: Throwable], creator: 
function.Creator[Out]): javadsl.Source[Out, Mat] =
+    recover {
+      case elem if p.test(elem) => creator.create()
+    }
+
+  /**
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level 
automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and pf returns an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(p: function.Predicate[_ >: Throwable], fallbackValue: Out): 
javadsl.Source[Out, Mat] =
+    recover {
+      case elem if p.test(elem) => fallbackValue
+    }
+
   /**
    * While similar to [[recover]] this operator can be used to transform an 
error signal to a different one *without* logging
    * it as an error in the process. So in that sense it is NOT exactly 
equivalent to `recover(t => throw t2)` since recover
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index fe6015ae61..45cb38c527 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -1249,7 +1249,8 @@ final class SubFlow[In, Out, Mat](
     new SubFlow(delegate.dropWhile(p.test))
 
   /**
-   * Recover allows to send last element on failure and gracefully complete 
the stream
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
    * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
    * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
    *
@@ -1266,6 +1267,98 @@ final class SubFlow[In, Out, Mat](
   def recover(pf: PartialFunction[Throwable, Out]): SubFlow[In, Out, Mat] =
     new SubFlow(delegate.recover(pf))
 
+  /**
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level 
automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and pf returns an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(clazz: Class[_ <: Throwable], creator: function.Creator[Out]): 
SubFlow[In, Out, Mat] =
+    new SubFlow(delegate.recover {
+      case elem if clazz.isInstance(elem) => creator.create()
+    })
+
+  /**
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level 
automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and pf returns an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(clazz: Class[_ <: Throwable], fallbackValue: Out): SubFlow[In, 
Out, Mat] =
+    new SubFlow(delegate.recover {
+      case elem if clazz.isInstance(elem) => fallbackValue
+    })
+
+  /**
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level 
automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and pf returns an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(p: function.Predicate[_ >: Throwable], creator: 
function.Creator[Out]): SubFlow[In, Out, Mat] =
+    new SubFlow(delegate.recover {
+      case elem if p.test(elem) => creator.create()
+    })
+
+  /**
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level 
automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and pf returns an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(p: function.Predicate[_ >: Throwable], fallbackValue: Out): 
SubFlow[In, Out, Mat] =
+    new SubFlow(delegate.recover {
+      case elem if p.test(elem) => fallbackValue
+    })
+
   /**
    * RecoverWith allows to switch to alternative Source on flow failure. It 
will stay in effect after
    * a failure has been recovered so that each time there is a failure it is 
fed into the `pf` and a new
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index 1cfc839a4b..f8d45ce854 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -1233,7 +1233,8 @@ final class SubSource[Out, Mat](
     new SubSource(delegate.delayWith(() => 
DelayStrategy.asScala(delayStrategyCreator.create()), overFlowStrategy))
 
   /**
-   * Recover allows to send last element on failure and gracefully complete 
the stream
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
    * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
    * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
    *
@@ -1248,6 +1249,90 @@ final class SubSource[Out, Mat](
   def recover(pf: PartialFunction[Throwable, Out]): SubSource[Out, Mat] =
     new SubSource(delegate.recover(pf))
 
+  /**
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and pf returns an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(clazz: Class[_ <: Throwable], creator: function.Creator[Out]): 
SubSource[Out, Mat] =
+    new SubSource(delegate.recover {
+      case elem if clazz.isInstance(elem) => creator.create()
+    })
+
+  /**
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and pf returns an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(clazz: Class[_ <: Throwable], fallbackValue: Out): 
SubSource[Out, Mat] =
+    new SubSource(delegate.recover {
+      case elem if clazz.isInstance(elem) => fallbackValue
+    })
+
+  /**
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and pf returns an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(p: function.Predicate[_ >: Throwable], creator: 
function.Creator[Out]): SubSource[Out, Mat] =
+    new SubSource(delegate.recover {
+      case elem if p.test(elem) => creator.create()
+    })
+
+  /**
+   * Recover allows to send last element on failure and gracefully complete 
the stream.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and pf returns an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(p: function.Predicate[_ >: Throwable], fallbackValue: Out): 
SubSource[Out, Mat] =
+    new SubSource(delegate.recover {
+      case elem if p.test(elem) => fallbackValue
+    })
+
   /**
    * RecoverWith allows to switch to alternative Source on flow failure. It 
will stay in effect after
    * a failure has been recovered so that each time there is a failure it is 
fed into the `pf` and a new


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to