This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch jf
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 1b5c1cced4ff377ca72d331bf6914ba9e142571f
Author: He-Pin <[email protected]>
AuthorDate: Fri Sep 5 02:20:01 2025 +0800

    chore: Make use of japi.function in stream api.
---
 .../org/apache/pekko/japi/function/Function.scala  |  2 +-
 .../main/paradox/stream/operators/ActorFlow/ask.md |  2 +-
 .../stream/operators/ActorFlow/askWithContext.md   |  2 +-
 .../stream/operators/ActorFlow/askWithStatus.md    |  4 +--
 .../operators/ActorFlow/askWithStatusAndContext.md |  2 +-
 .../stream/operators/ActorSource/actorRef.md       |  2 +-
 .../stream/operators/Sink/fromMaterializer.md      |  2 +-
 .../Source-or-Flow/aggregateWithBoundary.md        |  4 +--
 .../stream/operators/Source-or-Flow/delayWith.md   |  4 +--
 .../operators/Source-or-Flow/fromMaterializer.md   |  4 +--
 .../operators/Source-or-Flow/onErrorComplete.md    |  4 +--
 .../stream/operators/Source-or-Flow/recover.md     |  4 +--
 .../stream/operators/Source-or-Flow/recoverWith.md |  4 +--
 .../operators/Source-or-Flow/recoverWithRetries.md |  4 +--
 docs/src/main/paradox/stream/stream-dynamic.md     |  2 +-
 docs/src/main/paradox/stream/stream-error.md       |  8 ++---
 .../javaapi-functions.excludes                     | 28 +++++++++++++++
 .../pekko/stream/typed/javadsl/ActorFlow.scala     | 14 ++++----
 .../pekko/stream/typed/javadsl/ActorSource.scala   |  5 ++-
 .../javaapi-functions.excludes                     | 27 ++++++++++++++
 .../org/apache/pekko/stream/RestartSettings.scala  |  6 ++--
 .../org/apache/pekko/stream/javadsl/Flow.scala     | 42 +++++++++++-----------
 .../org/apache/pekko/stream/javadsl/Hub.scala      | 13 +++----
 .../org/apache/pekko/stream/javadsl/Sink.scala     |  4 +--
 .../org/apache/pekko/stream/javadsl/Source.scala   | 41 +++++++++++----------
 .../org/apache/pekko/stream/javadsl/SubFlow.scala  | 23 ++++++------
 .../apache/pekko/stream/javadsl/SubSource.scala    | 23 ++++++------
 .../org/apache/pekko/stream/javadsl/TLS.scala      | 12 +++----
 .../org/apache/pekko/stream/javadsl/Tcp.scala      | 23 ++++++------
 29 files changed, 182 insertions(+), 133 deletions(-)

diff --git a/actor/src/main/scala/org/apache/pekko/japi/function/Function.scala 
b/actor/src/main/scala/org/apache/pekko/japi/function/Function.scala
index 109a4332d3..250c20dc20 100644
--- a/actor/src/main/scala/org/apache/pekko/japi/function/Function.scala
+++ b/actor/src/main/scala/org/apache/pekko/japi/function/Function.scala
@@ -105,7 +105,7 @@ trait Predicate2[-T1, -T2] extends java.io.Serializable {
 
 /**
  * A constructor/factory, takes no parameters but creates a new value of type 
T every call.
- * Supports throwing `Exception` in the apply, which the 
`java.util.function.Supplier` counterpart does not.
+ * Supports throwing `Exception` in the create method, which the 
`java.util.function.Supplier` counterpart does not.
  */
 @nowarn("msg=@SerialVersionUID has no effect")
 @SerialVersionUID(1L)
diff --git a/docs/src/main/paradox/stream/operators/ActorFlow/ask.md 
b/docs/src/main/paradox/stream/operators/ActorFlow/ask.md
index 7fe77db067..5c2c6e8113 100644
--- a/docs/src/main/paradox/stream/operators/ActorFlow/ask.md
+++ b/docs/src/main/paradox/stream/operators/ActorFlow/ask.md
@@ -19,7 +19,7 @@ This operator is included in:
 
 ## Signature
 
-@apidoc[ActorFlow.ask](ActorFlow$) { 
scala="#ask%5BI,Q,A](ref:org.apache.pekko.actor.typed.ActorRef%5BQ])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow%5BI,A,org.apache.pekko.NotUsed]"
 
java="#ask(org.apache.pekko.actor.typed.ActorRef,java.time.Duration,java.util.function.BiFunction)"
 }
+@apidoc[ActorFlow.ask](ActorFlow$) { 
scala="#ask%5BI,Q,A](ref:org.apache.pekko.actor.typed.ActorRef%5BQ])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow%5BI,A,org.apache.pekko.NotUsed]"
 
java="#ask(org.apache.pekko.actor.typed.ActorRef,java.time.Duration,org.apache.pekko.japi.function.Function2)"
 }
 
 ## Description
 
diff --git a/docs/src/main/paradox/stream/operators/ActorFlow/askWithContext.md 
b/docs/src/main/paradox/stream/operators/ActorFlow/askWithContext.md
index bf420e0586..6359a4213f 100644
--- a/docs/src/main/paradox/stream/operators/ActorFlow/askWithContext.md
+++ b/docs/src/main/paradox/stream/operators/ActorFlow/askWithContext.md
@@ -19,7 +19,7 @@ This operator is included in:
 
 ## Signature
 
-@apidoc[ActorFlow.askWithContext](ActorFlow$) { 
scala="#askWithContext%5BI,Q,A,Ctx](ref:org.apache.pekko.actor.typed.ActorRef%5BQ])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow%5B(I,Ctx),(A,Ctx),org.apache.pekko.NotUsed]"
 
java="#askWithContext(org.apache.pekko.actor.typed.ActorRef,java.time.Duration,java.util.function.BiFunction)"
 }
+@apidoc[ActorFlow.askWithContext](ActorFlow$) { 
scala="#askWithContext%5BI,Q,A,Ctx](ref:org.apache.pekko.actor.typed.ActorRef%5BQ])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow%5B(I,Ctx),(A,Ctx),org.apache.pekko.NotUsed]"
 
java="#askWithContext(org.apache.pekko.actor.typed.ActorRef,java.time.Duration,org.apache.pekko.japi.function.Function2)"
 }
 
 ## Description
 
diff --git a/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatus.md 
b/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatus.md
index d23f3974b5..9721940cd0 100644
--- a/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatus.md
+++ b/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatus.md
@@ -18,8 +18,8 @@ This operator is included in:
 
 ## Signature
 
-@apidoc[ActorFlow.askWithStatus](ActorFlow$) { 
scala="#askWithStatus[I,Q,A](parallelism:Int)(ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=&gt;Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[I,A,org.apache.pekko.NotUsed]"
 java 
="#askWithStatus[I,Q,A](parallelism:Int,ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:java.u
 [...]
-@apidoc[ActorFlow.askWithStatus](ActorFlow$) { 
scala="#askWithStatus[I,Q,A](ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=&gt;Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[I,A,org.apache.pekko.NotUsed]"
 java 
="#askWithStatus[I,Q,A](ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:java.util.function.BiFunction[I,org.apa
 [...]
+@apidoc[ActorFlow.askWithStatus](ActorFlow$) { 
scala="#askWithStatus[I,Q,A](parallelism:Int)(ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=&gt;Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[I,A,org.apache.pekko.NotUsed]"
 java 
="#askWithStatus[I,Q,A](parallelism:Int,ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:org.ap
 [...]
+@apidoc[ActorFlow.askWithStatus](ActorFlow$) { 
scala="#askWithStatus[I,Q,A](ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=&gt;Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[I,A,org.apache.pekko.NotUsed]"
 java 
="#askWithStatus[I,Q,A](ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:org.apache.pekko.japi.function.Function
 [...]
 
 ## Description
 
diff --git 
a/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatusAndContext.md 
b/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatusAndContext.md
index de2e6d0312..4cb4ead595 100644
--- 
a/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatusAndContext.md
+++ 
b/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatusAndContext.md
@@ -19,7 +19,7 @@ This operator is included in:
 
 ## Signature
 
-@apidoc[ActorFlow.askWithStatusAndContext](ActorFlow$) { 
scala="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int)(ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=&gt;Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[(I,Ctx),(A,Ctx),org.apache.pekko.NotUsed]"
 java 
="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int,ref:org.apache.pekko.actor.typed.ActorRe
 [...]
+@apidoc[ActorFlow.askWithStatusAndContext](ActorFlow$) { 
scala="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int)(ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=&gt;Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[(I,Ctx),(A,Ctx),org.apache.pekko.NotUsed]"
 java 
="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int,ref:org.apache.pekko.actor.typed.ActorRe
 [...]
 
 ## Description
 
diff --git a/docs/src/main/paradox/stream/operators/ActorSource/actorRef.md 
b/docs/src/main/paradox/stream/operators/ActorSource/actorRef.md
index 35851bfea8..71cc41b153 100644
--- a/docs/src/main/paradox/stream/operators/ActorSource/actorRef.md
+++ b/docs/src/main/paradox/stream/operators/ActorSource/actorRef.md
@@ -19,7 +19,7 @@ This operator is included in:
 
 ## Signature
 
-@apidoc[ActorSource.actorRef](ActorSource$) { 
scala="#actorRef[T](completionMatcher:PartialFunction[T,Unit],failureMatcher:PartialFunction[T,Throwable],bufferSize:Int,overflowStrategy:org.apache.pekko.stream.OverflowStrategy):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.actor.typed.ActorRef[T]]"
 
java="#actorRef(java.util.function.Predicate,org.apache.pekko.japi.function.Function,int,org.apache.pekko.stream.OverflowStrategy)"
 }
+@apidoc[ActorSource.actorRef](ActorSource$) { 
scala="#actorRef[T](completionMatcher:PartialFunction[T,Unit],failureMatcher:PartialFunction[T,Throwable],bufferSize:Int,overflowStrategy:org.apache.pekko.stream.OverflowStrategy):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.actor.typed.ActorRef[T]]"
 
java="#actorRef(org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function,int,org.apache.pekko.stream.OverflowStrategy)"
 }
 
 ## Description
 
diff --git a/docs/src/main/paradox/stream/operators/Sink/fromMaterializer.md 
b/docs/src/main/paradox/stream/operators/Sink/fromMaterializer.md
index 91cdef4746..c10b18b2a9 100644
--- a/docs/src/main/paradox/stream/operators/Sink/fromMaterializer.md
+++ b/docs/src/main/paradox/stream/operators/Sink/fromMaterializer.md
@@ -6,7 +6,7 @@ Defer the creation of a `Sink` until materialization and access 
`Materializer` a
 
 ## Signature
 
-@apidoc[Sink.fromMaterializer](Sink$) { 
scala="#fromMaterializer[T,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=&gt;org.apache.pekko.stream.scaladsl.Sink[T,M]):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[M]]"
 java="#fromMaterializer(java.util.function.BiFunction)" }
+@apidoc[Sink.fromMaterializer](Sink$) { 
scala="#fromMaterializer[T,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=&gt;org.apache.pekko.stream.scaladsl.Sink[T,M]):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[M]]"
 java="#fromMaterializer(org.apache.pekko.japi.function.Function2)" }
 
 ## Description
 
diff --git 
a/docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md
 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md
index d6e8959bdc..4c053db52a 100644
--- 
a/docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md
+++ 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md
@@ -8,8 +8,8 @@ Aggregate and emit until custom boundary condition met.
 
 ## Signature
 
-@apidoc[Source.aggregateWithBoundary](Source) { 
scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]"
 
java="#aggregateWithBoundary(java.util.function.Supplier,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.Pair)"}
-@apidoc[Flow.aggregateWithBoundary](Flow) { 
scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]"
 
java="#aggregateWithBoundary(java.util.function.Supplier,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.Pair)"
 }
+@apidoc[Source.aggregateWithBoundary](Source) { 
scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]"
 
java="#aggregateWithBoundary(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.Pair)"}
+@apidoc[Flow.aggregateWithBoundary](Flow) { 
scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]"
 
java="#aggregateWithBoundary(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.Pair)"
 }
 
 
 ## Description
diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md
index 75f7fccd2e..f76efe39a0 100644
--- a/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md
@@ -6,8 +6,8 @@ Delay every element passed through with a duration that can be 
controlled dynami
 
 ## Signature
 
-@apidoc[Source.delayWith](Source) { 
scala="#delayWith(delayStrategySupplier:()=&gt;org.apache.pekko.stream.scaladsl.DelayStrategy[Out],overFlowStrategy:org.apache.pekko.stream.DelayOverflowStrategy):FlowOps.this.Repr[Out]"
 
java="#delayWith(java.util.function.Supplier,org.apache.pekko.stream.DelayOverflowStrategy)"
 }
-@apidoc[Flow.delayWith](Flow) { 
scala="#delayWith(delayStrategySupplier:()=&gt;org.apache.pekko.stream.scaladsl.DelayStrategy[Out],overFlowStrategy:org.apache.pekko.stream.DelayOverflowStrategy):FlowOps.this.Repr[Out]"
 
java="#delayWith(java.util.function.Supplier,org.apache.pekko.stream.DelayOverflowStrategy)"
 }
+@apidoc[Source.delayWith](Source) { 
scala="#delayWith(delayStrategySupplier:()=&gt;org.apache.pekko.stream.scaladsl.DelayStrategy[Out],overFlowStrategy:org.apache.pekko.stream.DelayOverflowStrategy):FlowOps.this.Repr[Out]"
 
java="#delayWith(org.apache.pekko.japi.function.Creator,org.apache.pekko.stream.DelayOverflowStrategy)"
 }
+@apidoc[Flow.delayWith](Flow) { 
scala="#delayWith(delayStrategySupplier:()=&gt;org.apache.pekko.stream.scaladsl.DelayStrategy[Out],overFlowStrategy:org.apache.pekko.stream.DelayOverflowStrategy):FlowOps.this.Repr[Out]"
 
java="#delayWith(org.apache.pekko.japi.function.Creator,org.apache.pekko.stream.DelayOverflowStrategy)"
 }
 
 
 ## Description
diff --git 
a/docs/src/main/paradox/stream/operators/Source-or-Flow/fromMaterializer.md 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/fromMaterializer.md
index 9fe8b8b740..a988bdb5f0 100644
--- a/docs/src/main/paradox/stream/operators/Source-or-Flow/fromMaterializer.md
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/fromMaterializer.md
@@ -6,8 +6,8 @@ Defer the creation of a `Source/Flow` until materialization and 
access `Material
 
 ## Signature
 
-@apidoc[Source.fromMaterializer](Source$) { 
scala="#fromMaterializer[T,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=&gt;org.apache.pekko.stream.scaladsl.Source[T,M]):org.apache.pekko.stream.scaladsl.Source[T,scala.concurrent.Future[M]]"
 java="#fromMaterializer(java.util.function.BiFunction)" }
-@apidoc[Flow.fromMaterializer](Flow$) { 
scala="#fromMaterializer[T,U,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=&gt;org.apache.pekko.stream.scaladsl.Flow[T,U,M]):org.apache.pekko.stream.scaladsl.Flow[T,U,scala.concurrent.Future[M]]"
 java="#fromMaterializer(java.util.function.BiFunction)" }
+@apidoc[Source.fromMaterializer](Source$) { 
scala="#fromMaterializer[T,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=&gt;org.apache.pekko.stream.scaladsl.Source[T,M]):org.apache.pekko.stream.scaladsl.Source[T,scala.concurrent.Future[M]]"
 java="#fromMaterializer(org.apache.pekko.japi.function.Function2)" }
+@apidoc[Flow.fromMaterializer](Flow$) { 
scala="#fromMaterializer[T,U,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=&gt;org.apache.pekko.stream.scaladsl.Flow[T,U,M]):org.apache.pekko.stream.scaladsl.Flow[T,U,scala.concurrent.Future[M]]"
 java="#fromMaterializer(org.apache.pekko.japi.function.Function2)" }
 
 
 ## Description
diff --git 
a/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md
index 0772ce4b43..66d43d8d8a 100644
--- a/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md
@@ -6,9 +6,9 @@ Allows completing the stream when an upstream error occurs.
 
 ## Signature
 
-@apidoc[Source.onErrorComplete](Source) { 
scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D"
 java="#onErrorComplete(java.util.function.Predicate)" }
+@apidoc[Source.onErrorComplete](Source) { 
scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D"
 java="#onErrorComplete(org.apache.pekko.japi.function.Predicate)" }
 @apidoc[Source.onErrorComplete](Source) { 
scala="#onErrorComplete%5BT%20%3C%3A%20Throwable%5D()(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D"
 java="#onErrorComplete(java.lang.Class)" }
-@apidoc[Flow.onErrorComplete](Flow) { 
scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D"
 java="#onErrorComplete(java.util.function.Predicate)" }
+@apidoc[Flow.onErrorComplete](Flow) { 
scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D"
 java="#onErrorComplete(org.apache.pekko.japi.function.Predicate)" }
 @apidoc[Flow.onErrorComplete](Flow) { 
scala="#onErrorComplete%5BT%20%3C%3A%20Throwable%5D()(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D"
 java="#onErrorComplete(java.lang.Class)" }
 
 ## Description
diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/recover.md 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/recover.md
index 7aac41bcfc..e4ff749b01 100644
--- a/docs/src/main/paradox/stream/operators/Source-or-Flow/recover.md
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/recover.md
@@ -6,8 +6,8 @@ Allow sending of one last element downstream when a failure has 
happened upstrea
 
 ## Signature
 
-@apidoc[Source.recover](Source) { 
scala="#recover[T&gt;:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]"
 java="#recover(scala.PartialFunction)" 
java="#recover(java.lang.Class,java.util.function.Supplier)" }
-@apidoc[Flow.recover](Flow) { 
scala="#recover[T&gt;:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]"
 java="#recover(scala.PartialFunction)" 
java="#recover(java.lang.Class,java.util.function.Supplier)" }
+@apidoc[Source.recover](Source) { 
scala="#recover[T&gt;:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]"
 java="#recover(scala.PartialFunction)" 
java="#recover(java.lang.Class,org.apache.pekko.japi.function.Creator)" }
+@apidoc[Flow.recover](Flow) { 
scala="#recover[T&gt;:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]"
 java="#recover(scala.PartialFunction)" 
java="#recover(java.lang.Class,org.apache.pekko.japi.function.Creator)" }
 
 
 ## Description
diff --git 
a/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWith.md 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWith.md
index 4d62c22c05..b1e78b3d22 100644
--- a/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWith.md
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWith.md
@@ -6,8 +6,8 @@ Allow switching to alternative Source when a failure has 
happened upstream.
 
 ## Signature
 
-@apidoc[Source.recoverWith](Source) { 
scala="#recoverWith[T&gt;:Out](pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
 java="#recoverWith(java.lang.Class,java.util.function.Supplier)" }
-@apidoc[Flow.recoverWith](Flow) { 
scala="#recoverWith[T&gt;:Out](pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
 java="#recoverWith(java.lang.Class,java.util.function.Supplier)" }
+@apidoc[Source.recoverWith](Source) { 
scala="#recoverWith[T&gt;:Out](pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
 java="#recoverWith(java.lang.Class,org.apache.pekko.japi.function.Creator)" }
+@apidoc[Flow.recoverWith](Flow) { 
scala="#recoverWith[T&gt;:Out](pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
 java="#recoverWith(java.lang.Class,org.apache.pekko.japi.function.Creator)" }
 
 
 ## Description
diff --git 
a/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWithRetries.md 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWithRetries.md
index 666794a1ee..f7631f0a7c 100644
--- 
a/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWithRetries.md
+++ 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWithRetries.md
@@ -6,8 +6,8 @@ RecoverWithRetries allows to switch to alternative Source on 
flow failure.
 
 ## Signature
 
-@apidoc[Source.recoverWithRetries](Source) { 
scala="#recoverWithRetries[T&gt;:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
 java="#recoverWithRetries(int,java.lang.Class,java.util.function.Supplier)" }
-@apidoc[Flow.recoverWithRetries](Flow) { 
scala="#recoverWithRetries[T&gt;:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
 java="#recoverWithRetries(int,java.lang.Class,java.util.function.Supplier)" }
+@apidoc[Source.recoverWithRetries](Source) { 
scala="#recoverWithRetries[T&gt;:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
 
java="#recoverWithRetries(int,java.lang.Class,org.apache.pekko.japi.function.Creator)"
 }
+@apidoc[Flow.recoverWithRetries](Flow) { 
scala="#recoverWithRetries[T&gt;:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
 
java="#recoverWithRetries(int,java.lang.Class,org.apache.pekko.japi.function.Creator)"
 }
 
 
 ## Description
diff --git a/docs/src/main/paradox/stream/stream-dynamic.md 
b/docs/src/main/paradox/stream/stream-dynamic.md
index 18a3c24f06..5ad915c636 100644
--- a/docs/src/main/paradox/stream/stream-dynamic.md
+++ b/docs/src/main/paradox/stream/stream-dynamic.md
@@ -219,7 +219,7 @@ It is possible to define how many initial consumers that 
are required before it
 to the attached consumers. While not enough consumers have been attached 
messages are buffered and when the
 buffer is full the upstream producer is backpressured. No messages are dropped.
 
-The above example illustrate a stateless partition function. For more advanced 
stateful routing the 
@java[@javadoc[ofStateful](pekko.stream.javadsl.PartitionHub$#ofStateful(java.lang.Class,java.util.function.Supplier,int))]
+The above example illustrates a stateless partition function. For more advanced 
stateful routing the 
@java[@javadoc[ofStateful](pekko.stream.javadsl.PartitionHub$#ofStateful(java.lang.Class,org.apache.pekko.japi.function.Creator,int))]
 
@scala[@scaladoc[statefulSink](pekko.stream.scaladsl.PartitionHub$#statefulSink[T](partitioner:()=%3E(org.apache.pekko.stream.scaladsl.PartitionHub.ConsumerInfo,T)=%3ELong,startAfterNrOfConsumers:Int,bufferSize:Int):org.apache.pekko.stream.scaladsl.Sink[T,org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.NotUsed]])]
 can be used. Here is an example of a stateful round-robin function:
 
 Scala
diff --git a/docs/src/main/paradox/stream/stream-error.md 
b/docs/src/main/paradox/stream/stream-error.md
index 17001855fa..dd1f13f2a7 100644
--- a/docs/src/main/paradox/stream/stream-error.md
+++ b/docs/src/main/paradox/stream/stream-error.md
@@ -20,8 +20,8 @@ Each of the operators downstream gets informed about the 
failure and each upstre
 
 In many cases you may want to avoid complete stream failure, this can be done 
in a few different ways:
 
- * @apidoc[recover](stream.*.Source) 
{scala="#recover[T&gt;:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]"
 java="#recover(java.lang.Class,java.util.function.Supplier)"} to emit a final 
element then complete the stream normally on upstream failure 
- * @apidoc[recoverWithRetries](stream.*.Source) 
{scala="#recoverWithRetries[T&gt;:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
 java="#recoverWithRetries(int,java.lang.Class,java.util.function.Supplier)"} 
to create a new upstream and start consuming from that on failure
+ * @apidoc[recover](stream.*.Source) 
{scala="#recover[T&gt;:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]"
 java="#recover(java.lang.Class,org.apache.pekko.japi.function.Creator)"} to 
emit a final element then complete the stream normally on upstream failure 
+ * @apidoc[recoverWithRetries](stream.*.Source) 
{scala="#recoverWithRetries[T&gt;:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
 
java="#recoverWithRetries(int,java.lang.Class,org.apache.pekko.japi.function.Creator)"}
 to create a new upstream and start consuming from that on failure
  * Restarting sections of the stream after a backoff
  * Using a supervision strategy for operators that support it
  
@@ -52,7 +52,7 @@ in @ref:[Logging in 
streams](stream-cookbook.md#logging-in-streams).
 
 ## Recover
 
-@apidoc[recover](stream.*.Source) 
{scala="#recover[T&gt;:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]"
 java="#recover(java.lang.Class,java.util.function.Supplier)"} allows you to 
emit a final element and then complete the stream on an upstream failure.
+@apidoc[recover](stream.*.Source) 
{scala="#recover[T&gt;:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]"
 java="#recover(java.lang.Class,org.apache.pekko.japi.function.Creator)"} 
allows you to emit a final element and then complete the stream on an upstream 
failure.
 Deciding which exceptions should be recovered is done through a 
@scaladoc[PartialFunction](scala.PartialFunction). If an exception
 does not have a @scala[matching case] @java[match defined] the stream is 
failed. 
 
@@ -80,7 +80,7 @@ Java
 
 ## Recover with retries
 
-@apidoc[recoverWithRetries](stream.*.Source) 
{scala="#recoverWithRetries[T&gt;:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
 java="#recoverWithRetries(int,java.lang.Class,java.util.function.Supplier)"} 
allows you to put a new upstream in place of the failed one, recovering 
+@apidoc[recoverWithRetries](stream.*.Source) 
{scala="#recoverWithRetries[T&gt;:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
 
java="#recoverWithRetries(int,java.lang.Class,org.apache.pekko.japi.function.Creator)"}
 allows you to put a new upstream in place of the failed one, recovering 
 stream failures up to a specified maximum number of times. 
 
 Deciding which exceptions should be recovered is done through a 
@scaladoc[PartialFunction](scala.PartialFunction). If an exception
diff --git 
a/stream-typed/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes
 
b/stream-typed/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes
new file mode 100644
index 0000000000..f412fc2a45
--- /dev/null
+++ 
b/stream-typed/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Migrate java.util.function.* to pekko.japi.function.*
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithContext")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithStatusAndContext")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithContext")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithStatus")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.ask")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithStatus")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithContext")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithStatusAndContext")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithContext")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorSource.actorRef")
diff --git 
a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorFlow.scala
 
b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorFlow.scala
index e683830b0b..8873facb45 100644
--- 
a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorFlow.scala
+++ 
b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorFlow.scala
@@ -13,12 +13,12 @@
 
 package org.apache.pekko.stream.typed.javadsl
 
-import java.util.function.BiFunction
 import scala.concurrent.duration._
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.actor.typed.ActorRef
 import pekko.japi.Pair
+import pekko.japi.function
 import pekko.pattern.StatusReply
 import pekko.stream.javadsl.Flow
 import pekko.util.JavaDurationConverters
@@ -68,7 +68,7 @@ object ActorFlow {
   def ask[I, Q, A](
       ref: ActorRef[Q],
       timeout: java.time.Duration,
-      makeMessage: BiFunction[I, ActorRef[A], Q]): Flow[I, A, NotUsed] =
+      makeMessage: function.Function2[I, ActorRef[A], Q]): Flow[I, A, NotUsed] 
=
     org.apache.pekko.stream.typed.scaladsl.ActorFlow
       .ask[I, Q, A](parallelism = 2)(ref)((i, ref) => makeMessage(i, ref))(
         JavaDurationConverters.asFiniteDuration(timeout))
@@ -82,7 +82,7 @@ object ActorFlow {
   def askWithStatus[I, Q, A](
       ref: ActorRef[Q],
       timeout: java.time.Duration,
-      makeMessage: BiFunction[I, ActorRef[StatusReply[A]], Q]): Flow[I, A, 
NotUsed] =
+      makeMessage: function.Function2[I, ActorRef[StatusReply[A]], Q]): 
Flow[I, A, NotUsed] =
     org.apache.pekko.stream.typed.scaladsl.ActorFlow
       .askWithStatus[I, Q, A](parallelism = 2)(ref)((i, ref) => makeMessage(i, 
ref))(
         JavaDurationConverters.asFiniteDuration(timeout))
@@ -139,7 +139,7 @@ object ActorFlow {
       parallelism: Int,
       ref: ActorRef[Q],
       timeout: java.time.Duration,
-      makeMessage: BiFunction[I, ActorRef[StatusReply[A]], Q]): Flow[I, A, 
NotUsed] =
+      makeMessage: function.Function2[I, ActorRef[StatusReply[A]], Q]): 
Flow[I, A, NotUsed] =
     org.apache.pekko.stream.typed.scaladsl.ActorFlow
       .askWithStatus[I, Q, A](parallelism)(ref)((i, ref) => makeMessage(i, 
ref))(timeout.toMillis.millis)
       .asJava
@@ -150,7 +150,7 @@ object ActorFlow {
   def askWithContext[I, Q, A, Ctx](
       ref: ActorRef[Q],
       timeout: java.time.Duration,
-      makeMessage: BiFunction[I, ActorRef[A], Q]): Flow[Pair[I, Ctx], Pair[A, 
Ctx], NotUsed] =
+      makeMessage: function.Function2[I, ActorRef[A], Q]): Flow[Pair[I, Ctx], 
Pair[A, Ctx], NotUsed] =
     org.apache.pekko.stream.scaladsl
       .Flow[Pair[I, Ctx]]
       .map(_.toScala)
@@ -169,7 +169,7 @@ object ActorFlow {
   def askWithStatusAndContext[I, Q, A, Ctx](
       ref: ActorRef[Q],
       timeout: java.time.Duration,
-      makeMessage: BiFunction[I, ActorRef[StatusReply[A]], Q]): Flow[Pair[I, 
Ctx], Pair[A, Ctx], NotUsed] =
+      makeMessage: function.Function2[I, ActorRef[StatusReply[A]], Q]): 
Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] =
     org.apache.pekko.stream.scaladsl
       .Flow[Pair[I, Ctx]]
       .map(_.toScala)
@@ -187,7 +187,7 @@ object ActorFlow {
       parallelism: Int,
       ref: ActorRef[Q],
       timeout: java.time.Duration,
-      makeMessage: BiFunction[I, ActorRef[A], Q]): Flow[Pair[I, Ctx], Pair[A, 
Ctx], NotUsed] = {
+      makeMessage: function.Function2[I, ActorRef[A], Q]): Flow[Pair[I, Ctx], 
Pair[A, Ctx], NotUsed] = {
     org.apache.pekko.stream.scaladsl
       .Flow[Pair[I, Ctx]]
       .map(_.toScala)
diff --git 
a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala
 
b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala
index 5e31b916e6..e7da04a0d0 100644
--- 
a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala
+++ 
b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala
@@ -13,11 +13,10 @@
 
 package org.apache.pekko.stream.typed.javadsl
 
-import java.util.function.Predicate
-
 import org.apache.pekko
 import pekko.actor.typed._
 import pekko.japi.JavaPartialFunction
+import pekko.japi.function
 import pekko.stream.{ CompletionStrategy, OverflowStrategy }
 import pekko.stream.javadsl._
 
@@ -58,7 +57,7 @@ object ActorSource {
    * @param overflowStrategy Strategy that is used when incoming elements 
cannot fit inside the buffer
    */
   def actorRef[T](
-      completionMatcher: Predicate[T],
+      completionMatcher: function.Predicate[T],
       failureMatcher: pekko.japi.function.Function[T, 
java.util.Optional[Throwable]],
       bufferSize: Int,
       overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] = {
diff --git 
a/stream/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes
 
b/stream/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes
index 3d671dfd72..11ab37d791 100644
--- 
a/stream/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes
+++ 
b/stream/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes
@@ -18,3 +18,30 @@
 # Migrate functions in japi.* to japi.function.*
 
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.JavaFlowSupport#Flow.fromProcessor")
 
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.JavaFlowSupport#Flow.fromProcessorMat")
+
+# Migrate java.util.function.* to pekko.japi.function.*
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.RestartSettings.withRestartOn")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.groupedWeighted")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.delayWith")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.recover")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.recoverWith")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.recoverWithRetries")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.aggregateWithBoundary")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.fromMaterializer")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.PartitionHub.of")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.PartitionHub.ofStateful")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Sink.fromMaterializer")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.recover")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.recoverWith")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.recoverWithRetries")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.groupedWeighted")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.delayWith")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.aggregateWithBoundary")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.fromMaterializer")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.SubFlow.delayWith")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.SubFlow.aggregateWithBoundary")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.SubSource.delayWith")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.SubSource.aggregateWithBoundary")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.TLS.create")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Tcp.outgoingConnectionWithTls")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Tcp.bindWithTls")
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/RestartSettings.scala 
b/stream/src/main/scala/org/apache/pekko/stream/RestartSettings.scala
index e7fb56b61a..8fda40de43 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/RestartSettings.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/RestartSettings.scala
@@ -16,10 +16,10 @@ package org.apache.pekko.stream
 import scala.concurrent.duration.FiniteDuration
 
 import org.apache.pekko
+import pekko.japi.function
 import pekko.event.Logging
 import pekko.event.Logging.LogLevel
 import pekko.util.ConstantFun
-import pekko.util.FunctionConverters._
 import pekko.util.JavaDurationConverters._
 
 final class RestartSettings private (
@@ -58,8 +58,8 @@ final class RestartSettings private (
     copy(maxRestarts = count, maxRestartsWithin = within.asScala)
 
   /** Decides whether the failure should restart the stream or make the 
surrounding stream fail */
-  def withRestartOn(restartOn: java.util.function.Predicate[Throwable]): 
RestartSettings =
-    copy(restartOn = restartOn.asScala)
+  def withRestartOn(restartOn: function.Predicate[Throwable]): RestartSettings 
=
+    copy(restartOn = t => restartOn.test(t))
 
   def withLogSettings(newLogSettings: RestartSettings.LogSettings): 
RestartSettings =
     copy(logSettings = newLogSettings)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 637bf91541..6e76d89579 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -16,8 +16,6 @@ package org.apache.pekko.stream.javadsl
 import java.util.Comparator
 import java.util.Optional
 import java.util.concurrent.CompletionStage
-import java.util.function.BiFunction
-import java.util.function.Supplier
 
 import scala.annotation.varargs
 import scala.annotation.unchecked.uncheckedVariance
@@ -112,7 +110,7 @@ object Flow {
    * [[Attributes]] of the [[Flow]] returned by this method.
    */
   def fromMaterializer[I, O, M](
-      factory: BiFunction[Materializer, Attributes, Flow[I, O, M]]): Flow[I, 
O, CompletionStage[M]] =
+      factory: function.Function2[Materializer, Attributes, Flow[I, O, M]]): 
Flow[I, O, CompletionStage[M]] =
     scaladsl.Flow.fromMaterializer((mat, attr) => factory(mat, 
attr).asScala).mapMaterializedValue(_.asJava).asJava
 
   /**
@@ -1273,7 +1271,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    * '''Cancels when''' downstream cancels
    */
   def groupedWeighted(minWeight: Long,
-      costFn: java.util.function.Function[Out, java.lang.Long]): 
javadsl.Flow[In, java.util.List[Out], Mat] =
+      costFn: function.Function[Out, java.lang.Long]): javadsl.Flow[In, 
java.util.List[Out], Mat] =
     new Flow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) 
// TODO optimize to one step
 
   /**
@@ -1316,7 +1314,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    */
   def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
       maxWeight: Long,
-      costFn: java.util.function.Function[Out, java.lang.Long])
+      costFn: function.Function[Out, java.lang.Long])
       : javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] =
     new Flow(delegate.groupedAdjacentByWeighted(f.apply, 
maxWeight)(costFn.apply).map(_.asJava))
 
@@ -1740,13 +1738,13 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    *
    * '''Cancels when''' downstream cancels
    *
-   * @param delayStrategySupplier creates new [[DelayStrategy]] object for 
each materialization
+   * @param delayStrategyCreator creates new [[DelayStrategy]] object for each 
materialization
    * @param overFlowStrategy Strategy that is used when incoming elements 
cannot fit inside the buffer
    */
   def delayWith(
-      delayStrategySupplier: Supplier[DelayStrategy[Out]],
+      delayStrategyCreator: function.Creator[DelayStrategy[Out]],
       overFlowStrategy: DelayOverflowStrategy): Flow[In, Out, Mat] =
-    new Flow(delegate.delayWith(() => 
DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy))
+    new Flow(delegate.delayWith(() => 
DelayStrategy.asScala(delayStrategyCreator.create()), overFlowStrategy))
 
   /**
    * Discard the given number of elements at the beginning of the stream.
@@ -1892,9 +1890,9 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    *
    * '''Cancels when''' downstream cancels
    */
-  def recover(clazz: Class[_ <: Throwable], supplier: Supplier[Out]): 
javadsl.Flow[In, Out, Mat] =
+  def recover(clazz: Class[_ <: Throwable], creator: function.Creator[Out]): 
javadsl.Flow[In, Out, Mat] =
     recover {
-      case elem if clazz.isInstance(elem) => supplier.get()
+      case elem if clazz.isInstance(elem) => creator.create()
     }
 
   /**
@@ -1984,9 +1982,9 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    */
   def recoverWith(
       clazz: Class[_ <: Throwable],
-      supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, 
Out, Mat] =
+      creator: function.Creator[Graph[SourceShape[Out], NotUsed]]): 
javadsl.Flow[In, Out, Mat] =
     recoverWith({
-      case elem if clazz.isInstance(elem) => supplier.get()
+      case elem if clazz.isInstance(elem) => creator.create()
     }: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]])
 
   /**
@@ -2043,15 +2041,15 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    *
    * @param attempts Maximum number of retries or -1 to retry indefinitely
    * @param clazz the class object of the failure cause
-   * @param supplier supply the new Source to be materialized
+   * @param creator create the new Source to be materialized
    */
   def recoverWithRetries(
       attempts: Int,
       clazz: Class[_ <: Throwable],
-      supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, 
Out, Mat] =
+      creator: function.Creator[Graph[SourceShape[Out], NotUsed]]): 
javadsl.Flow[In, Out, Mat] =
     recoverWithRetries(attempts,
       {
-        case elem if clazz.isInstance(elem) => supplier.get()
+        case elem if clazz.isInstance(elem) => creator.create()
       })
 
   /**
@@ -2104,7 +2102,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    * '''Cancels when''' downstream cancels
    *  @since 1.1.0
    */
-  def onErrorComplete(predicate: java.util.function.Predicate[_ >: 
Throwable]): javadsl.Flow[In, Out, Mat] =
+  def onErrorComplete(predicate: function.Predicate[_ >: Throwable]): 
javadsl.Flow[In, Out, Mat] =
     new Flow(delegate.onErrorComplete {
       case ex: Throwable if predicate.test(ex) => true
     })
@@ -4308,13 +4306,13 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    * @param emitOnTimer decide whether the current aggregated elements can be 
emitted, the custom function is invoked on every interval
    */
   @deprecated("Use the overloaded one which accepts an Optional instead.", 
since = "1.2.0")
-  def aggregateWithBoundary[Agg, Emit](allocate: 
java.util.function.Supplier[Agg],
+  def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
       aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
       harvest: function.Function[Agg, Emit],
-      emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration])
+      emitOnTimer: Pair[function.Predicate[Agg], java.time.Duration])
       : javadsl.Flow[In, Emit, Mat] = {
     asScala
-      .aggregateWithBoundary(() => allocate.get())(
+      .aggregateWithBoundary(() => allocate.create())(
         aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
         harvest = agg => harvest.apply(agg),
         emitOnTimer = Option(emitOnTimer).map {
@@ -4341,14 +4339,14 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    * @param harvest     this is invoked before emit within the current 
stage/operator
    * @param emitOnTimer decide whether the current aggregated elements can be 
emitted, the custom function is invoked on every interval
    */
-  def aggregateWithBoundary[Agg, Emit](allocate: 
java.util.function.Supplier[Agg],
+  def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
       aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
       harvest: function.Function[Agg, Emit],
-      emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], 
java.time.Duration]])
+      emitOnTimer: Optional[Pair[function.Predicate[Agg], java.time.Duration]])
       : javadsl.Flow[In, Emit, Mat] = {
     import org.apache.pekko.util.OptionConverters._
     asScala
-      .aggregateWithBoundary(() => allocate.get())(
+      .aggregateWithBoundary(() => allocate.create())(
         aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
         harvest = agg => harvest.apply(agg),
         emitOnTimer = emitOnTimer.toScala.map {
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Hub.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Hub.scala
index be492a1dc9..3a3f34ce56 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Hub.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Hub.scala
@@ -13,9 +13,10 @@
 
 package org.apache.pekko.stream.javadsl
 
-import java.util.function.{ BiFunction, Supplier, ToLongBiFunction }
+import java.util.function.ToLongBiFunction
 
 import org.apache.pekko
+import pekko.japi.function
 import pekko.NotUsed
 import pekko.annotation.DoNotInherit
 import pekko.util.unused
@@ -259,11 +260,11 @@ object PartitionHub {
    */
   def ofStateful[T](
       @unused clazz: Class[T],
-      partitioner: Supplier[ToLongBiFunction[ConsumerInfo, T]],
+      partitioner: function.Creator[ToLongBiFunction[ConsumerInfo, T]],
       startAfterNrOfConsumers: Int,
       bufferSize: Int): Sink[T, Source[T, NotUsed]] = {
     val p: () => (pekko.stream.scaladsl.PartitionHub.ConsumerInfo, T) => Long 
= () => {
-      val f = partitioner.get()
+      val f = partitioner.create()
       (info, elem) => f.applyAsLong(info, elem)
     }
     pekko.stream.scaladsl.PartitionHub
@@ -303,7 +304,7 @@ object PartitionHub {
    */
   def ofStateful[T](
       clazz: Class[T],
-      partitioner: Supplier[ToLongBiFunction[ConsumerInfo, T]],
+      partitioner: function.Creator[ToLongBiFunction[ConsumerInfo, T]],
       startAfterNrOfConsumers: Int): Sink[T, Source[T, NotUsed]] =
     ofStateful(clazz, partitioner, startAfterNrOfConsumers, 
pekko.stream.scaladsl.PartitionHub.defaultBufferSize)
 
@@ -338,7 +339,7 @@ object PartitionHub {
    */
   def of[T](
       @unused clazz: Class[T],
-      partitioner: BiFunction[Integer, T, Integer],
+      partitioner: function.Function2[Integer, T, Integer],
       startAfterNrOfConsumers: Int,
       bufferSize: Int): Sink[T, Source[T, NotUsed]] =
     pekko.stream.scaladsl.PartitionHub
@@ -377,7 +378,7 @@ object PartitionHub {
    */
   def of[T](
       clazz: Class[T],
-      partitioner: BiFunction[Integer, T, Integer],
+      partitioner: function.Function2[Integer, T, Integer],
       startAfterNrOfConsumers: Int): Sink[T, Source[T, NotUsed]] =
     of(clazz, partitioner, startAfterNrOfConsumers, 
pekko.stream.scaladsl.PartitionHub.defaultBufferSize)
 
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index e5f4bf7fcd..76db0ef98c 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -15,7 +15,6 @@ package org.apache.pekko.stream.javadsl
 
 import java.util.Optional
 import java.util.concurrent.{ CompletableFuture, CompletionStage }
-import java.util.function.BiFunction
 import java.util.stream.Collector
 
 import scala.annotation.unchecked.uncheckedVariance
@@ -391,7 +390,8 @@ object Sink {
    * exposes [[Materializer]] which is going to be used during materialization 
and
    * [[Attributes]] of the [[Sink]] returned by this method.
    */
-  def fromMaterializer[T, M](factory: BiFunction[Materializer, Attributes, 
Sink[T, M]]): Sink[T, CompletionStage[M]] =
+  def fromMaterializer[T, M](
+      factory: function.Function2[Materializer, Attributes, Sink[T, M]]): 
Sink[T, CompletionStage[M]] =
     scaladsl.Sink.fromMaterializer((mat, attr) => factory(mat, 
attr).asScala).mapMaterializedValue(_.asJava).asJava
 
   /**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 3649a7ef44..96e5f852c7 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -16,7 +16,6 @@ package org.apache.pekko.stream.javadsl
 import java.util
 import java.util.Optional
 import java.util.concurrent.{ CompletableFuture, CompletionStage }
-import java.util.function.{ BiFunction, Supplier }
 
 import scala.annotation.varargs
 import scala.annotation.unchecked.uncheckedVariance
@@ -507,7 +506,7 @@ object Source {
    * [[Attributes]] of the [[Source]] returned by this method.
    */
   def fromMaterializer[T, M](
-      factory: BiFunction[Materializer, Attributes, Source[T, M]]): Source[T, 
CompletionStage[M]] =
+      factory: function.Function2[Materializer, Attributes, Source[T, M]]): 
Source[T, CompletionStage[M]] =
     scaladsl.Source.fromMaterializer((mat, attr) => factory(mat, 
attr).asScala).mapMaterializedValue(_.asJava).asJava
 
   /**
@@ -2129,9 +2128,9 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    *
    * '''Cancels when''' downstream cancels
    */
-  def recover(clazz: Class[_ <: Throwable], supplier: Supplier[Out]): 
javadsl.Source[Out, Mat] =
+  def recover(clazz: Class[_ <: Throwable], creator: function.Creator[Out]): 
javadsl.Source[Out, Mat] =
     recover {
-      case elem if clazz.isInstance(elem) => supplier.get()
+      case elem if clazz.isInstance(elem) => creator.create()
     }
 
   /**
@@ -2221,9 +2220,9 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    */
   def recoverWith(
       clazz: Class[_ <: Throwable],
-      supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] =
+      creator: function.Creator[Graph[SourceShape[Out], NotUsed]]): 
Source[Out, Mat] =
     recoverWith({
-      case elem if clazz.isInstance(elem) => supplier.get()
+      case elem if clazz.isInstance(elem) => creator.create()
     }: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]])
 
   /**
@@ -2277,15 +2276,15 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    *
    * @param attempts Maximum number of retries or -1 to retry indefinitely
    * @param clazz the class object of the failure cause
-   * @param supplier supply the new Source to be materialized
+   * @param creator create the new Source to be materialized
    */
   def recoverWithRetries(
       attempts: Int,
       clazz: Class[_ <: Throwable],
-      supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] =
+      creator: function.Creator[Graph[SourceShape[Out], NotUsed]]): 
Source[Out, Mat] =
     recoverWithRetries(attempts,
       {
-        case elem if clazz.isInstance(elem) => supplier.get()
+        case elem if clazz.isInstance(elem) => creator.create()
       }: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]])
 
   /**
@@ -2338,7 +2337,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    * '''Cancels when''' downstream cancels
    *  @since 1.1.0
    */
-  def onErrorComplete(predicate: java.util.function.Predicate[_ >: 
Throwable]): javadsl.Source[Out, Mat] =
+  def onErrorComplete(predicate: function.Predicate[_ >: Throwable]): 
javadsl.Source[Out, Mat] =
     new Source(delegate.onErrorComplete {
       case ex: Throwable if predicate.test(ex) => true
     })
@@ -2988,7 +2987,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    *
    * '''Cancels when''' downstream cancels
    */
-  def groupedWeighted(minWeight: Long, costFn: 
java.util.function.Function[Out, java.lang.Long])
+  def groupedWeighted(minWeight: Long, costFn: function.Function[Out, 
java.lang.Long])
       : javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
     new Source(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava))
 
@@ -3031,7 +3030,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    */
   def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
       maxWeight: Long,
-      costFn: java.util.function.Function[Out, java.lang.Long])
+      costFn: function.Function[Out, java.lang.Long])
       : javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
     new Source(delegate.groupedAdjacentByWeighted(f.apply, 
maxWeight)(costFn.apply).map(_.asJava))
 
@@ -3444,13 +3443,13 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    *
    * '''Cancels when''' downstream cancels
    *
-   * @param delayStrategySupplier creates new [[DelayStrategy]] object for 
each materialization
+   * @param delayStrategyCreator creates new [[DelayStrategy]] object for each 
materialization
    * @param overFlowStrategy Strategy that is used when incoming elements 
cannot fit inside the buffer
    */
   def delayWith(
-      delayStrategySupplier: Supplier[DelayStrategy[Out]],
+      delayStrategyCreator: function.Creator[DelayStrategy[Out]],
       overFlowStrategy: DelayOverflowStrategy): Source[Out, Mat] =
-    new Source(delegate.delayWith(() => 
DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy))
+    new Source(delegate.delayWith(() => 
DelayStrategy.asScala(delayStrategyCreator.create()), overFlowStrategy))
 
   /**
    * Discard the given number of elements at the beginning of the stream.
@@ -4749,12 +4748,12 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    * @param emitOnTimer decide whether the current aggregated elements can be 
emitted, the custom function is invoked on every interval
    */
   @deprecated("Use the overloaded one which accepts an Optional instead.", 
since = "1.2.0")
-  def aggregateWithBoundary[Agg, Emit](allocate: 
java.util.function.Supplier[Agg],
+  def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
       aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
       harvest: function.Function[Agg, Emit],
-      emitOnTimer: Pair[java.util.function.Predicate[Agg], 
java.time.Duration]): javadsl.Source[Emit, Mat] = {
+      emitOnTimer: Pair[function.Predicate[Agg], java.time.Duration]): 
javadsl.Source[Emit, Mat] = {
     asScala
-      .aggregateWithBoundary(() => allocate.get())(
+      .aggregateWithBoundary(() => allocate.create())(
         aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
         harvest = agg => harvest.apply(agg),
         emitOnTimer = Option(emitOnTimer).map {
@@ -4781,13 +4780,13 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    * @param harvest     this is invoked before emit within the current 
stage/operator
    * @param emitOnTimer decide whether the current aggregated elements can be 
emitted, the custom function is invoked on every interval
    */
-  def aggregateWithBoundary[Agg, Emit](allocate: 
java.util.function.Supplier[Agg],
+  def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
       aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
       harvest: function.Function[Agg, Emit],
-      emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], 
java.time.Duration]]): javadsl.Source[Emit, Mat] = {
+      emitOnTimer: Optional[Pair[function.Predicate[Agg], 
java.time.Duration]]): javadsl.Source[Emit, Mat] = {
     import org.apache.pekko.util.OptionConverters._
     asScala
-      .aggregateWithBoundary(() => allocate.get())(
+      .aggregateWithBoundary(() => allocate.create())(
         aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
         harvest = agg => harvest.apply(agg),
         emitOnTimer = emitOnTimer.toScala.map {
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index e4417dc676..947432a6f1 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -15,7 +15,6 @@ package org.apache.pekko.stream.javadsl
 
 import java.util.{ Comparator, Optional }
 import java.util.concurrent.CompletionStage
-import java.util.function.Supplier
 
 import scala.annotation.varargs
 import scala.annotation.unchecked.uncheckedVariance
@@ -717,7 +716,7 @@ final class SubFlow[In, Out, Mat](
    */
   def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
       maxWeight: Long,
-      costFn: java.util.function.Function[Out, java.lang.Long])
+      costFn: function.Function[Out, java.lang.Long])
       : SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
     new SubFlow(delegate.groupedAdjacentByWeighted(f.apply, 
maxWeight)(costFn.apply).map(_.asJava))
 
@@ -1134,13 +1133,13 @@ final class SubFlow[In, Out, Mat](
    *
    * '''Cancels when''' downstream cancels
    *
-   * @param delayStrategySupplier creates new [[DelayStrategy]] object for 
each materialization
+   * @param delayStrategyCreator creates new [[DelayStrategy]] object for each 
materialization
    * @param overFlowStrategy Strategy that is used when incoming elements 
cannot fit inside the buffer
    */
   def delayWith(
-      delayStrategySupplier: Supplier[DelayStrategy[Out]],
+      delayStrategyCreator: function.Creator[DelayStrategy[Out]],
       overFlowStrategy: DelayOverflowStrategy): SubFlow[In, Out, Mat] =
-    new SubFlow(delegate.delayWith(() => 
DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy))
+    new SubFlow(delegate.delayWith(() => 
DelayStrategy.asScala(delayStrategyCreator.create()), overFlowStrategy))
 
   /**
    * Discard the given number of elements at the beginning of the stream.
@@ -1367,7 +1366,7 @@ final class SubFlow[In, Out, Mat](
    * '''Cancels when''' downstream cancels
    *  @since 1.1.0
    */
-  def onErrorComplete(predicate: java.util.function.Predicate[_ >: 
Throwable]): SubFlow[In, Out, Mat] =
+  def onErrorComplete(predicate: function.Predicate[_ >: Throwable]): 
SubFlow[In, Out, Mat] =
     new SubFlow(delegate.onErrorComplete {
       case ex: Throwable if predicate.test(ex) => true
     })
@@ -2790,13 +2789,13 @@ final class SubFlow[In, Out, Mat](
    * @param emitOnTimer decide whether the current aggregated elements can be 
emitted, the custom function is invoked on every interval
    */
   @deprecated("Use the overloaded one which accepts an Optional instead.", 
since = "1.2.0")
-  def aggregateWithBoundary[Agg, Emit](allocate: 
java.util.function.Supplier[Agg],
+  def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
       aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
       harvest: function.Function[Agg, Emit],
-      emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration])
+      emitOnTimer: Pair[function.Predicate[Agg], java.time.Duration])
       : javadsl.SubFlow[In, Emit, Mat] = {
     new SubFlow(
-      asScala.aggregateWithBoundary(() => allocate.get())(
+      asScala.aggregateWithBoundary(() => allocate.create())(
         aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
         harvest = agg => harvest.apply(agg),
         emitOnTimer = Option(emitOnTimer).map {
@@ -2822,14 +2821,14 @@ final class SubFlow[In, Out, Mat](
    * @param harvest     this is invoked before emit within the current 
stage/operator
    * @param emitOnTimer decide whether the current aggregated elements can be 
emitted, the custom function is invoked on every interval
    */
-  def aggregateWithBoundary[Agg, Emit](allocate: 
java.util.function.Supplier[Agg],
+  def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
       aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
       harvest: function.Function[Agg, Emit],
-      emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], 
java.time.Duration]])
+      emitOnTimer: Optional[Pair[function.Predicate[Agg], java.time.Duration]])
       : javadsl.SubFlow[In, Emit, Mat] = {
     import org.apache.pekko.util.OptionConverters._
     new SubFlow(
-      asScala.aggregateWithBoundary(() => allocate.get())(
+      asScala.aggregateWithBoundary(() => allocate.create())(
         aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
         harvest = agg => harvest.apply(agg),
         emitOnTimer = emitOnTimer.toScala.map {
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index ed6b8a19dd..387c4f1b19 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -15,7 +15,6 @@ package org.apache.pekko.stream.javadsl
 
 import java.util.{ Comparator, Optional }
 import java.util.concurrent.CompletionStage
-import java.util.function.Supplier
 
 import scala.annotation.varargs
 import scala.annotation.unchecked.uncheckedVariance
@@ -707,7 +706,7 @@ final class SubSource[Out, Mat](
    */
   def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
       maxWeight: Long,
-      costFn: java.util.function.Function[Out, java.lang.Long])
+      costFn: function.Function[Out, java.lang.Long])
       : SubSource[java.util.List[Out @uncheckedVariance], Mat] =
     new SubSource(delegate.groupedAdjacentByWeighted(f.apply, 
maxWeight)(costFn.apply).map(_.asJava))
 
@@ -1226,13 +1225,13 @@ final class SubSource[Out, Mat](
    *
    * '''Cancels when''' downstream cancels
    *
-   * @param delayStrategySupplier creates new [[DelayStrategy]] object for 
each materialization
+   * @param delayStrategyCreator creates new [[DelayStrategy]] object for each 
materialization
    * @param overFlowStrategy Strategy that is used when incoming elements 
cannot fit inside the buffer
    */
   def delayWith(
-      delayStrategySupplier: Supplier[DelayStrategy[Out]],
+      delayStrategyCreator: function.Creator[DelayStrategy[Out]],
       overFlowStrategy: DelayOverflowStrategy): SubSource[Out, Mat] =
-    new SubSource(delegate.delayWith(() => 
DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy))
+    new SubSource(delegate.delayWith(() => 
DelayStrategy.asScala(delayStrategyCreator.create()), overFlowStrategy))
 
   /**
    * Recover allows to send last element on failure and gracefully complete 
the stream
@@ -1344,7 +1343,7 @@ final class SubSource[Out, Mat](
    * '''Cancels when''' downstream cancels
    *  @since 1.1.0
    */
-  def onErrorComplete(predicate: java.util.function.Predicate[_ >: 
Throwable]): SubSource[Out, Mat] =
+  def onErrorComplete(predicate: function.Predicate[_ >: Throwable]): 
SubSource[Out, Mat] =
     new SubSource(delegate.onErrorComplete {
       case ex: Throwable if predicate.test(ex) => true
     })
@@ -2764,13 +2763,13 @@ final class SubSource[Out, Mat](
    * @param emitOnTimer decide whether the current aggregated elements can be 
emitted, the custom function is invoked on every interval
    */
   @deprecated("Use the overloaded one which accepts an Optional instead.", 
since = "1.2.0")
-  def aggregateWithBoundary[Agg, Emit](allocate: 
java.util.function.Supplier[Agg],
+  def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
       aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
       harvest: function.Function[Agg, Emit],
-      emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration])
+      emitOnTimer: Pair[function.Predicate[Agg], java.time.Duration])
       : javadsl.SubSource[Emit, Mat] = {
     new SubSource(
-      asScala.aggregateWithBoundary(() => allocate.get())(
+      asScala.aggregateWithBoundary(() => allocate.create())(
         aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
         harvest = agg => harvest.apply(agg),
         emitOnTimer = Option(emitOnTimer).map {
@@ -2796,14 +2795,14 @@ final class SubSource[Out, Mat](
    * @param harvest     this is invoked before emit within the current 
stage/operator
    * @param emitOnTimer decide whether the current aggregated elements can be 
emitted, the custom function is invoked on every interval
    */
-  def aggregateWithBoundary[Agg, Emit](allocate: 
java.util.function.Supplier[Agg],
+  def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
       aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
       harvest: function.Function[Agg, Emit],
-      emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], 
java.time.Duration]])
+      emitOnTimer: Optional[Pair[function.Predicate[Agg], java.time.Duration]])
       : javadsl.SubSource[Emit, Mat] = {
     import org.apache.pekko.util.OptionConverters._
     new SubSource(
-      asScala.aggregateWithBoundary(() => allocate.get())(
+      asScala.aggregateWithBoundary(() => allocate.create())(
         aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
         harvest = agg => harvest.apply(agg),
         emitOnTimer = emitOnTimer.toScala.map {
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/TLS.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/TLS.scala
index 0955fba7ac..30747abdbe 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/TLS.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/TLS.scala
@@ -13,13 +13,13 @@
 
 package org.apache.pekko.stream.javadsl
 
-import java.util.function.{ Consumer, Supplier }
 import javax.net.ssl.{ SSLEngine, SSLSession }
 
 import scala.util.Try
 
 import org.apache.pekko
 import pekko.NotUsed
+import pekko.japi.function
 import pekko.stream._
 import pekko.stream.TLSProtocol._
 import pekko.util.ByteString
@@ -73,11 +73,11 @@ object TLS {
    * For a description of the `closing` parameter please refer to 
[[TLSClosing]].
    */
   def create(
-      sslEngineCreator: Supplier[SSLEngine],
-      sessionVerifier: Consumer[SSLSession],
+      sslEngineCreator: function.Creator[SSLEngine],
+      sessionVerifier: function.Procedure[SSLSession],
       closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, 
SslTlsInbound, NotUsed] =
     new javadsl.BidiFlow(
-      scaladsl.TLS.apply(() => sslEngineCreator.get(), session => 
Try(sessionVerifier.accept(session)), closing))
+      scaladsl.TLS.apply(() => sslEngineCreator.create(), session => 
Try(sessionVerifier(session)), closing))
 
   /**
    * Create a StreamTls [[pekko.stream.javadsl.BidiFlow]]. This is a low-level 
interface.
@@ -88,9 +88,9 @@ object TLS {
    * For a description of the `closing` parameter please refer to 
[[TLSClosing]].
    */
   def create(
-      sslEngineCreator: Supplier[SSLEngine],
+      sslEngineCreator: function.Creator[SSLEngine],
       closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, 
SslTlsInbound, NotUsed] =
-    new javadsl.BidiFlow(scaladsl.TLS.apply(() => sslEngineCreator.get(), 
closing))
+    new javadsl.BidiFlow(scaladsl.TLS.apply(() => sslEngineCreator.create(), 
closing))
 }
 
 /**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala
index d34b1b51a4..7255a02e6a 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala
@@ -17,8 +17,6 @@ import java.lang.{ Iterable => JIterable }
 import java.net.InetSocketAddress
 import java.util.Optional
 import java.util.concurrent.CompletionStage
-import java.util.function.{ Function => JFunction }
-import java.util.function.Supplier
 import javax.net.ssl.SSLEngine
 import javax.net.ssl.SSLSession
 
@@ -35,6 +33,7 @@ import pekko.actor.ExtensionId
 import pekko.actor.ExtensionIdProvider
 import pekko.annotation.InternalApi
 import pekko.io.Inet.SocketOption
+import pekko.japi.function
 import pekko.stream.Materializer
 import pekko.stream.SystemMaterializer
 import pekko.stream.TLSClosing
@@ -261,10 +260,10 @@ class Tcp(system: ExtendedActorSystem) extends 
pekko.actor.Extension {
    */
   def outgoingConnectionWithTls(
       remoteAddress: InetSocketAddress,
-      createSSLEngine: Supplier[SSLEngine]): Flow[ByteString, ByteString, 
CompletionStage[OutgoingConnection]] =
+      createSSLEngine: function.Creator[SSLEngine]): Flow[ByteString, 
ByteString, CompletionStage[OutgoingConnection]] =
     Flow.fromGraph(
       delegate
-        .outgoingConnectionWithTls(remoteAddress, createSSLEngine = () => 
createSSLEngine.get())
+        .outgoingConnectionWithTls(remoteAddress, createSSLEngine = () => 
createSSLEngine.create())
         .mapMaterializedValue(_.map(new 
OutgoingConnection(_))(parasitic).asJava))
 
   /**
@@ -279,18 +278,18 @@ class Tcp(system: ExtendedActorSystem) extends 
pekko.actor.Extension {
    */
   def outgoingConnectionWithTls(
       remoteAddress: InetSocketAddress,
-      createSSLEngine: Supplier[SSLEngine],
+      createSSLEngine: function.Creator[SSLEngine],
       localAddress: Optional[InetSocketAddress],
       options: JIterable[SocketOption],
       connectTimeout: Optional[java.time.Duration],
       idleTimeout: Optional[java.time.Duration],
-      verifySession: JFunction[SSLSession, Optional[Throwable]],
+      verifySession: function.Function[SSLSession, Optional[Throwable]],
       closing: TLSClosing): Flow[ByteString, ByteString, 
CompletionStage[OutgoingConnection]] = {
     Flow.fromGraph(
       delegate
         .outgoingConnectionWithTls(
           remoteAddress,
-          createSSLEngine = () => createSSLEngine.get(),
+          createSSLEngine = () => createSSLEngine.create(),
           localAddress.toScala,
           CollectionUtil.toSeq(options),
           optionalDurationToScala(connectTimeout),
@@ -313,10 +312,10 @@ class Tcp(system: ExtendedActorSystem) extends 
pekko.actor.Extension {
   def bindWithTls(
       interface: String,
       port: Int,
-      createSSLEngine: Supplier[SSLEngine]): Source[IncomingConnection, 
CompletionStage[ServerBinding]] = {
+      createSSLEngine: function.Creator[SSLEngine]): 
Source[IncomingConnection, CompletionStage[ServerBinding]] = {
     Source.fromGraph(
       delegate
-        .bindWithTls(interface, port, createSSLEngine = () => 
createSSLEngine.get())
+        .bindWithTls(interface, port, createSSLEngine = () => 
createSSLEngine.create())
         .map(new IncomingConnection(_))
         .mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).asJava))
   }
@@ -330,18 +329,18 @@ class Tcp(system: ExtendedActorSystem) extends 
pekko.actor.Extension {
   def bindWithTls(
       interface: String,
       port: Int,
-      createSSLEngine: Supplier[SSLEngine],
+      createSSLEngine: function.Creator[SSLEngine],
       backlog: Int,
       options: JIterable[SocketOption],
       idleTimeout: Optional[java.time.Duration],
-      verifySession: JFunction[SSLSession, Optional[Throwable]],
+      verifySession: function.Function[SSLSession, Optional[Throwable]],
       closing: TLSClosing): Source[IncomingConnection, 
CompletionStage[ServerBinding]] = {
     Source.fromGraph(
       delegate
         .bindWithTls(
           interface,
           port,
-          createSSLEngine = () => createSSLEngine.get(),
+          createSSLEngine = () => createSSLEngine.create(),
           backlog,
           CollectionUtil.toSeq(options),
           optionalDurationToScala(idleTimeout),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to