This is an automated email from the ASF dual-hosted git repository.

mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new f9ad446a58 Fix uncurried stream ops in javadsl
f9ad446a58 is described below

commit f9ad446a58dae77ccdec52719c5235e02ce8c770
Author: Matthew de Detrich <[email protected]>
AuthorDate: Sun Jul 21 08:53:13 2024 +0200

    Fix uncurried stream ops in javadsl
---
 .../org/apache/pekko/stream/javadsl/Flow.scala     | 24 ++++++++++-----------
 .../org/apache/pekko/stream/javadsl/Graph.scala    |  2 +-
 .../org/apache/pekko/stream/javadsl/Sink.scala     |  4 ++--
 .../org/apache/pekko/stream/javadsl/Source.scala   | 25 +++++++++++-----------
 .../pekko/stream/javadsl/StreamConverters.scala    |  2 +-
 .../org/apache/pekko/stream/javadsl/SubFlow.scala  | 14 ++++++------
 .../apache/pekko/stream/javadsl/SubSource.scala    | 14 ++++++------
 7 files changed, 42 insertions(+), 43 deletions(-)

diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 356c7e44c5..1963a97f58 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -1254,7 +1254,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    *
    * '''Cancels when''' downstream cancels
    */
-  def groupedWeighted(minWeight: Long)(
+  def groupedWeighted(minWeight: Long,
       costFn: java.util.function.Function[Out, java.lang.Long]): 
javadsl.Flow[In, java.util.List[Out], Mat] =
     new Flow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) 
// TODO optimize to one step
 
@@ -1311,7 +1311,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    *
    * See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
    */
-  def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): 
javadsl.Flow[In, Out, Mat] = {
+  def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): 
javadsl.Flow[In, Out, Mat] = {
     new Flow(delegate.limitWeighted(n)(costFn.apply))
   }
 
@@ -1355,7 +1355,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    *
    * '''Cancels when''' downstream cancels
    */
-  def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, 
Mat] =
+  def scan[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Flow[In, T, 
Mat] =
     new Flow(delegate.scan(zero)(f.apply))
 
   /**
@@ -1386,7 +1386,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    *
    * See also [[#scan]]
    */
-  def scanAsync[T](zero: T)(f: function.Function2[T, Out, 
CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
+  def scanAsync[T](zero: T, f: function.Function2[T, Out, 
CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
     new Flow(delegate.scanAsync(zero) { (out, in) =>
       f(out, in).asScala
     })
@@ -1412,7 +1412,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    *
    * '''Cancels when''' downstream cancels
    */
-  def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, 
Mat] =
+  def fold[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Flow[In, T, 
Mat] =
     new Flow(delegate.fold(zero)(f.apply))
 
   /**
@@ -1464,7 +1464,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    *
    * '''Cancels when''' downstream cancels
    */
-  def foldAsync[T](zero: T)(f: function.Function2[T, Out, 
CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
+  def foldAsync[T](zero: T, f: function.Function2[T, Out, 
CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
     new Flow(delegate.foldAsync(zero) { (out, in) =>
       f(out, in).asScala
     })
@@ -2636,7 +2636,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
   @deprecated(
     "Use 
.withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather 
than a SubstreamCancelStrategy",
     since = "1.1.0")
-  def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: 
function.Predicate[Out]): SubFlow[In, Out, Mat] =
+  def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy, p: 
function.Predicate[Out]): SubFlow[In, Out, Mat] =
     new SubFlow(delegate.splitWhen(substreamCancelStrategy)(p.test))
 
   /**
@@ -2697,7 +2697,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
   @deprecated(
     "Use 
.withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather 
than a SubstreamCancelStrategy",
     since = "1.1.0")
-  def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: 
function.Predicate[Out]): SubFlow[In, Out, Mat] =
+  def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy, p: 
function.Predicate[Out]): SubFlow[In, Out, Mat] =
     new SubFlow(delegate.splitAfter(substreamCancelStrategy)(p.test))
 
   /**
@@ -3497,7 +3497,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    *
    * '''Cancels when''' downstream cancels
    */
-  def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], 
thisElem: A, thatElem: U)(
+  def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], 
thisElem: A, thatElem: U,
       matF: (Mat, Mat2) => Mat3): Flow[In, Pair[A, U], Mat3] =
     new Flow(delegate.zipAllMat(that, thisElem, thatElem)(matF).map { case (a, 
u) => Pair.create(a, u) })
 
@@ -4169,7 +4169,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    * from downstream. It fails with the same error when received error message 
from
    * downstream.
    */
-  def watchTermination[M]()(matF: function.Function2[Mat, 
CompletionStage[Done], M]): javadsl.Flow[In, Out, M] =
+  def watchTermination[M](matF: function.Function2[Mat, CompletionStage[Done], 
M]): javadsl.Flow[In, Out, M] =
     new Flow(delegate.watchTermination()((left, right) => matF(left, 
right.asJava)))
 
   /**
@@ -4180,7 +4180,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    * The `combine` function is used to combine the `FlowMonitor` with this 
flow's materialized value.
    */
   @deprecated("Use monitor() or monitorMat(combine) instead", "Akka 2.5.17")
-  def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): 
javadsl.Flow[In, Out, M] =
+  def monitor[M](combine: function.Function2[Mat, FlowMonitor[Out], M]): 
javadsl.Flow[In, Out, M] =
     new Flow(delegate.monitorMat(combinerToScala(combine)))
 
   /**
@@ -4504,7 +4504,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    * @param emitOnTimer decide whether the current aggregated elements can be 
emitted, the custom function is invoked on every interval
    */
   @ApiMayChange
-  def aggregateWithBoundary[Agg, Emit](allocate: 
java.util.function.Supplier[Agg])(
+  def aggregateWithBoundary[Agg, Emit](allocate: 
java.util.function.Supplier[Agg],
       aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
       harvest: function.Function[Agg, Emit],
       emitOnTimer: Pair[java.util.function.Predicate[Agg], 
java.time.Duration]): javadsl.Flow[In, Emit, Mat] =
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala
index 6cec4cc7fb..b486112c41 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala
@@ -742,7 +742,7 @@ object GraphDSL extends GraphCreate {
     new GenericGraph(s, gbuilder.delegate.result(s))
   }
 
-  final class Builder[+Mat]()(private[stream] implicit val delegate: 
scaladsl.GraphDSL.Builder[Mat]) { self =>
+  final class Builder[+Mat](private[stream] implicit val delegate: 
scaladsl.GraphDSL.Builder[Mat]) { self =>
     import pekko.stream.scaladsl.GraphDSL.Implicits._
 
     /**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index b3b8a58527..4bd056d358 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -204,7 +204,7 @@ object Sink {
    * normal end of the stream, or completed with `Failure` if there is a 
failure signaled in
    * the stream.
    */
-  def foreachAsync[T](parallelism: Int)(
+  def foreachAsync[T](parallelism: Int,
       f: function.Function[T, CompletionStage[Void]]): Sink[T, 
CompletionStage[Done]] =
     new Sink(
       scaladsl.Sink
@@ -225,7 +225,7 @@ object Sink {
   @deprecated(
     "Use `foreachAsync` instead, it allows you to choose how to run the 
procedure, by calling some other API returning a CompletionStage or using 
CompletableFuture.supplyAsync.",
     since = "Akka 2.5.17")
-  def foreachParallel[T](parallel: Int)(f: function.Procedure[T])(
+  def foreachParallel[T](parallel: Int, f: function.Procedure[T],
       ec: ExecutionContext): Sink[T, CompletionStage[Done]] =
     new 
Sink(scaladsl.Sink.foreachParallel(parallel)(f.apply)(ec).toCompletionStage())
 
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index d3fbdfdfb6..bf384639d4 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -1987,7 +1987,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    *
    * '''Cancels when''' downstream cancels
    */
-  def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], 
thisElem: A, thatElem: U)(
+  def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], 
thisElem: A, thatElem: U,
       matF: (Mat, Mat2) => Mat3): Source[Pair[A, U], Mat3] =
     new Source(delegate.zipAllMat(that, thisElem, thatElem)(matF).map { case 
(a, u) => Pair.create(a, u) })
 
@@ -3002,7 +3002,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    *
    * '''Cancels when''' downstream cancels
    */
-  def groupedWeighted(minWeight: Long)(costFn: 
java.util.function.Function[Out, java.lang.Long])
+  def groupedWeighted(minWeight: Long, costFn: 
java.util.function.Function[Out, java.lang.Long])
       : javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
     new Source(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava))
 
@@ -3055,9 +3055,8 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    *
    * See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
    */
-  def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): 
javadsl.Source[Out, Mat] = {
+  def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): 
javadsl.Source[Out, Mat] =
     new Source(delegate.limitWeighted(n)(costFn.apply))
-  }
 
   /**
    * Apply a sliding window over the stream and return the windows as groups 
of elements, with the last group
@@ -3099,7 +3098,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    *
    * '''Cancels when''' downstream cancels
    */
-  def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, 
Mat] =
+  def scan[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Source[T, 
Mat] =
     new Source(delegate.scan(zero)(f.apply))
 
   /**
@@ -3130,7 +3129,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    *
    * See also [[FlowOps#scan]]
    */
-  def scanAsync[T](zero: T)(f: function.Function2[T, Out, 
CompletionStage[T]]): javadsl.Source[T, Mat] =
+  def scanAsync[T](zero: T, f: function.Function2[T, Out, 
CompletionStage[T]]): javadsl.Source[T, Mat] =
     new Source(delegate.scanAsync(zero) { (out, in) =>
       f(out, in).asScala
     })
@@ -3156,7 +3155,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    *
    * '''Cancels when''' downstream cancels
    */
-  def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, 
Mat] =
+  def fold[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Source[T, 
Mat] =
     new Source(delegate.fold(zero)(f.apply))
 
   /**
@@ -3206,7 +3205,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    *
    * '''Cancels when''' downstream cancels
    */
-  def foldAsync[T](zero: T)(f: function.Function2[T, Out, 
CompletionStage[T]]): javadsl.Source[T, Mat] =
+  def foldAsync[T](zero: T, f: function.Function2[T, Out, 
CompletionStage[T]]): javadsl.Source[T, Mat] =
     new Source(delegate.foldAsync(zero) { (out, in) =>
       f(out, in).asScala
     })
@@ -4142,7 +4141,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
   @deprecated(
     "Use 
.withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather 
than a SubstreamCancelStrategy",
     since = "1.1.0")
-  def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: 
function.Predicate[Out]): SubSource[Out, Mat] =
+  def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy, p: 
function.Predicate[Out]): SubSource[Out, Mat] =
     new SubSource(delegate.splitWhen(substreamCancelStrategy)(p.test))
 
   /**
@@ -4202,7 +4201,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
   @deprecated(
     "Use 
.withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather 
than a SubstreamCancelStrategy",
     since = "1.1.0")
-  def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: 
function.Predicate[Out]): SubSource[Out, Mat] =
+  def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy, p: 
function.Predicate[Out]): SubSource[Out, Mat] =
     new SubSource(delegate.splitAfter(substreamCancelStrategy)(p.test))
 
   /**
@@ -4738,7 +4737,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    * from downstream. It fails with the same error when received error message 
from
    * downstream.
    */
-  def watchTermination[M]()(matF: function.Function2[Mat, 
CompletionStage[Done], M]): javadsl.Source[Out, M] =
+  def watchTermination[M](matF: function.Function2[Mat, CompletionStage[Done], 
M]): javadsl.Source[Out, M] =
     new Source(delegate.watchTermination()((left, right) => matF(left, 
right.asJava)))
 
   /**
@@ -4748,7 +4747,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    * The `combine` function is used to combine the `FlowMonitor` with this 
flow's materialized value.
    */
   @deprecated("Use monitor() or monitorMat(combine) instead", "Akka 2.5.17")
-  def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): 
javadsl.Source[Out, M] =
+  def monitor[M](combine: function.Function2[Mat, FlowMonitor[Out], M]): 
javadsl.Source[Out, M] =
     new Source(delegate.monitorMat(combinerToScala(combine)))
 
   /**
@@ -5052,7 +5051,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    * @param emitOnTimer decide whether the current aggregated elements can be 
emitted, the custom function is invoked on every interval
    */
   @ApiMayChange
-  def aggregateWithBoundary[Agg, Emit](allocate: 
java.util.function.Supplier[Agg])(
+  def aggregateWithBoundary[Agg, Emit](allocate: 
java.util.function.Supplier[Agg],
       aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
       harvest: function.Function[Agg, Emit],
       emitOnTimer: Pair[java.util.function.Predicate[Agg], 
java.time.Duration]): javadsl.Source[Emit, Mat] =
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/StreamConverters.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/StreamConverters.scala
index c400ca844d..c3638d5b59 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/StreamConverters.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/StreamConverters.scala
@@ -267,7 +267,7 @@ object StreamConverters {
    * Note that a flow can be materialized multiple times, so the function 
producing the ``Collector`` must be able
    * to handle multiple invocations.
    */
-  def javaCollectorParallelUnordered[T, R](parallelism: Int)(
+  def javaCollectorParallelUnordered[T, R](parallelism: Int,
       collector: function.Creator[Collector[T, _ <: Any, R]]): Sink[T, 
CompletionStage[R]] =
     new Sink(
       scaladsl.StreamConverters
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 5e3eea36e5..addcc8b07b 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -638,7 +638,7 @@ class SubFlow[In, Out, Mat](
    *
    * '''Cancels when''' downstream cancels
    */
-  def groupedWeighted(minWeight: Long)(
+  def groupedWeighted(minWeight: Long,
       costFn: function.Function[Out, java.lang.Long]): SubFlow[In, 
java.util.List[Out @uncheckedVariance], Mat] =
     new 
SubFlow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // 
TODO optimize to one step
 
@@ -691,7 +691,7 @@ class SubFlow[In, Out, Mat](
    *
    * See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
    */
-  def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): 
javadsl.SubFlow[In, Out, Mat] = {
+  def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): 
javadsl.SubFlow[In, Out, Mat] = {
     new SubFlow(delegate.limitWeighted(n)(costFn.apply))
   }
 
@@ -735,7 +735,7 @@ class SubFlow[In, Out, Mat](
    *
    * '''Cancels when''' downstream cancels
    */
-  def scan[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
+  def scan[T](zero: T, f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
     new SubFlow(delegate.scan(zero)(f.apply))
 
   /**
@@ -766,7 +766,7 @@ class SubFlow[In, Out, Mat](
    *
    * See also [[#scan]]
    */
-  def scanAsync[T](zero: T)(f: function.Function2[T, Out, 
CompletionStage[T]]): SubFlow[In, T, Mat] =
+  def scanAsync[T](zero: T, f: function.Function2[T, Out, 
CompletionStage[T]]): SubFlow[In, T, Mat] =
     new SubFlow(delegate.scanAsync(zero) { (out, in) =>
       f(out, in).asScala
     })
@@ -792,7 +792,7 @@ class SubFlow[In, Out, Mat](
    *
    * '''Cancels when''' downstream cancels
    */
-  def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
+  def fold[T](zero: T, f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
     new SubFlow(delegate.fold(zero)(f.apply))
 
   /**
@@ -844,7 +844,7 @@ class SubFlow[In, Out, Mat](
    *
    * '''Cancels when''' downstream cancels
    */
-  def foldAsync[T](zero: T)(f: function.Function2[T, Out, 
CompletionStage[T]]): SubFlow[In, T, Mat] =
+  def foldAsync[T](zero: T, f: function.Function2[T, Out, 
CompletionStage[T]]): SubFlow[In, T, Mat] =
     new SubFlow(delegate.foldAsync(zero) { (out, in) =>
       f(out, in).asScala
     })
@@ -3031,7 +3031,7 @@ class SubFlow[In, Out, Mat](
    * @param emitOnTimer decide whether the current aggregated elements can be 
emitted, the custom function is invoked on every interval
    */
   @ApiMayChange
-  def aggregateWithBoundary[Agg, Emit](allocate: 
java.util.function.Supplier[Agg])(
+  def aggregateWithBoundary[Agg, Emit](allocate: 
java.util.function.Supplier[Agg],
       aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
       harvest: function.Function[Agg, Emit],
       emitOnTimer: Pair[java.util.function.Predicate[Agg], 
java.time.Duration]): javadsl.SubFlow[In, Emit, Mat] =
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index ee63fbfb93..ad68e3579e 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -629,7 +629,7 @@ class SubSource[Out, Mat](
    *
    * '''Cancels when''' downstream cancels
    */
-  def groupedWeighted(minWeight: Long)(
+  def groupedWeighted(minWeight: Long,
       costFn: function.Function[Out, java.lang.Long]): 
SubSource[java.util.List[Out @uncheckedVariance], Mat] =
     new 
SubSource(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // 
TODO optimize to one step
 
@@ -697,7 +697,7 @@ class SubSource[Out, Mat](
    *
    * See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
    */
-  def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): 
javadsl.SubSource[Out, Mat] = {
+  def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): 
javadsl.SubSource[Out, Mat] = {
     new SubSource(delegate.limitWeighted(n)(costFn.apply))
   }
 
@@ -726,7 +726,7 @@ class SubSource[Out, Mat](
    *
    * '''Cancels when''' downstream cancels
    */
-  def scan[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] =
+  def scan[T](zero: T, f: function.Function2[T, Out, T]): SubSource[T, Mat] =
     new SubSource(delegate.scan(zero)(f.apply))
 
   /**
@@ -757,7 +757,7 @@ class SubSource[Out, Mat](
    *
    * See also [[#scan]]
    */
-  def scanAsync[T](zero: T)(f: function.Function2[T, Out, 
CompletionStage[T]]): SubSource[T, Mat] =
+  def scanAsync[T](zero: T, f: function.Function2[T, Out, 
CompletionStage[T]]): SubSource[T, Mat] =
     new SubSource(delegate.scanAsync(zero) { (out, in) =>
       f(out, in).asScala
     })
@@ -783,7 +783,7 @@ class SubSource[Out, Mat](
    *
    * '''Cancels when''' downstream cancels
    */
-  def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] =
+  def fold[T](zero: T, f: function.Function2[T, Out, T]): SubSource[T, Mat] =
     new SubSource(delegate.fold(zero)(f.apply))
 
   /**
@@ -831,7 +831,7 @@ class SubSource[Out, Mat](
    *
    * '''Cancels when''' downstream cancels
    */
-  def foldAsync[T](zero: T)(f: function.Function2[T, Out, 
CompletionStage[T]]): SubSource[T, Mat] =
+  def foldAsync[T](zero: T, f: function.Function2[T, Out, 
CompletionStage[T]]): SubSource[T, Mat] =
     new SubSource(delegate.foldAsync(zero) { (out, in) =>
       f(out, in).asScala
     })
@@ -3002,7 +3002,7 @@ class SubSource[Out, Mat](
    * @param emitOnTimer decide whether the current aggregated elements can be 
emitted, the custom function is invoked on every interval
    */
   @ApiMayChange
-  def aggregateWithBoundary[Agg, Emit](allocate: 
java.util.function.Supplier[Agg])(
+  def aggregateWithBoundary[Agg, Emit](allocate: 
java.util.function.Supplier[Agg],
       aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
       harvest: function.Function[Agg, Emit],
       emitOnTimer: Pair[java.util.function.Predicate[Agg], 
java.time.Duration]): javadsl.SubSource[Emit, Mat] =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to