This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 6f2a65da4c chore: Make use of japi.function in stream api. (#2143)
6f2a65da4c is described below
commit 6f2a65da4c34ae601880725ff5c3c087317f16cd
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Fri Sep 5 03:24:26 2025 +0800
chore: Make use of japi.function in stream api. (#2143)
---
.../org/apache/pekko/japi/function/Function.scala | 2 +-
.../main/paradox/stream/operators/ActorFlow/ask.md | 2 +-
.../stream/operators/ActorFlow/askWithContext.md | 2 +-
.../stream/operators/ActorFlow/askWithStatus.md | 4 +--
.../operators/ActorFlow/askWithStatusAndContext.md | 2 +-
.../stream/operators/ActorSource/actorRef.md | 2 +-
.../stream/operators/Sink/fromMaterializer.md | 2 +-
.../Source-or-Flow/aggregateWithBoundary.md | 4 +--
.../stream/operators/Source-or-Flow/delayWith.md | 4 +--
.../operators/Source-or-Flow/fromMaterializer.md | 4 +--
.../operators/Source-or-Flow/onErrorComplete.md | 4 +--
.../stream/operators/Source-or-Flow/recover.md | 4 +--
.../stream/operators/Source-or-Flow/recoverWith.md | 4 +--
.../operators/Source-or-Flow/recoverWithRetries.md | 4 +--
docs/src/main/paradox/stream/stream-dynamic.md | 2 +-
docs/src/main/paradox/stream/stream-error.md | 8 ++---
.../javaapi-functions.excludes | 28 +++++++++++++++
.../pekko/stream/typed/javadsl/ActorFlow.scala | 14 ++++----
.../pekko/stream/typed/javadsl/ActorSource.scala | 5 ++-
.../javaapi-functions.excludes | 27 ++++++++++++++
.../org/apache/pekko/stream/RestartSettings.scala | 6 ++--
.../org/apache/pekko/stream/javadsl/Flow.scala | 42 +++++++++++-----------
.../org/apache/pekko/stream/javadsl/Hub.scala | 13 +++----
.../org/apache/pekko/stream/javadsl/Sink.scala | 4 +--
.../org/apache/pekko/stream/javadsl/Source.scala | 41 +++++++++++----------
.../org/apache/pekko/stream/javadsl/SubFlow.scala | 23 ++++++------
.../apache/pekko/stream/javadsl/SubSource.scala | 23 ++++++------
.../org/apache/pekko/stream/javadsl/TLS.scala | 12 +++----
.../org/apache/pekko/stream/javadsl/Tcp.scala | 23 ++++++------
29 files changed, 182 insertions(+), 133 deletions(-)
diff --git a/actor/src/main/scala/org/apache/pekko/japi/function/Function.scala
b/actor/src/main/scala/org/apache/pekko/japi/function/Function.scala
index 109a4332d3..250c20dc20 100644
--- a/actor/src/main/scala/org/apache/pekko/japi/function/Function.scala
+++ b/actor/src/main/scala/org/apache/pekko/japi/function/Function.scala
@@ -105,7 +105,7 @@ trait Predicate2[-T1, -T2] extends java.io.Serializable {
/**
* A constructor/factory, takes no parameters but creates a new value of type
T every call.
- * Supports throwing `Exception` in the apply, which the
`java.util.function.Supplier` counterpart does not.
+ * Supports throwing `Exception` in the create method, which the
`java.util.function.Supplier` counterpart does not.
*/
@nowarn("msg=@SerialVersionUID has no effect")
@SerialVersionUID(1L)
diff --git a/docs/src/main/paradox/stream/operators/ActorFlow/ask.md
b/docs/src/main/paradox/stream/operators/ActorFlow/ask.md
index 7fe77db067..5c2c6e8113 100644
--- a/docs/src/main/paradox/stream/operators/ActorFlow/ask.md
+++ b/docs/src/main/paradox/stream/operators/ActorFlow/ask.md
@@ -19,7 +19,7 @@ This operator is included in:
## Signature
-@apidoc[ActorFlow.ask](ActorFlow$) {
scala="#ask%5BI,Q,A](ref:org.apache.pekko.actor.typed.ActorRef%5BQ])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow%5BI,A,org.apache.pekko.NotUsed]"
java="#ask(org.apache.pekko.actor.typed.ActorRef,java.time.Duration,java.util.function.BiFunction)"
}
+@apidoc[ActorFlow.ask](ActorFlow$) {
scala="#ask%5BI,Q,A](ref:org.apache.pekko.actor.typed.ActorRef%5BQ])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow%5BI,A,org.apache.pekko.NotUsed]"
java="#ask(org.apache.pekko.actor.typed.ActorRef,java.time.Duration,org.apache.pekko.japi.function.Function2)"
}
## Description
diff --git a/docs/src/main/paradox/stream/operators/ActorFlow/askWithContext.md
b/docs/src/main/paradox/stream/operators/ActorFlow/askWithContext.md
index bf420e0586..6359a4213f 100644
--- a/docs/src/main/paradox/stream/operators/ActorFlow/askWithContext.md
+++ b/docs/src/main/paradox/stream/operators/ActorFlow/askWithContext.md
@@ -19,7 +19,7 @@ This operator is included in:
## Signature
-@apidoc[ActorFlow.askWithContext](ActorFlow$) {
scala="#askWithContext%5BI,Q,A,Ctx](ref:org.apache.pekko.actor.typed.ActorRef%5BQ])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow%5B(I,Ctx),(A,Ctx),org.apache.pekko.NotUsed]"
java="#askWithContext(org.apache.pekko.actor.typed.ActorRef,java.time.Duration,java.util.function.BiFunction)"
}
+@apidoc[ActorFlow.askWithContext](ActorFlow$) {
scala="#askWithContext%5BI,Q,A,Ctx](ref:org.apache.pekko.actor.typed.ActorRef%5BQ])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow%5B(I,Ctx),(A,Ctx),org.apache.pekko.NotUsed]"
java="#askWithContext(org.apache.pekko.actor.typed.ActorRef,java.time.Duration,org.apache.pekko.japi.function.Function2)"
}
## Description
diff --git a/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatus.md
b/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatus.md
index d23f3974b5..9721940cd0 100644
--- a/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatus.md
+++ b/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatus.md
@@ -18,8 +18,8 @@ This operator is included in:
## Signature
-@apidoc[ActorFlow.askWithStatus](ActorFlow$) {
scala="#askWithStatus[I,Q,A](parallelism:Int)(ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=>Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[I,A,org.apache.pekko.NotUsed]"
java
="#askWithStatus[I,Q,A](parallelism:Int,ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:java.u
[...]
-@apidoc[ActorFlow.askWithStatus](ActorFlow$) {
scala="#askWithStatus[I,Q,A](ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=>Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[I,A,org.apache.pekko.NotUsed]"
java
="#askWithStatus[I,Q,A](ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:java.util.function.BiFunction[I,org.apa
[...]
+@apidoc[ActorFlow.askWithStatus](ActorFlow$) {
scala="#askWithStatus[I,Q,A](parallelism:Int)(ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=>Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[I,A,org.apache.pekko.NotUsed]"
java
="#askWithStatus[I,Q,A](parallelism:Int,ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:org.ap
[...]
+@apidoc[ActorFlow.askWithStatus](ActorFlow$) {
scala="#askWithStatus[I,Q,A](ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=>Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[I,A,org.apache.pekko.NotUsed]"
java
="#askWithStatus[I,Q,A](ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:org.apache.pekko.japi.function.Function
[...]
## Description
diff --git
a/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatusAndContext.md
b/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatusAndContext.md
index de2e6d0312..4cb4ead595 100644
---
a/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatusAndContext.md
+++
b/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatusAndContext.md
@@ -19,7 +19,7 @@ This operator is included in:
## Signature
-@apidoc[ActorFlow.askWithStatusAndContext](ActorFlow$) {
scala="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int)(ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=>Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[(I,Ctx),(A,Ctx),org.apache.pekko.NotUsed]"
java
="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int,ref:org.apache.pekko.actor.typed.ActorRe
[...]
+@apidoc[ActorFlow.askWithStatusAndContext](ActorFlow$) {
scala="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int)(ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=>Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[(I,Ctx),(A,Ctx),org.apache.pekko.NotUsed]"
java
="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int,ref:org.apache.pekko.actor.typed.ActorRe
[...]
## Description
diff --git a/docs/src/main/paradox/stream/operators/ActorSource/actorRef.md
b/docs/src/main/paradox/stream/operators/ActorSource/actorRef.md
index 35851bfea8..71cc41b153 100644
--- a/docs/src/main/paradox/stream/operators/ActorSource/actorRef.md
+++ b/docs/src/main/paradox/stream/operators/ActorSource/actorRef.md
@@ -19,7 +19,7 @@ This operator is included in:
## Signature
-@apidoc[ActorSource.actorRef](ActorSource$) {
scala="#actorRef[T](completionMatcher:PartialFunction[T,Unit],failureMatcher:PartialFunction[T,Throwable],bufferSize:Int,overflowStrategy:org.apache.pekko.stream.OverflowStrategy):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.actor.typed.ActorRef[T]]"
java="#actorRef(java.util.function.Predicate,org.apache.pekko.japi.function.Function,int,org.apache.pekko.stream.OverflowStrategy)"
}
+@apidoc[ActorSource.actorRef](ActorSource$) {
scala="#actorRef[T](completionMatcher:PartialFunction[T,Unit],failureMatcher:PartialFunction[T,Throwable],bufferSize:Int,overflowStrategy:org.apache.pekko.stream.OverflowStrategy):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.actor.typed.ActorRef[T]]"
java="#actorRef(org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function,int,org.apache.pekko.stream.OverflowStrategy)"
}
## Description
diff --git a/docs/src/main/paradox/stream/operators/Sink/fromMaterializer.md
b/docs/src/main/paradox/stream/operators/Sink/fromMaterializer.md
index 91cdef4746..c10b18b2a9 100644
--- a/docs/src/main/paradox/stream/operators/Sink/fromMaterializer.md
+++ b/docs/src/main/paradox/stream/operators/Sink/fromMaterializer.md
@@ -6,7 +6,7 @@ Defer the creation of a `Sink` until materialization and access
`Materializer` a
## Signature
-@apidoc[Sink.fromMaterializer](Sink$) {
scala="#fromMaterializer[T,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=>org.apache.pekko.stream.scaladsl.Sink[T,M]):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[M]]"
java="#fromMaterializer(java.util.function.BiFunction)" }
+@apidoc[Sink.fromMaterializer](Sink$) {
scala="#fromMaterializer[T,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=>org.apache.pekko.stream.scaladsl.Sink[T,M]):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[M]]"
java="#fromMaterializer(org.apache.pekko.japi.function.Function2)" }
## Description
diff --git
a/docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md
b/docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md
index d6e8959bdc..4c053db52a 100644
---
a/docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md
+++
b/docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md
@@ -8,8 +8,8 @@ Aggregate and emit until custom boundary condition met.
## Signature
-@apidoc[Source.aggregateWithBoundary](Source) {
scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]"
java="#aggregateWithBoundary(java.util.function.Supplier,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.Pair)"}
-@apidoc[Flow.aggregateWithBoundary](Flow) {
scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]"
java="#aggregateWithBoundary(java.util.function.Supplier,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.Pair)"
}
+@apidoc[Source.aggregateWithBoundary](Source) {
scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]"
java="#aggregateWithBoundary(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.Pair)"}
+@apidoc[Flow.aggregateWithBoundary](Flow) {
scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]"
java="#aggregateWithBoundary(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.Pair)"
}
## Description
diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md
b/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md
index 75f7fccd2e..f76efe39a0 100644
--- a/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md
@@ -6,8 +6,8 @@ Delay every element passed through with a duration that can be
controlled dynami
## Signature
-@apidoc[Source.delayWith](Source) {
scala="#delayWith(delayStrategySupplier:()=>org.apache.pekko.stream.scaladsl.DelayStrategy[Out],overFlowStrategy:org.apache.pekko.stream.DelayOverflowStrategy):FlowOps.this.Repr[Out]"
java="#delayWith(java.util.function.Supplier,org.apache.pekko.stream.DelayOverflowStrategy)"
}
-@apidoc[Flow.delayWith](Flow) {
scala="#delayWith(delayStrategySupplier:()=>org.apache.pekko.stream.scaladsl.DelayStrategy[Out],overFlowStrategy:org.apache.pekko.stream.DelayOverflowStrategy):FlowOps.this.Repr[Out]"
java="#delayWith(java.util.function.Supplier,org.apache.pekko.stream.DelayOverflowStrategy)"
}
+@apidoc[Source.delayWith](Source) {
scala="#delayWith(delayStrategySupplier:()=>org.apache.pekko.stream.scaladsl.DelayStrategy[Out],overFlowStrategy:org.apache.pekko.stream.DelayOverflowStrategy):FlowOps.this.Repr[Out]"
java="#delayWith(org.apache.pekko.japi.function.Creator,org.apache.pekko.stream.DelayOverflowStrategy)"
}
+@apidoc[Flow.delayWith](Flow) {
scala="#delayWith(delayStrategySupplier:()=>org.apache.pekko.stream.scaladsl.DelayStrategy[Out],overFlowStrategy:org.apache.pekko.stream.DelayOverflowStrategy):FlowOps.this.Repr[Out]"
java="#delayWith(org.apache.pekko.japi.function.Creator,org.apache.pekko.stream.DelayOverflowStrategy)"
}
## Description
diff --git
a/docs/src/main/paradox/stream/operators/Source-or-Flow/fromMaterializer.md
b/docs/src/main/paradox/stream/operators/Source-or-Flow/fromMaterializer.md
index 9fe8b8b740..a988bdb5f0 100644
--- a/docs/src/main/paradox/stream/operators/Source-or-Flow/fromMaterializer.md
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/fromMaterializer.md
@@ -6,8 +6,8 @@ Defer the creation of a `Source/Flow` until materialization and
access `Material
## Signature
-@apidoc[Source.fromMaterializer](Source$) {
scala="#fromMaterializer[T,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=>org.apache.pekko.stream.scaladsl.Source[T,M]):org.apache.pekko.stream.scaladsl.Source[T,scala.concurrent.Future[M]]"
java="#fromMaterializer(java.util.function.BiFunction)" }
-@apidoc[Flow.fromMaterializer](Flow$) {
scala="#fromMaterializer[T,U,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=>org.apache.pekko.stream.scaladsl.Flow[T,U,M]):org.apache.pekko.stream.scaladsl.Flow[T,U,scala.concurrent.Future[M]]"
java="#fromMaterializer(java.util.function.BiFunction)" }
+@apidoc[Source.fromMaterializer](Source$) {
scala="#fromMaterializer[T,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=>org.apache.pekko.stream.scaladsl.Source[T,M]):org.apache.pekko.stream.scaladsl.Source[T,scala.concurrent.Future[M]]"
java="#fromMaterializer(org.apache.pekko.japi.function.Function2)" }
+@apidoc[Flow.fromMaterializer](Flow$) {
scala="#fromMaterializer[T,U,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=>org.apache.pekko.stream.scaladsl.Flow[T,U,M]):org.apache.pekko.stream.scaladsl.Flow[T,U,scala.concurrent.Future[M]]"
java="#fromMaterializer(org.apache.pekko.japi.function.Function2)" }
## Description
diff --git
a/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md
b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md
index 0772ce4b43..66d43d8d8a 100644
--- a/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md
@@ -6,9 +6,9 @@ Allows completing the stream when an upstream error occurs.
## Signature
-@apidoc[Source.onErrorComplete](Source) {
scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D"
java="#onErrorComplete(java.util.function.Predicate)" }
+@apidoc[Source.onErrorComplete](Source) {
scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D"
java="#onErrorComplete(org.apache.pekko.japi.function.Predicate)" }
@apidoc[Source.onErrorComplete](Source) {
scala="#onErrorComplete%5BT%20%3C%3A%20Throwable%5D()(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D"
java="#onErrorComplete(java.lang.Class)" }
-@apidoc[Flow.onErrorComplete](Flow) {
scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D"
java="#onErrorComplete(java.util.function.Predicate)" }
+@apidoc[Flow.onErrorComplete](Flow) {
scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D"
java="#onErrorComplete(org.apache.pekko.japi.function.Predicate)" }
@apidoc[Flow.onErrorComplete](Flow) {
scala="#onErrorComplete%5BT%20%3C%3A%20Throwable%5D()(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D"
java="#onErrorComplete(java.lang.Class)" }
## Description
diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/recover.md
b/docs/src/main/paradox/stream/operators/Source-or-Flow/recover.md
index 7aac41bcfc..e4ff749b01 100644
--- a/docs/src/main/paradox/stream/operators/Source-or-Flow/recover.md
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/recover.md
@@ -6,8 +6,8 @@ Allow sending of one last element downstream when a failure has
happened upstrea
## Signature
-@apidoc[Source.recover](Source) {
scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]"
java="#recover(scala.PartialFunction)"
java="#recover(java.lang.Class,java.util.function.Supplier)" }
-@apidoc[Flow.recover](Flow) {
scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]"
java="#recover(scala.PartialFunction)"
java="#recover(java.lang.Class,java.util.function.Supplier)" }
+@apidoc[Source.recover](Source) {
scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]"
java="#recover(scala.PartialFunction)"
java="#recover(java.lang.Class,org.apache.pekko.japi.function.Creator)" }
+@apidoc[Flow.recover](Flow) {
scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]"
java="#recover(scala.PartialFunction)"
java="#recover(java.lang.Class,org.apache.pekko.japi.function.Creator)" }
## Description
diff --git
a/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWith.md
b/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWith.md
index 4d62c22c05..b1e78b3d22 100644
--- a/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWith.md
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWith.md
@@ -6,8 +6,8 @@ Allow switching to alternative Source when a failure has
happened upstream.
## Signature
-@apidoc[Source.recoverWith](Source) {
scala="#recoverWith[T>:Out](pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
java="#recoverWith(java.lang.Class,java.util.function.Supplier)" }
-@apidoc[Flow.recoverWith](Flow) {
scala="#recoverWith[T>:Out](pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
java="#recoverWith(java.lang.Class,java.util.function.Supplier)" }
+@apidoc[Source.recoverWith](Source) {
scala="#recoverWith[T>:Out](pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
java="#recoverWith(java.lang.Class,org.apache.pekko.japi.function.Creator)" }
+@apidoc[Flow.recoverWith](Flow) {
scala="#recoverWith[T>:Out](pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
java="#recoverWith(java.lang.Class,org.apache.pekko.japi.function.Creator)" }
## Description
diff --git
a/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWithRetries.md
b/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWithRetries.md
index 666794a1ee..f7631f0a7c 100644
---
a/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWithRetries.md
+++
b/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWithRetries.md
@@ -6,8 +6,8 @@ RecoverWithRetries allows to switch to alternative Source on
flow failure.
## Signature
-@apidoc[Source.recoverWithRetries](Source) {
scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
java="#recoverWithRetries(int,java.lang.Class,java.util.function.Supplier)" }
-@apidoc[Flow.recoverWithRetries](Flow) {
scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
java="#recoverWithRetries(int,java.lang.Class,java.util.function.Supplier)" }
+@apidoc[Source.recoverWithRetries](Source) {
scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
java="#recoverWithRetries(int,java.lang.Class,org.apache.pekko.japi.function.Creator)"
}
+@apidoc[Flow.recoverWithRetries](Flow) {
scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
java="#recoverWithRetries(int,java.lang.Class,org.apache.pekko.japi.function.Creator)"
}
## Description
diff --git a/docs/src/main/paradox/stream/stream-dynamic.md
b/docs/src/main/paradox/stream/stream-dynamic.md
index 18a3c24f06..5ad915c636 100644
--- a/docs/src/main/paradox/stream/stream-dynamic.md
+++ b/docs/src/main/paradox/stream/stream-dynamic.md
@@ -219,7 +219,7 @@ It is possible to define how many initial consumers that
are required before it
to the attached consumers. While not enough consumers have been attached
messages are buffered and when the
buffer is full the upstream producer is backpressured. No messages are dropped.
-The above example illustrate a stateless partition function. For more advanced
stateful routing the
@java[@javadoc[ofStateful](pekko.stream.javadsl.PartitionHub$#ofStateful(java.lang.Class,java.util.function.Supplier,int))]
+The above example illustrates a stateless partition function. For more advanced
stateful routing the
@java[@javadoc[ofStateful](pekko.stream.javadsl.PartitionHub$#ofStateful(java.lang.Class,org.apache.pekko.japi.function.Creator,int))]
@scala[@scaladoc[statefulSink](pekko.stream.scaladsl.PartitionHub$#statefulSink[T](partitioner:()=%3E(org.apache.pekko.stream.scaladsl.PartitionHub.ConsumerInfo,T)=%3ELong,startAfterNrOfConsumers:Int,bufferSize:Int):org.apache.pekko.stream.scaladsl.Sink[T,org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.NotUsed]])]
can be used. Here is an example of a stateful round-robin function:
Scala
diff --git a/docs/src/main/paradox/stream/stream-error.md
b/docs/src/main/paradox/stream/stream-error.md
index 17001855fa..dd1f13f2a7 100644
--- a/docs/src/main/paradox/stream/stream-error.md
+++ b/docs/src/main/paradox/stream/stream-error.md
@@ -20,8 +20,8 @@ Each of the operators downstream gets informed about the
failure and each upstre
In many cases you may want to avoid complete stream failure, this can be done
in a few different ways:
- * @apidoc[recover](stream.*.Source)
{scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]"
java="#recover(java.lang.Class,java.util.function.Supplier)"} to emit a final
element then complete the stream normally on upstream failure
- * @apidoc[recoverWithRetries](stream.*.Source)
{scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
java="#recoverWithRetries(int,java.lang.Class,java.util.function.Supplier)"}
to create a new upstream and start consuming from that on failure
+ * @apidoc[recover](stream.*.Source)
{scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]"
java="#recover(java.lang.Class,org.apache.pekko.japi.function.Creator)"} to
emit a final element then complete the stream normally on upstream failure
+ * @apidoc[recoverWithRetries](stream.*.Source)
{scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
java="#recoverWithRetries(int,java.lang.Class,org.apache.pekko.japi.function.Creator)"}
to create a new upstream and start consuming from that on failure
* Restarting sections of the stream after a backoff
* Using a supervision strategy for operators that support it
@@ -52,7 +52,7 @@ in @ref:[Logging in
streams](stream-cookbook.md#logging-in-streams).
## Recover
-@apidoc[recover](stream.*.Source)
{scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]"
java="#recover(java.lang.Class,java.util.function.Supplier)"} allows you to
emit a final element and then complete the stream on an upstream failure.
+@apidoc[recover](stream.*.Source)
{scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]"
java="#recover(java.lang.Class,org.apache.pekko.japi.function.Creator)"}
allows you to emit a final element and then complete the stream on an upstream
failure.
Deciding which exceptions should be recovered is done through a
@scaladoc[PartialFunction](scala.PartialFunction). If an exception
does not have a @scala[matching case] @java[match defined] the stream is
failed.
@@ -80,7 +80,7 @@ Java
## Recover with retries
-@apidoc[recoverWithRetries](stream.*.Source)
{scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
java="#recoverWithRetries(int,java.lang.Class,java.util.function.Supplier)"}
allows you to put a new upstream in place of the failed one, recovering
+@apidoc[recoverWithRetries](stream.*.Source)
{scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]"
java="#recoverWithRetries(int,java.lang.Class,org.apache.pekko.japi.function.Creator)"}
allows you to put a new upstream in place of the failed one, recovering
stream failures up to a specified maximum number of times.
Deciding which exceptions should be recovered is done through a
@scaladoc[PartialFunction](scala.PartialFunction). If an exception
diff --git
a/stream-typed/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes
b/stream-typed/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes
new file mode 100644
index 0000000000..f412fc2a45
--- /dev/null
+++
b/stream-typed/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Migrate java.util.function.* to pekko.japi.function.*
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithContext")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithStatusAndContext")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithContext")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithStatus")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.ask")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithStatus")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithContext")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithStatusAndContext")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithContext")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorSource.actorRef")
diff --git
a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorFlow.scala
b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorFlow.scala
index e683830b0b..8873facb45 100644
---
a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorFlow.scala
+++
b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorFlow.scala
@@ -13,12 +13,12 @@
package org.apache.pekko.stream.typed.javadsl
-import java.util.function.BiFunction
import scala.concurrent.duration._
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.typed.ActorRef
import pekko.japi.Pair
+import pekko.japi.function
import pekko.pattern.StatusReply
import pekko.stream.javadsl.Flow
import pekko.util.JavaDurationConverters
@@ -68,7 +68,7 @@ object ActorFlow {
def ask[I, Q, A](
ref: ActorRef[Q],
timeout: java.time.Duration,
- makeMessage: BiFunction[I, ActorRef[A], Q]): Flow[I, A, NotUsed] =
+ makeMessage: function.Function2[I, ActorRef[A], Q]): Flow[I, A, NotUsed]
=
org.apache.pekko.stream.typed.scaladsl.ActorFlow
.ask[I, Q, A](parallelism = 2)(ref)((i, ref) => makeMessage(i, ref))(
JavaDurationConverters.asFiniteDuration(timeout))
@@ -82,7 +82,7 @@ object ActorFlow {
def askWithStatus[I, Q, A](
ref: ActorRef[Q],
timeout: java.time.Duration,
- makeMessage: BiFunction[I, ActorRef[StatusReply[A]], Q]): Flow[I, A,
NotUsed] =
+ makeMessage: function.Function2[I, ActorRef[StatusReply[A]], Q]):
Flow[I, A, NotUsed] =
org.apache.pekko.stream.typed.scaladsl.ActorFlow
.askWithStatus[I, Q, A](parallelism = 2)(ref)((i, ref) => makeMessage(i,
ref))(
JavaDurationConverters.asFiniteDuration(timeout))
@@ -139,7 +139,7 @@ object ActorFlow {
parallelism: Int,
ref: ActorRef[Q],
timeout: java.time.Duration,
- makeMessage: BiFunction[I, ActorRef[StatusReply[A]], Q]): Flow[I, A,
NotUsed] =
+ makeMessage: function.Function2[I, ActorRef[StatusReply[A]], Q]):
Flow[I, A, NotUsed] =
org.apache.pekko.stream.typed.scaladsl.ActorFlow
.askWithStatus[I, Q, A](parallelism)(ref)((i, ref) => makeMessage(i,
ref))(timeout.toMillis.millis)
.asJava
@@ -150,7 +150,7 @@ object ActorFlow {
def askWithContext[I, Q, A, Ctx](
ref: ActorRef[Q],
timeout: java.time.Duration,
- makeMessage: BiFunction[I, ActorRef[A], Q]): Flow[Pair[I, Ctx], Pair[A,
Ctx], NotUsed] =
+ makeMessage: function.Function2[I, ActorRef[A], Q]): Flow[Pair[I, Ctx],
Pair[A, Ctx], NotUsed] =
org.apache.pekko.stream.scaladsl
.Flow[Pair[I, Ctx]]
.map(_.toScala)
@@ -169,7 +169,7 @@ object ActorFlow {
def askWithStatusAndContext[I, Q, A, Ctx](
ref: ActorRef[Q],
timeout: java.time.Duration,
- makeMessage: BiFunction[I, ActorRef[StatusReply[A]], Q]): Flow[Pair[I,
Ctx], Pair[A, Ctx], NotUsed] =
+ makeMessage: function.Function2[I, ActorRef[StatusReply[A]], Q]):
Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] =
org.apache.pekko.stream.scaladsl
.Flow[Pair[I, Ctx]]
.map(_.toScala)
@@ -187,7 +187,7 @@ object ActorFlow {
parallelism: Int,
ref: ActorRef[Q],
timeout: java.time.Duration,
- makeMessage: BiFunction[I, ActorRef[A], Q]): Flow[Pair[I, Ctx], Pair[A,
Ctx], NotUsed] = {
+ makeMessage: function.Function2[I, ActorRef[A], Q]): Flow[Pair[I, Ctx],
Pair[A, Ctx], NotUsed] = {
org.apache.pekko.stream.scaladsl
.Flow[Pair[I, Ctx]]
.map(_.toScala)
diff --git
a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala
b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala
index 5e31b916e6..e7da04a0d0 100644
---
a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala
+++
b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala
@@ -13,11 +13,10 @@
package org.apache.pekko.stream.typed.javadsl
-import java.util.function.Predicate
-
import org.apache.pekko
import pekko.actor.typed._
import pekko.japi.JavaPartialFunction
+import pekko.japi.function
import pekko.stream.{ CompletionStrategy, OverflowStrategy }
import pekko.stream.javadsl._
@@ -58,7 +57,7 @@ object ActorSource {
* @param overflowStrategy Strategy that is used when incoming elements
cannot fit inside the buffer
*/
def actorRef[T](
- completionMatcher: Predicate[T],
+ completionMatcher: function.Predicate[T],
failureMatcher: pekko.japi.function.Function[T,
java.util.Optional[Throwable]],
bufferSize: Int,
overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] = {
diff --git
a/stream/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes
b/stream/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes
index 3d671dfd72..11ab37d791 100644
---
a/stream/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes
+++
b/stream/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes
@@ -18,3 +18,30 @@
# Migrate functions in japi.* to japi.function.*
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.JavaFlowSupport#Flow.fromProcessor")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.JavaFlowSupport#Flow.fromProcessorMat")
+
+# Migrate java.util.function.* to pekko.japi.function.*
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.RestartSettings.withRestartOn")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.groupedWeighted")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.delayWith")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.recover")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.recoverWith")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.recoverWithRetries")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.aggregateWithBoundary")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.fromMaterializer")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.PartitionHub.of")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.PartitionHub.ofStateful")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Sink.fromMaterializer")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.recover")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.recoverWith")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.recoverWithRetries")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.groupedWeighted")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.delayWith")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.aggregateWithBoundary")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.fromMaterializer")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.SubFlow.delayWith")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.SubFlow.aggregateWithBoundary")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.SubSource.delayWith")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.SubSource.aggregateWithBoundary")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.TLS.create")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Tcp.outgoingConnectionWithTls")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Tcp.bindWithTls")
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/RestartSettings.scala
b/stream/src/main/scala/org/apache/pekko/stream/RestartSettings.scala
index e7fb56b61a..8fda40de43 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/RestartSettings.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/RestartSettings.scala
@@ -16,10 +16,10 @@ package org.apache.pekko.stream
import scala.concurrent.duration.FiniteDuration
import org.apache.pekko
+import pekko.japi.function
import pekko.event.Logging
import pekko.event.Logging.LogLevel
import pekko.util.ConstantFun
-import pekko.util.FunctionConverters._
import pekko.util.JavaDurationConverters._
final class RestartSettings private (
@@ -58,8 +58,8 @@ final class RestartSettings private (
copy(maxRestarts = count, maxRestartsWithin = within.asScala)
/** Decides whether the failure should restart the stream or make the
surrounding stream fail */
- def withRestartOn(restartOn: java.util.function.Predicate[Throwable]):
RestartSettings =
- copy(restartOn = restartOn.asScala)
+ def withRestartOn(restartOn: function.Predicate[Throwable]): RestartSettings
=
+ copy(restartOn = t => restartOn.test(t))
def withLogSettings(newLogSettings: RestartSettings.LogSettings):
RestartSettings =
copy(logSettings = newLogSettings)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 637bf91541..6e76d89579 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -16,8 +16,6 @@ package org.apache.pekko.stream.javadsl
import java.util.Comparator
import java.util.Optional
import java.util.concurrent.CompletionStage
-import java.util.function.BiFunction
-import java.util.function.Supplier
import scala.annotation.varargs
import scala.annotation.unchecked.uncheckedVariance
@@ -112,7 +110,7 @@ object Flow {
* [[Attributes]] of the [[Flow]] returned by this method.
*/
def fromMaterializer[I, O, M](
- factory: BiFunction[Materializer, Attributes, Flow[I, O, M]]): Flow[I,
O, CompletionStage[M]] =
+ factory: function.Function2[Materializer, Attributes, Flow[I, O, M]]):
Flow[I, O, CompletionStage[M]] =
scaladsl.Flow.fromMaterializer((mat, attr) => factory(mat,
attr).asScala).mapMaterializedValue(_.asJava).asJava
/**
@@ -1273,7 +1271,7 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
* '''Cancels when''' downstream cancels
*/
def groupedWeighted(minWeight: Long,
- costFn: java.util.function.Function[Out, java.lang.Long]):
javadsl.Flow[In, java.util.List[Out], Mat] =
+ costFn: function.Function[Out, java.lang.Long]): javadsl.Flow[In,
java.util.List[Out], Mat] =
new Flow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava))
// TODO optimize to one step
/**
@@ -1316,7 +1314,7 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
*/
def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
maxWeight: Long,
- costFn: java.util.function.Function[Out, java.lang.Long])
+ costFn: function.Function[Out, java.lang.Long])
: javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] =
new Flow(delegate.groupedAdjacentByWeighted(f.apply,
maxWeight)(costFn.apply).map(_.asJava))
@@ -1740,13 +1738,13 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*
- * @param delayStrategySupplier creates new [[DelayStrategy]] object for
each materialization
+ * @param delayStrategyCreator creates new [[DelayStrategy]] object for each
materialization
* @param overFlowStrategy Strategy that is used when incoming elements
cannot fit inside the buffer
*/
def delayWith(
- delayStrategySupplier: Supplier[DelayStrategy[Out]],
+ delayStrategyCreator: function.Creator[DelayStrategy[Out]],
overFlowStrategy: DelayOverflowStrategy): Flow[In, Out, Mat] =
- new Flow(delegate.delayWith(() =>
DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy))
+ new Flow(delegate.delayWith(() =>
DelayStrategy.asScala(delayStrategyCreator.create()), overFlowStrategy))
/**
* Discard the given number of elements at the beginning of the stream.
@@ -1892,9 +1890,9 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
- def recover(clazz: Class[_ <: Throwable], supplier: Supplier[Out]):
javadsl.Flow[In, Out, Mat] =
+ def recover(clazz: Class[_ <: Throwable], creator: function.Creator[Out]):
javadsl.Flow[In, Out, Mat] =
recover {
- case elem if clazz.isInstance(elem) => supplier.get()
+ case elem if clazz.isInstance(elem) => creator.create()
}
/**
@@ -1984,9 +1982,9 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
*/
def recoverWith(
clazz: Class[_ <: Throwable],
- supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In,
Out, Mat] =
+ creator: function.Creator[Graph[SourceShape[Out], NotUsed]]):
javadsl.Flow[In, Out, Mat] =
recoverWith({
- case elem if clazz.isInstance(elem) => supplier.get()
+ case elem if clazz.isInstance(elem) => creator.create()
}: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]])
/**
@@ -2043,15 +2041,15 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
*
* @param attempts Maximum number of retries or -1 to retry indefinitely
* @param clazz the class object of the failure cause
- * @param supplier supply the new Source to be materialized
+ * @param creator create the new Source to be materialized
*/
def recoverWithRetries(
attempts: Int,
clazz: Class[_ <: Throwable],
- supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In,
Out, Mat] =
+ creator: function.Creator[Graph[SourceShape[Out], NotUsed]]):
javadsl.Flow[In, Out, Mat] =
recoverWithRetries(attempts,
{
- case elem if clazz.isInstance(elem) => supplier.get()
+ case elem if clazz.isInstance(elem) => creator.create()
})
/**
@@ -2104,7 +2102,7 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
* '''Cancels when''' downstream cancels
* @since 1.1.0
*/
- def onErrorComplete(predicate: java.util.function.Predicate[_ >:
Throwable]): javadsl.Flow[In, Out, Mat] =
+ def onErrorComplete(predicate: function.Predicate[_ >: Throwable]):
javadsl.Flow[In, Out, Mat] =
new Flow(delegate.onErrorComplete {
case ex: Throwable if predicate.test(ex) => true
})
@@ -4308,13 +4306,13 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
* @param emitOnTimer decide whether the current aggregated elements can be
emitted, the custom function is invoked on every interval
*/
@deprecated("Use the overloaded one which accepts an Optional instead.",
since = "1.2.0")
- def aggregateWithBoundary[Agg, Emit](allocate:
java.util.function.Supplier[Agg],
+ def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
- emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration])
+ emitOnTimer: Pair[function.Predicate[Agg], java.time.Duration])
: javadsl.Flow[In, Emit, Mat] = {
asScala
- .aggregateWithBoundary(() => allocate.get())(
+ .aggregateWithBoundary(() => allocate.create())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = Option(emitOnTimer).map {
@@ -4341,14 +4339,14 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
* @param harvest this is invoked before emit within the current
stage/operator
* @param emitOnTimer decide whether the current aggregated elements can be
emitted, the custom function is invoked on every interval
*/
- def aggregateWithBoundary[Agg, Emit](allocate:
java.util.function.Supplier[Agg],
+ def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
- emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg],
java.time.Duration]])
+ emitOnTimer: Optional[Pair[function.Predicate[Agg], java.time.Duration]])
: javadsl.Flow[In, Emit, Mat] = {
import org.apache.pekko.util.OptionConverters._
asScala
- .aggregateWithBoundary(() => allocate.get())(
+ .aggregateWithBoundary(() => allocate.create())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = emitOnTimer.toScala.map {
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Hub.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Hub.scala
index be492a1dc9..3a3f34ce56 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Hub.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Hub.scala
@@ -13,9 +13,10 @@
package org.apache.pekko.stream.javadsl
-import java.util.function.{ BiFunction, Supplier, ToLongBiFunction }
+import java.util.function.ToLongBiFunction
import org.apache.pekko
+import pekko.japi.function
import pekko.NotUsed
import pekko.annotation.DoNotInherit
import pekko.util.unused
@@ -259,11 +260,11 @@ object PartitionHub {
*/
def ofStateful[T](
@unused clazz: Class[T],
- partitioner: Supplier[ToLongBiFunction[ConsumerInfo, T]],
+ partitioner: function.Creator[ToLongBiFunction[ConsumerInfo, T]],
startAfterNrOfConsumers: Int,
bufferSize: Int): Sink[T, Source[T, NotUsed]] = {
val p: () => (pekko.stream.scaladsl.PartitionHub.ConsumerInfo, T) => Long
= () => {
- val f = partitioner.get()
+ val f = partitioner.create()
(info, elem) => f.applyAsLong(info, elem)
}
pekko.stream.scaladsl.PartitionHub
@@ -303,7 +304,7 @@ object PartitionHub {
*/
def ofStateful[T](
clazz: Class[T],
- partitioner: Supplier[ToLongBiFunction[ConsumerInfo, T]],
+ partitioner: function.Creator[ToLongBiFunction[ConsumerInfo, T]],
startAfterNrOfConsumers: Int): Sink[T, Source[T, NotUsed]] =
ofStateful(clazz, partitioner, startAfterNrOfConsumers,
pekko.stream.scaladsl.PartitionHub.defaultBufferSize)
@@ -338,7 +339,7 @@ object PartitionHub {
*/
def of[T](
@unused clazz: Class[T],
- partitioner: BiFunction[Integer, T, Integer],
+ partitioner: function.Function2[Integer, T, Integer],
startAfterNrOfConsumers: Int,
bufferSize: Int): Sink[T, Source[T, NotUsed]] =
pekko.stream.scaladsl.PartitionHub
@@ -377,7 +378,7 @@ object PartitionHub {
*/
def of[T](
clazz: Class[T],
- partitioner: BiFunction[Integer, T, Integer],
+ partitioner: function.Function2[Integer, T, Integer],
startAfterNrOfConsumers: Int): Sink[T, Source[T, NotUsed]] =
of(clazz, partitioner, startAfterNrOfConsumers,
pekko.stream.scaladsl.PartitionHub.defaultBufferSize)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index e5f4bf7fcd..76db0ef98c 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -15,7 +15,6 @@ package org.apache.pekko.stream.javadsl
import java.util.Optional
import java.util.concurrent.{ CompletableFuture, CompletionStage }
-import java.util.function.BiFunction
import java.util.stream.Collector
import scala.annotation.unchecked.uncheckedVariance
@@ -391,7 +390,8 @@ object Sink {
* exposes [[Materializer]] which is going to be used during materialization
and
* [[Attributes]] of the [[Sink]] returned by this method.
*/
- def fromMaterializer[T, M](factory: BiFunction[Materializer, Attributes,
Sink[T, M]]): Sink[T, CompletionStage[M]] =
+ def fromMaterializer[T, M](
+ factory: function.Function2[Materializer, Attributes, Sink[T, M]]):
Sink[T, CompletionStage[M]] =
scaladsl.Sink.fromMaterializer((mat, attr) => factory(mat,
attr).asScala).mapMaterializedValue(_.asJava).asJava
/**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 3649a7ef44..96e5f852c7 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -16,7 +16,6 @@ package org.apache.pekko.stream.javadsl
import java.util
import java.util.Optional
import java.util.concurrent.{ CompletableFuture, CompletionStage }
-import java.util.function.{ BiFunction, Supplier }
import scala.annotation.varargs
import scala.annotation.unchecked.uncheckedVariance
@@ -507,7 +506,7 @@ object Source {
* [[Attributes]] of the [[Source]] returned by this method.
*/
def fromMaterializer[T, M](
- factory: BiFunction[Materializer, Attributes, Source[T, M]]): Source[T,
CompletionStage[M]] =
+ factory: function.Function2[Materializer, Attributes, Source[T, M]]):
Source[T, CompletionStage[M]] =
scaladsl.Source.fromMaterializer((mat, attr) => factory(mat,
attr).asScala).mapMaterializedValue(_.asJava).asJava
/**
@@ -2129,9 +2128,9 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
- def recover(clazz: Class[_ <: Throwable], supplier: Supplier[Out]):
javadsl.Source[Out, Mat] =
+ def recover(clazz: Class[_ <: Throwable], creator: function.Creator[Out]):
javadsl.Source[Out, Mat] =
recover {
- case elem if clazz.isInstance(elem) => supplier.get()
+ case elem if clazz.isInstance(elem) => creator.create()
}
/**
@@ -2221,9 +2220,9 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
*/
def recoverWith(
clazz: Class[_ <: Throwable],
- supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] =
+ creator: function.Creator[Graph[SourceShape[Out], NotUsed]]):
Source[Out, Mat] =
recoverWith({
- case elem if clazz.isInstance(elem) => supplier.get()
+ case elem if clazz.isInstance(elem) => creator.create()
}: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]])
/**
@@ -2277,15 +2276,15 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
*
* @param attempts Maximum number of retries or -1 to retry indefinitely
* @param clazz the class object of the failure cause
- * @param supplier supply the new Source to be materialized
+ * @param creator create the new Source to be materialized
*/
def recoverWithRetries(
attempts: Int,
clazz: Class[_ <: Throwable],
- supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] =
+ creator: function.Creator[Graph[SourceShape[Out], NotUsed]]):
Source[Out, Mat] =
recoverWithRetries(attempts,
{
- case elem if clazz.isInstance(elem) => supplier.get()
+ case elem if clazz.isInstance(elem) => creator.create()
}: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]])
/**
@@ -2338,7 +2337,7 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
* '''Cancels when''' downstream cancels
* @since 1.1.0
*/
- def onErrorComplete(predicate: java.util.function.Predicate[_ >:
Throwable]): javadsl.Source[Out, Mat] =
+ def onErrorComplete(predicate: function.Predicate[_ >: Throwable]):
javadsl.Source[Out, Mat] =
new Source(delegate.onErrorComplete {
case ex: Throwable if predicate.test(ex) => true
})
@@ -2988,7 +2987,7 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
- def groupedWeighted(minWeight: Long, costFn:
java.util.function.Function[Out, java.lang.Long])
+ def groupedWeighted(minWeight: Long, costFn: function.Function[Out,
java.lang.Long])
: javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
new Source(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava))
@@ -3031,7 +3030,7 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
*/
def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
maxWeight: Long,
- costFn: java.util.function.Function[Out, java.lang.Long])
+ costFn: function.Function[Out, java.lang.Long])
: javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
new Source(delegate.groupedAdjacentByWeighted(f.apply,
maxWeight)(costFn.apply).map(_.asJava))
@@ -3444,13 +3443,13 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*
- * @param delayStrategySupplier creates new [[DelayStrategy]] object for
each materialization
+ * @param delayStrategyCreator creates new [[DelayStrategy]] object for each
materialization
* @param overFlowStrategy Strategy that is used when incoming elements
cannot fit inside the buffer
*/
def delayWith(
- delayStrategySupplier: Supplier[DelayStrategy[Out]],
+ delayStrategyCreator: function.Creator[DelayStrategy[Out]],
overFlowStrategy: DelayOverflowStrategy): Source[Out, Mat] =
- new Source(delegate.delayWith(() =>
DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy))
+ new Source(delegate.delayWith(() =>
DelayStrategy.asScala(delayStrategyCreator.create()), overFlowStrategy))
/**
* Discard the given number of elements at the beginning of the stream.
@@ -4749,12 +4748,12 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
* @param emitOnTimer decide whether the current aggregated elements can be
emitted, the custom function is invoked on every interval
*/
@deprecated("Use the overloaded one which accepts an Optional instead.",
since = "1.2.0")
- def aggregateWithBoundary[Agg, Emit](allocate:
java.util.function.Supplier[Agg],
+ def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
- emitOnTimer: Pair[java.util.function.Predicate[Agg],
java.time.Duration]): javadsl.Source[Emit, Mat] = {
+ emitOnTimer: Pair[function.Predicate[Agg], java.time.Duration]):
javadsl.Source[Emit, Mat] = {
asScala
- .aggregateWithBoundary(() => allocate.get())(
+ .aggregateWithBoundary(() => allocate.create())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = Option(emitOnTimer).map {
@@ -4781,13 +4780,13 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
* @param harvest this is invoked before emit within the current
stage/operator
* @param emitOnTimer decide whether the current aggregated elements can be
emitted, the custom function is invoked on every interval
*/
- def aggregateWithBoundary[Agg, Emit](allocate:
java.util.function.Supplier[Agg],
+ def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
- emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg],
java.time.Duration]]): javadsl.Source[Emit, Mat] = {
+ emitOnTimer: Optional[Pair[function.Predicate[Agg],
java.time.Duration]]): javadsl.Source[Emit, Mat] = {
import org.apache.pekko.util.OptionConverters._
asScala
- .aggregateWithBoundary(() => allocate.get())(
+ .aggregateWithBoundary(() => allocate.create())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = emitOnTimer.toScala.map {
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index e4417dc676..947432a6f1 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -15,7 +15,6 @@ package org.apache.pekko.stream.javadsl
import java.util.{ Comparator, Optional }
import java.util.concurrent.CompletionStage
-import java.util.function.Supplier
import scala.annotation.varargs
import scala.annotation.unchecked.uncheckedVariance
@@ -717,7 +716,7 @@ final class SubFlow[In, Out, Mat](
*/
def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
maxWeight: Long,
- costFn: java.util.function.Function[Out, java.lang.Long])
+ costFn: function.Function[Out, java.lang.Long])
: SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
new SubFlow(delegate.groupedAdjacentByWeighted(f.apply,
maxWeight)(costFn.apply).map(_.asJava))
@@ -1134,13 +1133,13 @@ final class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*
- * @param delayStrategySupplier creates new [[DelayStrategy]] object for
each materialization
+ * @param delayStrategyCreator creates new [[DelayStrategy]] object for each
materialization
* @param overFlowStrategy Strategy that is used when incoming elements
cannot fit inside the buffer
*/
def delayWith(
- delayStrategySupplier: Supplier[DelayStrategy[Out]],
+ delayStrategyCreator: function.Creator[DelayStrategy[Out]],
overFlowStrategy: DelayOverflowStrategy): SubFlow[In, Out, Mat] =
- new SubFlow(delegate.delayWith(() =>
DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy))
+ new SubFlow(delegate.delayWith(() =>
DelayStrategy.asScala(delayStrategyCreator.create()), overFlowStrategy))
/**
* Discard the given number of elements at the beginning of the stream.
@@ -1367,7 +1366,7 @@ final class SubFlow[In, Out, Mat](
* '''Cancels when''' downstream cancels
* @since 1.1.0
*/
- def onErrorComplete(predicate: java.util.function.Predicate[_ >:
Throwable]): SubFlow[In, Out, Mat] =
+ def onErrorComplete(predicate: function.Predicate[_ >: Throwable]):
SubFlow[In, Out, Mat] =
new SubFlow(delegate.onErrorComplete {
case ex: Throwable if predicate.test(ex) => true
})
@@ -2790,13 +2789,13 @@ final class SubFlow[In, Out, Mat](
* @param emitOnTimer decide whether the current aggregated elements can be
emitted, the custom function is invoked on every interval
*/
@deprecated("Use the overloaded one which accepts an Optional instead.",
since = "1.2.0")
- def aggregateWithBoundary[Agg, Emit](allocate:
java.util.function.Supplier[Agg],
+ def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
- emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration])
+ emitOnTimer: Pair[function.Predicate[Agg], java.time.Duration])
: javadsl.SubFlow[In, Emit, Mat] = {
new SubFlow(
- asScala.aggregateWithBoundary(() => allocate.get())(
+ asScala.aggregateWithBoundary(() => allocate.create())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = Option(emitOnTimer).map {
@@ -2822,14 +2821,14 @@ final class SubFlow[In, Out, Mat](
* @param harvest this is invoked before emit within the current
stage/operator
* @param emitOnTimer decide whether the current aggregated elements can be
emitted, the custom function is invoked on every interval
*/
- def aggregateWithBoundary[Agg, Emit](allocate:
java.util.function.Supplier[Agg],
+ def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
- emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg],
java.time.Duration]])
+ emitOnTimer: Optional[Pair[function.Predicate[Agg], java.time.Duration]])
: javadsl.SubFlow[In, Emit, Mat] = {
import org.apache.pekko.util.OptionConverters._
new SubFlow(
- asScala.aggregateWithBoundary(() => allocate.get())(
+ asScala.aggregateWithBoundary(() => allocate.create())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = emitOnTimer.toScala.map {
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index ed6b8a19dd..387c4f1b19 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -15,7 +15,6 @@ package org.apache.pekko.stream.javadsl
import java.util.{ Comparator, Optional }
import java.util.concurrent.CompletionStage
-import java.util.function.Supplier
import scala.annotation.varargs
import scala.annotation.unchecked.uncheckedVariance
@@ -707,7 +706,7 @@ final class SubSource[Out, Mat](
*/
def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
maxWeight: Long,
- costFn: java.util.function.Function[Out, java.lang.Long])
+ costFn: function.Function[Out, java.lang.Long])
: SubSource[java.util.List[Out @uncheckedVariance], Mat] =
new SubSource(delegate.groupedAdjacentByWeighted(f.apply,
maxWeight)(costFn.apply).map(_.asJava))
@@ -1226,13 +1225,13 @@ final class SubSource[Out, Mat](
*
* '''Cancels when''' downstream cancels
*
- * @param delayStrategySupplier creates new [[DelayStrategy]] object for each materialization
+ * @param delayStrategyCreator creates new [[DelayStrategy]] object for each materialization
* @param overFlowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def delayWith(
- delayStrategySupplier: Supplier[DelayStrategy[Out]],
+ delayStrategyCreator: function.Creator[DelayStrategy[Out]],
overFlowStrategy: DelayOverflowStrategy): SubSource[Out, Mat] =
- new SubSource(delegate.delayWith(() => DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy))
+ new SubSource(delegate.delayWith(() => DelayStrategy.asScala(delayStrategyCreator.create()), overFlowStrategy))
/**
* Recover allows to send last element on failure and gracefully complete the stream
@@ -1344,7 +1343,7 @@ final class SubSource[Out, Mat](
* '''Cancels when''' downstream cancels
* @since 1.1.0
*/
- def onErrorComplete(predicate: java.util.function.Predicate[_ >: Throwable]): SubSource[Out, Mat] =
+ def onErrorComplete(predicate: function.Predicate[_ >: Throwable]): SubSource[Out, Mat] =
new SubSource(delegate.onErrorComplete {
case ex: Throwable if predicate.test(ex) => true
})
@@ -2764,13 +2763,13 @@ final class SubSource[Out, Mat](
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
@deprecated("Use the overloaded one which accepts an Optional instead.", since = "1.2.0")
- def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
+ def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
- emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration])
+ emitOnTimer: Pair[function.Predicate[Agg], java.time.Duration])
: javadsl.SubSource[Emit, Mat] = {
new SubSource(
- asScala.aggregateWithBoundary(() => allocate.get())(
+ asScala.aggregateWithBoundary(() => allocate.create())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = Option(emitOnTimer).map {
@@ -2796,14 +2795,14 @@ final class SubSource[Out, Mat](
* @param harvest this is invoked before emit within the current stage/operator
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
- def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
+ def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
- emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], java.time.Duration]])
+ emitOnTimer: Optional[Pair[function.Predicate[Agg], java.time.Duration]])
: javadsl.SubSource[Emit, Mat] = {
import org.apache.pekko.util.OptionConverters._
new SubSource(
- asScala.aggregateWithBoundary(() => allocate.get())(
+ asScala.aggregateWithBoundary(() => allocate.create())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = emitOnTimer.toScala.map {
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/TLS.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/TLS.scala
index 0955fba7ac..30747abdbe 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/TLS.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/TLS.scala
@@ -13,13 +13,13 @@
package org.apache.pekko.stream.javadsl
-import java.util.function.{ Consumer, Supplier }
import javax.net.ssl.{ SSLEngine, SSLSession }
import scala.util.Try
import org.apache.pekko
import pekko.NotUsed
+import pekko.japi.function
import pekko.stream._
import pekko.stream.TLSProtocol._
import pekko.util.ByteString
@@ -73,11 +73,11 @@ object TLS {
* For a description of the `closing` parameter please refer to [[TLSClosing]].
*/
def create(
- sslEngineCreator: Supplier[SSLEngine],
- sessionVerifier: Consumer[SSLSession],
+ sslEngineCreator: function.Creator[SSLEngine],
+ sessionVerifier: function.Procedure[SSLSession],
closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
new javadsl.BidiFlow(
- scaladsl.TLS.apply(() => sslEngineCreator.get(), session => Try(sessionVerifier.accept(session)), closing))
+ scaladsl.TLS.apply(() => sslEngineCreator.create(), session => Try(sessionVerifier(session)), closing))
/**
* Create a StreamTls [[pekko.stream.javadsl.BidiFlow]]. This is a low-level interface.
@@ -88,9 +88,9 @@ object TLS {
* For a description of the `closing` parameter please refer to [[TLSClosing]].
*/
def create(
- sslEngineCreator: Supplier[SSLEngine],
+ sslEngineCreator: function.Creator[SSLEngine],
closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
- new javadsl.BidiFlow(scaladsl.TLS.apply(() => sslEngineCreator.get(), closing))
+ new javadsl.BidiFlow(scaladsl.TLS.apply(() => sslEngineCreator.create(), closing))
}
/**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala
index d34b1b51a4..7255a02e6a 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala
@@ -17,8 +17,6 @@ import java.lang.{ Iterable => JIterable }
import java.net.InetSocketAddress
import java.util.Optional
import java.util.concurrent.CompletionStage
-import java.util.function.{ Function => JFunction }
-import java.util.function.Supplier
import javax.net.ssl.SSLEngine
import javax.net.ssl.SSLSession
@@ -35,6 +33,7 @@ import pekko.actor.ExtensionId
import pekko.actor.ExtensionIdProvider
import pekko.annotation.InternalApi
import pekko.io.Inet.SocketOption
+import pekko.japi.function
import pekko.stream.Materializer
import pekko.stream.SystemMaterializer
import pekko.stream.TLSClosing
@@ -261,10 +260,10 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
*/
def outgoingConnectionWithTls(
remoteAddress: InetSocketAddress,
- createSSLEngine: Supplier[SSLEngine]): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] =
+ createSSLEngine: function.Creator[SSLEngine]): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] =
Flow.fromGraph(
delegate
- .outgoingConnectionWithTls(remoteAddress, createSSLEngine = () => createSSLEngine.get())
+ .outgoingConnectionWithTls(remoteAddress, createSSLEngine = () => createSSLEngine.create())
.mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).asJava))
/**
@@ -279,18 +278,18 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
*/
def outgoingConnectionWithTls(
remoteAddress: InetSocketAddress,
- createSSLEngine: Supplier[SSLEngine],
+ createSSLEngine: function.Creator[SSLEngine],
localAddress: Optional[InetSocketAddress],
options: JIterable[SocketOption],
connectTimeout: Optional[java.time.Duration],
idleTimeout: Optional[java.time.Duration],
- verifySession: JFunction[SSLSession, Optional[Throwable]],
+ verifySession: function.Function[SSLSession, Optional[Throwable]],
closing: TLSClosing): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] = {
Flow.fromGraph(
delegate
.outgoingConnectionWithTls(
remoteAddress,
- createSSLEngine = () => createSSLEngine.get(),
+ createSSLEngine = () => createSSLEngine.create(),
localAddress.toScala,
CollectionUtil.toSeq(options),
optionalDurationToScala(connectTimeout),
@@ -313,10 +312,10 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
def bindWithTls(
interface: String,
port: Int,
- createSSLEngine: Supplier[SSLEngine]): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
+ createSSLEngine: function.Creator[SSLEngine]): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
Source.fromGraph(
delegate
- .bindWithTls(interface, port, createSSLEngine = () => createSSLEngine.get())
+ .bindWithTls(interface, port, createSSLEngine = () => createSSLEngine.create())
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).asJava))
}
@@ -330,18 +329,18 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
def bindWithTls(
interface: String,
port: Int,
- createSSLEngine: Supplier[SSLEngine],
+ createSSLEngine: function.Creator[SSLEngine],
backlog: Int,
options: JIterable[SocketOption],
idleTimeout: Optional[java.time.Duration],
- verifySession: JFunction[SSLSession, Optional[Throwable]],
+ verifySession: function.Function[SSLSession, Optional[Throwable]],
closing: TLSClosing): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
Source.fromGraph(
delegate
.bindWithTls(
interface,
port,
- createSSLEngine = () => createSSLEngine.get(),
+ createSSLEngine = () => createSSLEngine.create(),
backlog,
CollectionUtil.toSeq(options),
optionalDurationToScala(idleTimeout),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]