This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b1f57224b =str Tweak the stream mapAsyncPartitioned operator
1b1f57224b is described below

commit 1b1f57224b409c8b2cbd5267ace814439e97d3ea
Author: He-Pin <[email protected]>
AuthorDate: Sat Sep 23 14:51:31 2023 +0800

    =str Tweak the stream mapAsyncPartitioned operator
---
 .../pekko/stream/MapAsyncPartitionedSpec.scala     |  27 ++--
 .../apache/pekko/stream/MapAsyncPartitioned.scala  | 153 ++++++---------------
 .../org/apache/pekko/stream/impl/Stages.scala      |   2 +
 .../org/apache/pekko/stream/javadsl/Flow.scala     |  54 ++++++--
 .../pekko/stream/javadsl/FlowWithContext.scala     |  33 ++---
 .../org/apache/pekko/stream/javadsl/Source.scala   |  56 ++++++--
 .../pekko/stream/javadsl/SourceWithContext.scala   |  40 +++---
 .../org/apache/pekko/stream/javadsl/SubFlow.scala  |  66 +++++++++
 .../apache/pekko/stream/javadsl/SubSource.scala    |  66 +++++++++
 .../org/apache/pekko/stream/scaladsl/Flow.scala    | 105 ++++++++++----
 .../pekko/stream/scaladsl/FlowWithContext.scala    |  31 -----
 .../pekko/stream/scaladsl/FlowWithContextOps.scala |  30 ++++
 .../org/apache/pekko/stream/scaladsl/Source.scala  |  28 ----
 .../pekko/stream/scaladsl/SourceWithContext.scala  |  29 ----
 14 files changed, 421 insertions(+), 299 deletions(-)

diff --git 
a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
 
b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
index 6b46a5dc49..0b5a01a20b 100644
--- 
a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
+++ 
b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
@@ -63,8 +63,7 @@ private object MapAsyncPartitionedSpec {
           value = i.toString)
       }
 
-    def extractPartition(e: TestKeyValue): Int =
-      e.key
+    val partitioner: TestKeyValue => Int = kv => kv.key
 
     type Operation = TestKeyValue => Future[(Int, String)]
 
@@ -125,7 +124,7 @@ class MapAsyncPartitionedSpec
 
     val result =
       Source(elements)
-        .mapAsyncPartitionedUnordered(parallelism = 
2)(extractPartition)(blockingOperation)
+        .mapAsyncPartitionedUnordered(parallelism = 
2)(partitioner)(blockingOperation)
         .runWith(Sink.seq)
         .futureValue
         .map(_._2)
@@ -137,7 +136,7 @@ class MapAsyncPartitionedSpec
     forAll(minSuccessful(1000)) { (parallelism: Parallelism, elements: 
Seq[TestKeyValue]) =>
       val result =
         Source(elements.toIndexedSeq)
-          
.mapAsyncPartitionedUnordered(parallelism.value)(extractPartition)(asyncOperation)
+          
.mapAsyncPartitionedUnordered(parallelism.value)(partitioner)(asyncOperation)
           .runWith(Sink.seq)
           .futureValue
 
@@ -153,7 +152,7 @@ class MapAsyncPartitionedSpec
       val result =
         Source
           .fromIterator(() => elements.iterator)
-          .mapAsyncPartitionedUnordered(parallelism = 
1)(extractPartition)(asyncOperation)
+          .mapAsyncPartitionedUnordered(parallelism = 
1)(partitioner)(asyncOperation)
           .runWith(Sink.seq)
           .futureValue
 
@@ -169,7 +168,7 @@ class MapAsyncPartitionedSpec
       val result =
         Source
           .fromIterator(() => elements.iterator)
-          
.mapAsyncPartitionedUnordered(parallelism.value)(extractPartition)(blockingOperation)
+          
.mapAsyncPartitionedUnordered(parallelism.value)(partitioner)(blockingOperation)
           .runWith(Sink.seq)
           .futureValue
 
@@ -232,7 +231,7 @@ class MapAsyncPartitionedSpec
 
     val result =
       Source(elements)
-        .mapAsyncPartitionedUnordered(parallelism = 2)(extractPartition)(fun)
+        .mapAsyncPartitionedUnordered(parallelism = 2)(partitioner)(fun)
         .runWith(Sink.seq)
         .futureValue
 
@@ -244,7 +243,7 @@ class MapAsyncPartitionedSpec
       an[IllegalArgumentException] shouldBe thrownBy {
         Source(infiniteStream())
           .mapAsyncPartitionedUnordered(
-            parallelism = zeroOrNegativeParallelism)(extractPartition = 
identity)(f = (_, _) => Future.unit)
+            parallelism = zeroOrNegativeParallelism)(partitioner = identity)(f 
= (_, _) => Future.unit)
           .runWith(Sink.ignore)
           .futureValue
       }
@@ -272,7 +271,7 @@ class MapAsyncPartitionedSpec
 
     val result =
       Source(elements)
-        .mapAsyncPartitioned(parallelism = 2)(extractPartition)(processElement)
+        .mapAsyncPartitioned(parallelism = 2)(partitioner)(processElement)
         .runWith(Sink.seq)
         .futureValue
         .map(_._2)
@@ -289,7 +288,7 @@ class MapAsyncPartitionedSpec
     forAll(minSuccessful(1000)) { (parallelism: Parallelism, elements: 
Seq[TestKeyValue]) =>
       val result =
         Source(elements.toIndexedSeq)
-          
.mapAsyncPartitioned(parallelism.value)(extractPartition)(asyncOperation)
+          .mapAsyncPartitioned(parallelism.value)(partitioner)(asyncOperation)
           .runWith(Sink.seq)
           .futureValue
 
@@ -305,7 +304,7 @@ class MapAsyncPartitionedSpec
       val result =
         Source
           .fromIterator(() => elements.iterator)
-          .mapAsyncPartitioned(parallelism = 
1)(extractPartition)(asyncOperation)
+          .mapAsyncPartitioned(parallelism = 1)(partitioner)(asyncOperation)
           .runWith(Sink.seq)
           .futureValue
 
@@ -321,7 +320,7 @@ class MapAsyncPartitionedSpec
       val result =
         Source
           .fromIterator(() => elements.iterator)
-          
.mapAsyncPartitioned(parallelism.value)(extractPartition)(blockingOperation)
+          
.mapAsyncPartitioned(parallelism.value)(partitioner)(blockingOperation)
           .runWith(Sink.seq)
           .futureValue
 
@@ -384,7 +383,7 @@ class MapAsyncPartitionedSpec
 
     val result =
       Source(elements)
-        .mapAsyncPartitioned(parallelism = 2)(extractPartition)(fun)
+        .mapAsyncPartitioned(parallelism = 2)(partitioner)(fun)
         .runWith(Sink.seq)
         .futureValue
 
@@ -396,7 +395,7 @@ class MapAsyncPartitionedSpec
       an[IllegalArgumentException] shouldBe thrownBy {
         Source(infiniteStream())
           .mapAsyncPartitioned(
-            parallelism = zeroOrNegativeParallelism)(extractPartition = 
identity)(f = (_, _) => Future.unit)
+            parallelism = zeroOrNegativeParallelism)(partitioner = identity)(f 
= (_, _) => Future.unit)
           .runWith(Sink.ignore)
           .futureValue
       }
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala 
b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
index f328d66fe0..4ce63a6cbf 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
@@ -23,102 +23,29 @@ import scala.util.control.{ NoStackTrace, NonFatal }
 import scala.util.{ Failure, Success, Try }
 
 import org.apache.pekko
+import pekko.annotation.InternalApi
 import pekko.dispatch.ExecutionContexts
 import pekko.stream.ActorAttributes.SupervisionStrategy
-import pekko.stream.Attributes.{ Name, SourceLocation }
-import pekko.stream.MapAsyncPartitioned._
-import pekko.stream.scaladsl.{ Flow, FlowWithContext, Source, 
SourceWithContext }
 import pekko.stream.stage._
+import pekko.util.OptionVal
 
+/**
+ * Internal API
+ */
+@InternalApi
 private[stream] object MapAsyncPartitioned {
+  private val NotYetThere = Failure(new Exception with NoStackTrace)
 
-  private def extractPartitionWithCtx[In, Ctx, Partition](extract: In => 
Partition)(tuple: (In, Ctx)): Partition =
-    extract(tuple._1)
-
-  private def fWithCtx[In, Out, Ctx, Partition](f: (In, Partition) => 
Future[Out])(tuple: (In, Ctx),
-      partition: Partition): Future[(Out, Ctx)] =
-    f(tuple._1, partition).map(_ -> tuple._2)(ExecutionContexts.parasitic)
-
-  def mapSourceOrdered[In, Out, Partition, Mat](source: Source[In, Mat], 
parallelism: Int)(
-      extractPartition: In => Partition)(
-      f: (In, Partition) => Future[Out]): Source[Out, Mat] =
-    source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = 
true, parallelism, extractPartition, f))
-
-  def mapSourceUnordered[In, Out, Partition, Mat](source: Source[In, Mat], 
parallelism: Int)(
-      extractPartition: In => Partition)(
-      f: (In, Partition) => Future[Out]): Source[Out, Mat] =
-    source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = 
false, parallelism, extractPartition, f))
-
-  def mapSourceWithContextOrdered[In, Ctx, T, Partition, Mat](flow: 
SourceWithContext[In, Ctx, Mat], parallelism: Int)(
-      extractPartition: In => Partition)(
-      f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] =
-    flow.via(
-      new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition](
-        orderedOutput = true,
-        parallelism,
-        extractPartitionWithCtx(extractPartition),
-        fWithCtx[In, T, Ctx, Partition](f)))
-
-  def mapSourceWithContextUnordered[In, Ctx, T, Partition, Mat](flow: 
SourceWithContext[In, Ctx, Mat],
-      parallelism: Int)(extractPartition: In => Partition)(
-      f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] =
-    flow.via(
-      new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition](
-        orderedOutput = false,
-        parallelism,
-        extractPartitionWithCtx(extractPartition),
-        fWithCtx[In, T, Ctx, Partition](f)))
-
-  def mapFlowOrdered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], 
parallelism: Int)(
-      extractPartition: Out => Partition)(
-      f: (Out, Partition) => Future[T]): Flow[In, T, Mat] =
-    flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = true, 
parallelism, extractPartition,
-      f))
-
-  def mapFlowUnordered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], 
parallelism: Int)(
-      extractPartition: Out => Partition)(
-      f: (Out, Partition) => Future[T]): Flow[In, T, Mat] =
-    flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = false, 
parallelism,
-      extractPartition, f))
-
-  def mapFlowWithContextOrdered[In, Out, CtxIn, CtxOut, T, Partition, Mat](
-      flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: Int)(
-      extractPartition: Out => Partition)(
-      f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, 
Mat] =
-    flow.via(
-      new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition](
-        orderedOutput = true,
-        parallelism,
-        extractPartitionWithCtx(extractPartition),
-        fWithCtx[Out, T, CtxOut, Partition](f)))
-
-  def mapFlowWithContextUnordered[In, Out, CtxIn, CtxOut, T, Partition, Mat](
-      flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: 
Int)(extractPartition: Out => Partition)(
-      f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, 
Mat] =
-    flow.via(
-      new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition](
-        orderedOutput = false,
-        parallelism,
-        extractPartitionWithCtx(extractPartition),
-        fWithCtx[Out, T, CtxOut, Partition](f)))
-
-  private[stream] val NotYetThere: Failure[Nothing] = Failure(new Exception 
with NoStackTrace)
-
-  private[stream] final class Holder[In, Out](
-      val in: In,
-      var out: Try[Out],
-      callback: AsyncCallback[Holder[In, Out]]) extends (Try[Out] => Unit) {
-
-    // To support both fail-fast when the supervision directive is Stop
-    // and not calling the decider multiple times (#23888) we need to cache 
the decider result and re-use that
-    private var cachedSupervisionDirective: Option[Supervision.Directive] = 
None
+  private final class Holder[In, Out](val in: In, var out: Try[Out], val cb: 
AsyncCallback[Holder[In, Out]]) extends (
+          Try[Out] => Unit) {
+    private var cachedSupervisionDirective: OptionVal[Supervision.Directive] = 
OptionVal.None
 
     def supervisionDirectiveFor(decider: Supervision.Decider, ex: Throwable): 
Supervision.Directive = {
       cachedSupervisionDirective match {
-        case Some(d) => d
+        case OptionVal.Some(d) => d
         case _ =>
           val d = decider(ex)
-          cachedSupervisionDirective = Some(d)
+          cachedSupervisionDirective = OptionVal.Some(d)
           d
       }
     }
@@ -128,27 +55,32 @@ private[stream] object MapAsyncPartitioned {
 
     override def apply(t: Try[Out]): Unit = {
       setOut(t)
-      callback.invoke(this)
+      cb.invoke(this)
     }
+
+    override def toString = s"Holder($in, $out)"
   }
 }
 
-private[stream] class MapAsyncPartitioned[In, Out, Partition](
-    orderedOutput: Boolean,
+/**
+ * Internal API
+ */
+@InternalApi
+private[stream] final class MapAsyncPartitioned[In, Out, Partition](
     parallelism: Int,
-    extractPartition: In => Partition,
+    orderedOutput: Boolean,
+    partitioner: In => Partition,
     f: (In, Partition) => Future[Out]) extends GraphStage[FlowShape[In, Out]] {
+  require(parallelism >= 1, "parallelism must be at least 1")
+  require(partitioner != null, "partitioner function should not be null")
+  require(f != null, "f function should not be null.")
+  import MapAsyncPartitioned._
 
-  if (parallelism < 1) throw new IllegalArgumentException("parallelism must be 
at least 1")
-
-  private val in = Inlet[In]("MapAsyncPartitionOrdered.in")
-  private val out = Outlet[Out]("MapAsyncPartitionOrdered.out")
+  private val in = Inlet[In]("MapAsyncPartitioned.in")
+  private val out = Outlet[Out]("MapAsyncPartitioned.out")
 
   override val shape: FlowShape[In, Out] = FlowShape(in, out)
 
-  override def initialAttributes: Attributes =
-    Attributes(Name("MapAsyncPartitionOrdered")) and 
SourceLocation.forLambda(f)
-
   override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
     new GraphStageLogic(shape) with InHandler with OutHandler {
       private val contextPropagation = pekko.stream.impl.ContextPropagation()
@@ -191,13 +123,12 @@ private[stream] class MapAsyncPartitioned[In, Out, 
Partition](
         buffer = mutable.Queue()
       }
 
-      override def onPull(): Unit =
-        pushNextIfPossible()
+      override def onPull(): Unit = pushNextIfPossible()
 
       override def onPush(): Unit = {
         try {
           val element = grab(in)
-          val partition = extractPartition(element)
+          val partition = partitioner(element)
 
           val wrappedInput = new Contextual(
             contextPropagation.currentContext(),
@@ -217,8 +148,7 @@ private[stream] class MapAsyncPartitioned[In, Out, 
Partition](
         pullIfNeeded()
       }
 
-      override def onUpstreamFinish(): Unit =
-        if (idle()) completeStage()
+      override def onUpstreamFinish(): Unit = if (idle()) completeStage()
 
       private def processElement(partition: Partition, wrappedInput: 
Contextual[Holder[In, Out]]): Unit = {
         import wrappedInput.{ element => holder }
@@ -289,7 +219,7 @@ private[stream] class MapAsyncPartitioned[In, Out, 
Partition](
           buffer = buffer.filter { case (partition, wrappedInput) =>
             import wrappedInput.{ element => holder }
 
-            if ((holder.out eq MapAsyncPartitioned.NotYetThere) || 
!isAvailable(out)) {
+            if ((holder.out eq NotYetThere) || !isAvailable(out)) {
               true
             } else {
               partitionsInProgress -= partition
@@ -321,12 +251,14 @@ private[stream] class MapAsyncPartitioned[In, Out, 
Partition](
         }
 
       private def drainQueue(): Unit = {
-        buffer.foreach {
-          case (partition, wrappedInput) =>
-            if (canStartNextElement(partition)) {
-              wrappedInput.resume()
-              processElement(partition, wrappedInput)
-            }
+        if (buffer.nonEmpty) {
+          buffer.foreach {
+            case (partition, wrappedInput) =>
+              if (canStartNextElement(partition)) {
+                wrappedInput.resume()
+                processElement(partition, wrappedInput)
+              }
+          }
         }
       }
 
@@ -335,11 +267,10 @@ private[stream] class MapAsyncPartitioned[In, Out, 
Partition](
         else if (buffer.size < parallelism && !hasBeenPulled(in)) tryPull(in)
       // else already pulled and waiting for next element
 
-      private def idle(): Boolean =
-        buffer.isEmpty
+      private def idle(): Boolean = buffer.isEmpty
 
       private def canStartNextElement(partition: Partition): Boolean =
-        !partitionsInProgress(partition) && partitionsInProgress.size < 
parallelism
+        !partitionsInProgress.contains(partition) && partitionsInProgress.size 
< parallelism
 
       setHandlers(in, out, this)
     }
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index 4a36c8aa14..6b56a08e13 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -40,6 +40,8 @@ import pekko.stream.Attributes._
     val mapError = name("mapError")
     val mapAsync = name("mapAsync")
     val mapAsyncUnordered = name("mapAsyncUnordered")
+    val mapAsyncPartition = name("mapAsyncPartition")
+    val mapAsyncPartitionUnordered = name("mapAsyncPartitionUnordered")
     val ask = name("ask")
     val grouped = name("grouped")
     val groupedWithin = name("groupedWithin")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 1d1c8f7854..0b2d60c58e 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -842,30 +842,68 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
   /**
    * Transforms this stream. Works very similarly to [[#mapAsync]] but with an 
additional
    * partition step before the transform step. The transform function receives 
the an individual
-   * stream entry and the calculated partition value for that entry.
+   * stream entry and the calculated partition value for that entry. The max 
parallelism per partition is 1.
+   *
+   * The function `partitioner` is always invoked on the elements in the order 
they arrive.
+   * The function `f` is always invoked on the elements which are in the same 
partition in the order they arrive.
+   *
+   * If the function `partitioner` or `f` throws an exception or if the 
[[CompletionStage]] is completed
+   * with failure and the supervision decision is 
[[pekko.stream.Supervision.Stop]]
+   * the stream will be completed with failure, otherwise the stream continues 
and the current element is dropped.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Emits when''' the Future returned by the provided function finishes 
for the next element in sequence
+   *
+   * '''Backpressures when''' the number of futures reaches the configured 
parallelism and the downstream
+   * backpressures
+   *
+   * '''Completes when''' upstream completes and all futures have been 
completed and all elements have been emitted
+   *
+   * '''Cancels when''' downstream cancels
    *
    * @since 1.1.0
    * @see [[#mapAsync]]
    * @see [[#mapAsyncPartitionedUnordered]]
    */
-  def mapAsyncPartitioned[T, P](parallelism: Int,
-      extractPartition: function.Function[Out, P],
+  def mapAsyncPartitioned[T, P](
+      parallelism: Int,
+      partitioner: function.Function[Out, P],
       f: function.Function2[Out, P, CompletionStage[T]]): Flow[In, T, Mat] =
-    MapAsyncPartitioned.mapFlowOrdered(delegate, 
parallelism)(extractPartition(_))(f(_, _).asScala).asJava
+    new Flow(delegate.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_, 
_).asScala))
 
   /**
    * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] 
but with an additional
    * partition step before the transform step. The transform function receives 
the an individual
-   * stream entry and the calculated partition value for that entry.
+   * stream entry and the calculated partition value for that entry. The max 
parallelism per partition is 1.
+   *
+   * The function `partitioner` is always invoked on the elements in the order 
they arrive.
+   * The function `f` is always invoked on the elements which are in the same 
partition in the order they arrive.
+   *
+   * If the function `partitioner` or `f` throws an exception or if the 
[[CompletionStage]] is completed
+   * with failure and the supervision decision is 
[[pekko.stream.Supervision.Stop]]
+   * the stream will be completed with failure, otherwise the stream continues 
and the current element is dropped.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Emits when''' the Future returned by the provided function finishes 
and downstream is available.
+   *
+   * '''Backpressures when''' the number of futures reaches the configured 
parallelism and the downstream
+   * backpressures
+   *
+   * '''Completes when''' upstream completes and all futures have been 
completed and all elements have been emitted
+   *
+   * '''Cancels when''' downstream cancels
    *
    * @since 1.1.0
    * @see [[#mapAsyncUnordered]]
    * @see [[#mapAsyncPartitioned]]
    */
-  def mapAsyncPartitionedUnordered[T, P](parallelism: Int,
-      extractPartition: function.Function[Out, P],
+  def mapAsyncPartitionedUnordered[T, P](
+      parallelism: Int,
+      partitioner: function.Function[Out, P],
       f: function.Function2[Out, P, CompletionStage[T]]): Flow[In, T, Mat] =
-    MapAsyncPartitioned.mapFlowUnordered(delegate, 
parallelism)(extractPartition(_))(f(_, _).asScala).asJava
+    new 
Flow(delegate.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_, 
_).asScala))
 
   /**
    * Transform this stream by applying the given function to each of the 
elements
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala
index 84736777db..44dbeaaead 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala
@@ -173,39 +173,40 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
   def map[Out2](f: function.Function[Out, Out2]): FlowWithContext[In, CtxIn, 
Out2, CtxOut, Mat] =
     viaScala(_.map(f.apply))
 
+  /**
+   * Context-preserving variant of [[pekko.stream.javadsl.Flow.mapAsync]].
+   *
+   * @see [[pekko.stream.javadsl.Flow.mapAsync]]
+   */
   def mapAsync[Out2](
       parallelism: Int,
       f: function.Function[Out, CompletionStage[Out2]]): FlowWithContext[In, 
CtxIn, Out2, CtxOut, Mat] =
     viaScala(_.mapAsync[Out2](parallelism)(o => f.apply(o).asScala))
 
   /**
-   * Transforms this stream. Works very similarly to [[#mapAsync]] but with an 
additional
-   * partition step before the transform step. The transform function receives 
the an individual
-   * stream entry and the calculated partition value for that entry.
+   * Context-preserving variant of 
[[pekko.stream.javadsl.Flow.mapAsyncPartitioned]].
    *
    * @since 1.1.0
-   * @see [[#mapAsync]]
-   * @see [[#mapAsyncPartitionedUnordered]]
+   * @see [[pekko.stream.javadsl.Flow.mapAsyncPartitioned]]
    */
-  def mapAsyncPartitioned[Out2, P](parallelism: Int,
-      extractPartition: function.Function[Out, P],
+  def mapAsyncPartitioned[Out2, P](
+      parallelism: Int,
+      partitioner: function.Function[Out, P],
       f: function.Function2[Out, P, CompletionStage[Out2]]): 
FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = {
-    viaScala(_.mapAsyncPartitioned(parallelism)(extractPartition(_))(f(_, 
_).asScala))
+    viaScala(_.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_, 
_).asScala))
   }
 
   /**
-   * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] 
but with an additional
-   * partition step before the transform step. The transform function receives 
the an individual
-   * stream entry and the calculated partition value for that entry.
+   * Context-preserving variant of 
[[pekko.stream.javadsl.Flow.mapAsyncPartitionedUnordered]].
    *
    * @since 1.1.0
-   * @see [[#mapAsyncUnordered]]
-   * @see [[#mapAsyncPartitioned]]
+   * @see [[pekko.stream.javadsl.Flow.mapAsyncPartitionedUnordered]]
    */
-  def mapAsyncPartitionedUnordered[Out2, P](parallelism: Int,
-      extractPartition: function.Function[Out, P],
+  def mapAsyncPartitionedUnordered[Out2, P](
+      parallelism: Int,
+      partitioner: function.Function[Out, P],
       f: function.Function2[Out, P, CompletionStage[Out2]]): 
FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = {
-    
viaScala(_.mapAsyncPartitionedUnordered(parallelism)(extractPartition(_))(f(_, 
_).asScala))
+    viaScala(_.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_, 
_).asScala))
   }
 
   /**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index b87c5399c7..ae01282622 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2496,32 +2496,68 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
   /**
    * Transforms this stream. Works very similarly to [[#mapAsync]] but with an 
additional
    * partition step before the transform step. The transform function receives 
the an individual
-   * stream entry and the calculated partition value for that entry.
+   * stream entry and the calculated partition value for that entry. The max 
parallelism per partition is 1.
+   *
+   * The function `partitioner` is always invoked on the elements in the order 
they arrive.
+   * The function `f` is always invoked on the elements which are in the same 
partition in the order they arrive.
+   *
+   * If the function `partitioner` or `f` throws an exception or if the 
[[CompletionStage]] is completed
+   * with failure and the supervision decision is 
[[pekko.stream.Supervision.Stop]]
+   * the stream will be completed with failure, otherwise the stream continues 
and the current element is dropped.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Emits when''' the Future returned by the provided function finishes 
for the next element in sequence
+   *
+   * '''Backpressures when''' the number of futures reaches the configured 
parallelism and the downstream
+   * backpressures
+   *
+   * '''Completes when''' upstream completes and all futures have been 
completed and all elements have been emitted
+   *
+   * '''Cancels when''' downstream cancels
    *
    * @since 1.1.0
    * @see [[#mapAsync]]
    * @see [[#mapAsyncPartitionedUnordered]]
    */
-  def mapAsyncPartitioned[T, P](parallelism: Int,
-      extractPartition: function.Function[Out, P],
+  def mapAsyncPartitioned[T, P](
+      parallelism: Int,
+      partitioner: function.Function[Out, P],
       f: function.Function2[Out, P, CompletionStage[T]]): javadsl.Source[T, 
Mat] =
-    MapAsyncPartitioned.mapSourceOrdered(delegate, 
parallelism)(extractPartition(_))(f(_,
-      _).asScala).asJava
+    new Source(delegate.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_, 
_).asScala))
 
   /**
    * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] 
but with an additional
    * partition step before the transform step. The transform function receives 
the an individual
-   * stream entry and the calculated partition value for that entry.
+   * stream entry and the calculated partition value for that entry. The max 
parallelism per partition is 1.
+   *
+   * The function `partitioner` is always invoked on the elements in the order 
they arrive.
+   * The function `f` is always invoked on the elements which are in the same 
partition in the order they arrive.
+   *
+   * If the function `partitioner` or `f` throws an exception or if the 
[[CompletionStage]] is completed
+   * with failure and the supervision decision is 
[[pekko.stream.Supervision.Stop]]
+   * the stream will be completed with failure, otherwise the stream continues 
and the current element is dropped.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Emits when''' the Future returned by the provided function finishes 
and downstream is available.
+   *
+   * '''Backpressures when''' the number of futures reaches the configured 
parallelism and the downstream
+   * backpressures
+   *
+   * '''Completes when''' upstream completes and all futures have been 
completed and all elements have been emitted
+   *
+   * '''Cancels when''' downstream cancels
    *
    * @since 1.1.0
    * @see [[#mapAsyncUnordered]]
    * @see [[#mapAsyncPartitioned]]
    */
-  def mapAsyncPartitionedUnordered[T, P](parallelism: Int,
-      extractPartition: function.Function[Out, P],
+  def mapAsyncPartitionedUnordered[T, P](
+      parallelism: Int,
+      partitioner: function.Function[Out, P],
       f: function.Function2[Out, P, CompletionStage[T]]): javadsl.Source[T, 
Mat] =
-    MapAsyncPartitioned.mapSourceUnordered(delegate, 
parallelism)(extractPartition(_))(f(_,
-      _).asScala).asJava
+    new 
Source(delegate.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_, 
_).asScala))
 
   /**
    * Transform this stream by applying the given function to each of the 
elements
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala
index 687c01fc05..8b77a813d5 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala
@@ -169,44 +169,40 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: 
scaladsl.SourceWithCon
   def map[Out2](f: function.Function[Out, Out2]): SourceWithContext[Out2, Ctx, 
Mat] =
     viaScala(_.map(f.apply))
 
+  /**
+   * Context-preserving variant of [[pekko.stream.javadsl.Source.mapAsync]].
+   *
+   * @see [[pekko.stream.javadsl.Source.mapAsync]]
+   */
   def mapAsync[Out2](
       parallelism: Int,
       f: function.Function[Out, CompletionStage[Out2]]): 
SourceWithContext[Out2, Ctx, Mat] =
     viaScala(_.mapAsync[Out2](parallelism)(o => f.apply(o).asScala))
 
   /**
-   * Transforms this stream. Works very similarly to [[#mapAsync]] but with an 
additional
-   * partition step before the transform step. The transform function receives 
the an individual
-   * stream entry and the calculated partition value for that entry.
+   * Context-preserving variant of 
[[pekko.stream.javadsl.Source.mapAsyncPartitioned]].
    *
    * @since 1.1.0
-   * @see [[#mapAsync]]
-   * @see [[#mapAsyncPartitionedUnordered]]
+   * @see [[pekko.stream.javadsl.Source.mapAsyncPartitioned]]
    */
-  def mapAsyncPartitioned[Out2, P](parallelism: Int,
-      extractPartition: function.Function[Out, P],
+  def mapAsyncPartitioned[Out2, P](
+      parallelism: Int,
+      partitioner: function.Function[Out, P],
       f: function.Function2[Out, P, CompletionStage[Out2]]): 
SourceWithContext[Out2, Ctx, Mat] = {
-    MapAsyncPartitioned.mapSourceWithContextOrdered(delegate, 
parallelism)(extractPartition(_))(f(_,
-      _).asScala)
-      .asJava
+    viaScala(_.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_, 
_).asScala))
   }
 
   /**
-   * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] 
but with an additional
-   * partition step before the transform step. The transform function receives 
the an individual
-   * stream entry and the calculated partition value for that entry.
+   * Context-preserving variant of 
[[pekko.stream.javadsl.Source.mapAsyncPartitionedUnordered]].
    *
    * @since 1.1.0
-   * @see [[#mapAsyncUnordered]]
-   * @see [[#mapAsyncPartitioned]]
+   * @see [[pekko.stream.javadsl.Source.mapAsyncPartitionedUnordered]]
    */
-  def mapAsyncPartitionedUnordered[Out2, P](parallelism: Int,
-      extractPartition: function.Function[Out, P],
-      f: function.Function2[Out, P, CompletionStage[Out2]]): 
SourceWithContext[Out2, Ctx, Mat] = {
-    MapAsyncPartitioned.mapSourceWithContextUnordered(delegate, 
parallelism)(extractPartition(_))(f(_,
-      _).asScala)
-      .asJava
-  }
+  def mapAsyncPartitionedUnordered[Out2, P](
+      parallelism: Int,
+      partitioner: function.Function[Out, P],
+      f: function.Function2[Out, P, CompletionStage[Out2]]): 
SourceWithContext[Out2, Ctx, Mat] =
+    viaScala(_.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_, 
_).asScala))
 
   /**
    * Context-preserving variant of [[pekko.stream.javadsl.Source.mapConcat]].
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 1b6309ff83..a4b2042ec0 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -348,6 +348,72 @@ class SubFlow[In, Out, Mat](
   def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, 
CompletionStage[T]]): SubFlow[In, T, Mat] =
     new SubFlow(delegate.mapAsyncUnordered(parallelism)(x => f(x).asScala))
 
+  /**
+   * Transforms this stream. Works very similarly to [[#mapAsync]] but with an 
additional
+   * partition step before the transform step. The transform function receives 
an individual
+   * stream entry and the calculated partition value for that entry. The max 
parallelism per partition is 1.
+   *
+   * The function `partitioner` is always invoked on the elements in the order 
they arrive.
+   * The function `f` is always invoked on the elements that are in the same 
partition in the order they arrive.
+   *
+   * If the function `partitioner` or `f` throws an exception or if the 
[[CompletionStage]] is completed
+   * with failure and the supervision decision is 
[[pekko.stream.Supervision.Stop]]
+   * the stream will be completed with failure, otherwise the stream continues 
and the current element is dropped.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Emits when''' the Future returned by the provided function finishes 
for the next element in sequence
+   *
+   * '''Backpressures when''' the number of futures reaches the configured 
parallelism and the downstream
+   * backpressures
+   *
+   * '''Completes when''' upstream completes and all futures have been 
completed and all elements have been emitted
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.1.0
+   * @see [[#mapAsync]]
+   * @see [[#mapAsyncPartitionedUnordered]]
+   */
+  def mapAsyncPartitioned[T, P](
+      parallelism: Int,
+      partitioner: function.Function[Out, P],
+      f: function.Function2[Out, P, CompletionStage[T]]): SubFlow[In, T, Mat] =
+    new SubFlow(delegate.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_, 
_).asScala))
+
+  /**
+   * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] 
but with an additional
+   * partition step before the transform step. The transform function receives 
an individual
+   * stream entry and the calculated partition value for that entry. The max 
parallelism per partition is 1.
+   *
+   * The function `partitioner` is always invoked on the elements in the order 
they arrive.
+   * The function `f` is always invoked on the elements that are in the same 
partition in the order they arrive.
+   *
+   * If the function `partitioner` or `f` throws an exception or if the 
[[CompletionStage]] is completed
+   * with failure and the supervision decision is 
[[pekko.stream.Supervision.Stop]]
+   * the stream will be completed with failure, otherwise the stream continues 
and the current element is dropped.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Emits when''' the Future returned by the provided function finishes 
and downstream is available.
+   *
+   * '''Backpressures when''' the number of futures reaches the configured 
parallelism and the downstream
+   * backpressures
+   *
+   * '''Completes when''' upstream completes and all futures have been 
completed and all elements have been emitted
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.1.0
+   * @see [[#mapAsyncUnordered]]
+   * @see [[#mapAsyncPartitioned]]
+   */
+  def mapAsyncPartitionedUnordered[T, P](
+      parallelism: Int,
+      partitioner: function.Function[Out, P],
+      f: function.Function2[Out, P, CompletionStage[T]]): SubFlow[In, T, Mat] =
+    new 
SubFlow(delegate.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_, 
_).asScala))
+
   /**
    * Only pass on those elements that satisfy the given predicate.
    *
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index f6b58ee630..e0f85c86a1 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -339,6 +339,72 @@ class SubSource[Out, Mat](
   def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, 
CompletionStage[T]]): SubSource[T, Mat] =
     new SubSource(delegate.mapAsyncUnordered(parallelism)(x => f(x).asScala))
 
+  /**
+   * Transforms this stream. Works very similarly to [[#mapAsync]] but with an 
additional
+   * partition step before the transform step. The transform function receives 
an individual
+   * stream entry and the calculated partition value for that entry. The max 
parallelism per partition is 1.
+   *
+   * The function `partitioner` is always invoked on the elements in the order 
they arrive.
+   * The function `f` is always invoked on the elements that are in the same 
partition in the order they arrive.
+   *
+   * If the function `partitioner` or `f` throws an exception or if the 
[[CompletionStage]] is completed
+   * with failure and the supervision decision is 
[[pekko.stream.Supervision.Stop]]
+   * the stream will be completed with failure, otherwise the stream continues 
and the current element is dropped.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Emits when''' the Future returned by the provided function finishes 
for the next element in sequence
+   *
+   * '''Backpressures when''' the number of futures reaches the configured 
parallelism and the downstream
+   * backpressures
+   *
+   * '''Completes when''' upstream completes and all futures have been 
completed and all elements have been emitted
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.1.0
+   * @see [[#mapAsync]]
+   * @see [[#mapAsyncPartitionedUnordered]]
+   */
+  def mapAsyncPartitioned[T, P](
+      parallelism: Int,
+      partitioner: function.Function[Out, P],
+      f: function.Function2[Out, P, CompletionStage[T]]): SubSource[T, Mat] =
+    new 
SubSource(delegate.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_, 
_).asScala))
+
+  /**
+   * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] 
but with an additional
+   * partition step before the transform step. The transform function receives 
an individual
+   * stream entry and the calculated partition value for that entry. The max 
parallelism per partition is 1.
+   *
+   * The function `partitioner` is always invoked on the elements in the order 
they arrive.
+   * The function `f` is always invoked on the elements that are in the same 
partition in the order they arrive.
+   *
+   * If the function `partitioner` or `f` throws an exception or if the 
[[CompletionStage]] is completed
+   * with failure and the supervision decision is 
[[pekko.stream.Supervision.Stop]]
+   * the stream will be completed with failure, otherwise the stream continues 
and the current element is dropped.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Emits when''' the Future returned by the provided function finishes 
and downstream is available.
+   *
+   * '''Backpressures when''' the number of futures reaches the configured 
parallelism and the downstream
+   * backpressures
+   *
+   * '''Completes when''' upstream completes and all futures have been 
completed and all elements have been emitted
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.1.0
+   * @see [[#mapAsyncUnordered]]
+   * @see [[#mapAsyncPartitioned]]
+   */
+  def mapAsyncPartitionedUnordered[T, P](
+      parallelism: Int,
+      partitioner: function.Function[Out, P],
+      f: function.Function2[Out, P, CompletionStage[T]]): SubSource[T, Mat] =
+    new 
SubSource(delegate.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_,
 _).asScala))
+
   /**
    * Only pass on those elements that satisfy the given predicate.
    *
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 42b29030d7..ec1586d831 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -163,36 +163,6 @@ final class Flow[-In, +Out, +Mat](
   override def mapMaterializedValue[Mat2](f: Mat => Mat2): ReprMat[Out, Mat2] =
     new Flow(traversalBuilder.transformMat(f), shape)
 
-  /**
-   * Transforms this stream. Works very similarly to [[#mapAsync]] but with an 
additional
-   * partition step before the transform step. The transform function receives 
the an individual
-   * stream entry and the calculated partition value for that entry.
-   *
-   * @since 1.1.0
-   * @see [[#mapAsync]]
-   * @see [[#mapAsyncPartitionedUnordered]]
-   */
-  def mapAsyncPartitioned[T, P](parallelism: Int)(
-      extractPartition: Out => P)(
-      f: (Out, P) => Future[T]): Flow[In, T, Mat] = {
-    MapAsyncPartitioned.mapFlowOrdered(this, parallelism)(extractPartition)(f)
-  }
-
-  /**
-   * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] 
but with an additional
-   * partition step before the transform step. The transform function receives 
the an individual
-   * stream entry and the calculated partition value for that entry.
-   *
-   * @since 1.1.0
-   * @see [[#mapAsyncUnordered]]
-   * @see [[#mapAsyncPartitioned]]
-   */
-  def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
-      extractPartition: Out => P)(
-      f: (Out, P) => Future[T]): Flow[In, T, Mat] = {
-    MapAsyncPartitioned.mapFlowUnordered(this, 
parallelism)(extractPartition)(f)
-  }
-
   /**
    * Materializes this [[Flow]], immediately returning (1) its materialized 
value, and (2) a newly materialized [[Flow]].
    * The returned flow is partial materialized and do not support multiple 
times materialization.
@@ -1173,6 +1143,81 @@ trait FlowOps[+Out, +Mat] {
    */
   def mapAsyncUnordered[T](parallelism: Int)(f: Out => Future[T]): Repr[T] = 
via(MapAsyncUnordered(parallelism, f))
 
+  /**
+   * Transforms this stream. Works very similarly to [[#mapAsync]] but with an 
additional
+   * partition step before the transform step. The transform function receives 
an individual
+   * stream entry and the calculated partition value for that entry. The max 
parallelism per partition is 1.
+   *
+   * The function `partitioner` is always invoked on the elements in the order 
they arrive.
+   * The function `f` is always invoked on the elements that are in the same 
partition in the order they arrive.
+   *
+   * If the function `partitioner` or `f` throws an exception or if the 
[[Future]] is completed
+   * with failure and the supervision decision is 
[[pekko.stream.Supervision.Stop]]
+   * the stream will be completed with failure, otherwise the stream continues 
and the current element is dropped.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Emits when''' the Future returned by the provided function finishes 
for the next element in sequence
+   *
+   * '''Backpressures when''' the number of futures reaches the configured 
parallelism and the downstream
+   * backpressures
+   *
+   * '''Completes when''' upstream completes and all futures have been 
completed and all elements have been emitted
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.1.0
+   * @see [[#mapAsync]]
+   * @see [[#mapAsyncPartitionedUnordered]]
+   */
+  def mapAsyncPartitioned[T, P](parallelism: Int)(
+      partitioner: Out => P)(
+      f: (Out, P) => Future[T]): Repr[T] = {
+    (if (parallelism == 1) {
+       via(MapAsyncUnordered(1, elem => f(elem, partitioner(elem))))
+     } else {
+       via(new MapAsyncPartitioned(parallelism, orderedOutput = true, 
partitioner, f))
+     })
+      .withAttributes(DefaultAttributes.mapAsyncPartition and 
SourceLocation.forLambda(f))
+  }
+
+  /**
+   * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] 
but with an additional
+   * partition step before the transform step. The transform function receives 
an individual
+   * stream entry and the calculated partition value for that entry. The max 
parallelism per partition is 1.
+   *
+   * The function `partitioner` is always invoked on the elements in the order 
they arrive.
+   * The function `f` is always invoked on the elements that are in the same 
partition in the order they arrive.
+   *
+   * If the function `partitioner` or `f` throws an exception or if the 
[[Future]] is completed
+   * with failure and the supervision decision is 
[[pekko.stream.Supervision.Stop]]
+   * the stream will be completed with failure, otherwise the stream continues 
and the current element is dropped.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Emits when''' the Future returned by the provided function finishes 
and downstream is available.
+   *
+   * '''Backpressures when''' the number of futures reaches the configured 
parallelism and the downstream
+   * backpressures
+   *
+   * '''Completes when''' upstream completes and all futures have been 
completed and all elements have been emitted
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.1.0
+   * @see [[#mapAsyncUnordered]]
+   * @see [[#mapAsyncPartitioned]]
+   */
+  def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
+      partitioner: Out => P)(
+      f: (Out, P) => Future[T]): Repr[T] = {
+    (if (parallelism == 1) {
+       via(MapAsyncUnordered(1, elem => f(elem, partitioner(elem))))
+     } else {
+       via(new MapAsyncPartitioned(parallelism, orderedOutput = false, 
partitioner, f))
+     }).withAttributes(DefaultAttributes.mapAsyncPartitionUnordered and 
SourceLocation.forLambda(f))
+  }
+
   /**
    * Use the `ask` pattern to send a request-reply message to the target `ref` 
actor.
    * If any of the asks times out it will fail the stream with a 
[[pekko.pattern.AskTimeoutException]].
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala
index 1def589f37..8925afc47a 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala
@@ -14,7 +14,6 @@
 package org.apache.pekko.stream.scaladsl
 
 import scala.annotation.unchecked.uncheckedVariance
-import scala.concurrent.Future
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.japi.Pair
@@ -90,36 +89,6 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, 
+Mat](delegate: Flow[(In
   def mapMaterializedValue[Mat2](f: Mat => Mat2): FlowWithContext[In, CtxIn, 
Out, CtxOut, Mat2] =
     new FlowWithContext(delegate.mapMaterializedValue(f))
 
-  /**
-   * Transforms this stream. Works very similarly to [[#mapAsync]] but with an 
additional
-   * partition step before the transform step. The transform function receives 
the an individual
-   * stream entry and the calculated partition value for that entry.
-   *
-   * @since 1.1.0
-   * @see [[#mapAsync]]
-   * @see [[#mapAsyncPartitionedUnordered]]
-   */
-  def mapAsyncPartitioned[T, P](parallelism: Int)(
-      extractPartition: Out => P)(
-      f: (Out, P) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] = {
-    MapAsyncPartitioned.mapFlowWithContextOrdered(this, 
parallelism)(extractPartition)(f)
-  }
-
-  /**
-   * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] 
but with an additional
-   * partition step before the transform step. The transform function receives 
the an individual
-   * stream entry and the calculated partition value for that entry.
-   *
-   * @since 1.1.0
-   * @see [[#mapAsyncUnordered]]
-   * @see [[#mapAsyncPartitioned]]
-   */
-  def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
-      extractPartition: Out => P)(
-      f: (Out, P) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] = {
-    MapAsyncPartitioned.mapFlowWithContextUnordered(this, 
parallelism)(extractPartition)(f)
-  }
-
   def asFlow: Flow[(In, CtxIn), (Out, CtxOut), Mat] = delegate
 
   def asJava[JIn <: In, JCtxIn <: CtxIn, JOut >: Out, JCtxOut >: CtxOut, JMat 
>: Mat]
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
index 370f49e520..6c12dd4344 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
@@ -113,6 +113,36 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
       case (e, ctx) => f(e).map(o => (o, ctx))(ExecutionContexts.parasitic)
     })
 
+  /**
+   * Context-preserving variant of 
[[pekko.stream.scaladsl.FlowOps.mapAsyncPartitioned]].
+   *
+   * @since 1.1.0
+   * @see [[pekko.stream.scaladsl.FlowOps.mapAsyncPartitioned]]
+   */
+  def mapAsyncPartitioned[Out2, P](parallelism: Int)(
+      partitioner: Out => P)(
+      f: (Out, P) => Future[Out2]): Repr[Out2, Ctx] = {
+    via(flow[Out, Ctx].mapAsyncPartitioned(parallelism)(pair => 
partitioner(pair._1)) {
+      (pair, partition) =>
+        f(pair._1, partition).map((_, pair._2))(ExecutionContexts.parasitic)
+    })
+  }
+
+  /**
+   * Context-preserving variant of 
[[pekko.stream.scaladsl.FlowOps.mapAsyncPartitionedUnordered]].
+   *
+   * @since 1.1.0
+   * @see [[pekko.stream.scaladsl.FlowOps.mapAsyncPartitionedUnordered]]
+   */
+  def mapAsyncPartitionedUnordered[Out2, P](parallelism: Int)(
+      partitioner: Out => P)(
+      f: (Out, P) => Future[Out2]): Repr[Out2, Ctx] = {
+    via(flow[Out, Ctx].mapAsyncPartitionedUnordered(parallelism)(pair => 
partitioner(pair._1)) {
+      (pair, partition) =>
+        f(pair._1, partition).map((_, pair._2))(ExecutionContexts.parasitic)
+    })
+  }
+
   /**
    * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.collect]].
    *
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 8635a42ad0..429276f36d 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -99,34 +99,6 @@ final class Source[+Out, +Mat](
   override def mapMaterializedValue[Mat2](f: Mat => Mat2): ReprMat[Out, Mat2] =
     new Source[Out, Mat2](traversalBuilder.transformMat(f.asInstanceOf[Any => 
Any]), shape)
 
-  /**
-   * Transforms this stream. Works very similarly to [[#mapAsync]] but with an 
additional
-   * partition step before the transform step. The transform function receives 
the an individual
-   * stream entry and the calculated partition value for that entry.
-   *
-   * @since 1.1.0
-   * @see [[#mapAsync]]
-   * @see [[#mapAsyncPartitionedUnordered]]
-   */
-  def mapAsyncPartitioned[T, P](parallelism: Int)(
-      extractPartition: Out => P)(f: (Out, P) => Future[T]): Source[T, Mat] = {
-    MapAsyncPartitioned.mapSourceOrdered(this, 
parallelism)(extractPartition)(f)
-  }
-
-  /**
-   * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] 
but with an additional
-   * partition step before the transform step. The transform function receives 
the an individual
-   * stream entry and the calculated partition value for that entry.
-   *
-   * @since 1.1.0
-   * @see [[#mapAsyncUnordered]]
-   * @see [[#mapAsyncPartitioned]]
-   */
-  def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
-      extractPartition: Out => P)(f: (Out, P) => Future[T]): Source[T, Mat] = {
-    MapAsyncPartitioned.mapSourceUnordered(this, 
parallelism)(extractPartition)(f)
-  }
-
   /**
    * Materializes this Source, immediately returning (1) its materialized 
value, and (2) a new Source
    * that can be used to consume elements from the newly materialized Source.
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
index 47d2c14bed..5ce8bcd752 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
@@ -14,7 +14,6 @@
 package org.apache.pekko.stream.scaladsl
 
 import scala.annotation.unchecked.uncheckedVariance
-import scala.concurrent.Future
 import org.apache.pekko
 import pekko.stream._
 
@@ -78,34 +77,6 @@ final class SourceWithContext[+Out, +Ctx, +Mat] 
private[stream] (delegate: Sourc
   def mapMaterializedValue[Mat2](f: Mat => Mat2): SourceWithContext[Out, Ctx, 
Mat2] =
     new SourceWithContext(delegate.mapMaterializedValue(f))
 
-  /**
-   * Transforms this stream. Works very similarly to [[#mapAsync]] but with an 
additional
-   * partition step before the transform step. The transform function receives 
the an individual
-   * stream entry and the calculated partition value for that entry.
-   *
-   * @since 1.1.0
-   * @see [[#mapAsync]]
-   * @see [[#mapAsyncPartitionedUnordered]]
-   */
-  def mapAsyncPartitioned[T, P](parallelism: Int)(
-      extractPartition: Out => P)(f: (Out, P) => Future[T]): 
SourceWithContext[T, Ctx, Mat] = {
-    MapAsyncPartitioned.mapSourceWithContextOrdered(this, 
parallelism)(extractPartition)(f)
-  }
-
-  /**
-   * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] 
but with an additional
-   * partition step before the transform step. The transform function receives 
the an individual
-   * stream entry and the calculated partition value for that entry.
-   *
-   * @since 1.1.0
-   * @see [[#mapAsyncUnordered]]
-   * @see [[#mapAsyncPartitioned]]
-   */
-  def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
-      extractPartition: Out => P)(f: (Out, P) => Future[T]): 
SourceWithContext[T, Ctx, Mat] = {
-    MapAsyncPartitioned.mapSourceWithContextUnordered(this, 
parallelism)(extractPartition)(f)
-  }
-
   /**
    * Connect this [[pekko.stream.scaladsl.SourceWithContext]] to a 
[[pekko.stream.scaladsl.Sink]],
    * concatenating the processing steps of both.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to