This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 1b1f57224b =str Tweak the stream mapAsyncPartitioned operator
1b1f57224b is described below
commit 1b1f57224b409c8b2cbd5267ace814439e97d3ea
Author: He-Pin <[email protected]>
AuthorDate: Sat Sep 23 14:51:31 2023 +0800
=str Tweak the stream mapAsyncPartitioned operator
---
.../pekko/stream/MapAsyncPartitionedSpec.scala | 27 ++--
.../apache/pekko/stream/MapAsyncPartitioned.scala | 153 ++++++---------------
.../org/apache/pekko/stream/impl/Stages.scala | 2 +
.../org/apache/pekko/stream/javadsl/Flow.scala | 54 ++++++--
.../pekko/stream/javadsl/FlowWithContext.scala | 33 ++---
.../org/apache/pekko/stream/javadsl/Source.scala | 56 ++++++--
.../pekko/stream/javadsl/SourceWithContext.scala | 40 +++---
.../org/apache/pekko/stream/javadsl/SubFlow.scala | 66 +++++++++
.../apache/pekko/stream/javadsl/SubSource.scala | 66 +++++++++
.../org/apache/pekko/stream/scaladsl/Flow.scala | 105 ++++++++++----
.../pekko/stream/scaladsl/FlowWithContext.scala | 31 -----
.../pekko/stream/scaladsl/FlowWithContextOps.scala | 30 ++++
.../org/apache/pekko/stream/scaladsl/Source.scala | 28 ----
.../pekko/stream/scaladsl/SourceWithContext.scala | 29 ----
14 files changed, 421 insertions(+), 299 deletions(-)
diff --git
a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
index 6b46a5dc49..0b5a01a20b 100644
---
a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
+++
b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
@@ -63,8 +63,7 @@ private object MapAsyncPartitionedSpec {
value = i.toString)
}
- def extractPartition(e: TestKeyValue): Int =
- e.key
+ val partitioner: TestKeyValue => Int = kv => kv.key
type Operation = TestKeyValue => Future[(Int, String)]
@@ -125,7 +124,7 @@ class MapAsyncPartitionedSpec
val result =
Source(elements)
- .mapAsyncPartitionedUnordered(parallelism =
2)(extractPartition)(blockingOperation)
+ .mapAsyncPartitionedUnordered(parallelism =
2)(partitioner)(blockingOperation)
.runWith(Sink.seq)
.futureValue
.map(_._2)
@@ -137,7 +136,7 @@ class MapAsyncPartitionedSpec
forAll(minSuccessful(1000)) { (parallelism: Parallelism, elements:
Seq[TestKeyValue]) =>
val result =
Source(elements.toIndexedSeq)
-
.mapAsyncPartitionedUnordered(parallelism.value)(extractPartition)(asyncOperation)
+
.mapAsyncPartitionedUnordered(parallelism.value)(partitioner)(asyncOperation)
.runWith(Sink.seq)
.futureValue
@@ -153,7 +152,7 @@ class MapAsyncPartitionedSpec
val result =
Source
.fromIterator(() => elements.iterator)
- .mapAsyncPartitionedUnordered(parallelism =
1)(extractPartition)(asyncOperation)
+ .mapAsyncPartitionedUnordered(parallelism =
1)(partitioner)(asyncOperation)
.runWith(Sink.seq)
.futureValue
@@ -169,7 +168,7 @@ class MapAsyncPartitionedSpec
val result =
Source
.fromIterator(() => elements.iterator)
-
.mapAsyncPartitionedUnordered(parallelism.value)(extractPartition)(blockingOperation)
+
.mapAsyncPartitionedUnordered(parallelism.value)(partitioner)(blockingOperation)
.runWith(Sink.seq)
.futureValue
@@ -232,7 +231,7 @@ class MapAsyncPartitionedSpec
val result =
Source(elements)
- .mapAsyncPartitionedUnordered(parallelism = 2)(extractPartition)(fun)
+ .mapAsyncPartitionedUnordered(parallelism = 2)(partitioner)(fun)
.runWith(Sink.seq)
.futureValue
@@ -244,7 +243,7 @@ class MapAsyncPartitionedSpec
an[IllegalArgumentException] shouldBe thrownBy {
Source(infiniteStream())
.mapAsyncPartitionedUnordered(
- parallelism = zeroOrNegativeParallelism)(extractPartition =
identity)(f = (_, _) => Future.unit)
+ parallelism = zeroOrNegativeParallelism)(partitioner = identity)(f
= (_, _) => Future.unit)
.runWith(Sink.ignore)
.futureValue
}
@@ -272,7 +271,7 @@ class MapAsyncPartitionedSpec
val result =
Source(elements)
- .mapAsyncPartitioned(parallelism = 2)(extractPartition)(processElement)
+ .mapAsyncPartitioned(parallelism = 2)(partitioner)(processElement)
.runWith(Sink.seq)
.futureValue
.map(_._2)
@@ -289,7 +288,7 @@ class MapAsyncPartitionedSpec
forAll(minSuccessful(1000)) { (parallelism: Parallelism, elements:
Seq[TestKeyValue]) =>
val result =
Source(elements.toIndexedSeq)
-
.mapAsyncPartitioned(parallelism.value)(extractPartition)(asyncOperation)
+ .mapAsyncPartitioned(parallelism.value)(partitioner)(asyncOperation)
.runWith(Sink.seq)
.futureValue
@@ -305,7 +304,7 @@ class MapAsyncPartitionedSpec
val result =
Source
.fromIterator(() => elements.iterator)
- .mapAsyncPartitioned(parallelism =
1)(extractPartition)(asyncOperation)
+ .mapAsyncPartitioned(parallelism = 1)(partitioner)(asyncOperation)
.runWith(Sink.seq)
.futureValue
@@ -321,7 +320,7 @@ class MapAsyncPartitionedSpec
val result =
Source
.fromIterator(() => elements.iterator)
-
.mapAsyncPartitioned(parallelism.value)(extractPartition)(blockingOperation)
+
.mapAsyncPartitioned(parallelism.value)(partitioner)(blockingOperation)
.runWith(Sink.seq)
.futureValue
@@ -384,7 +383,7 @@ class MapAsyncPartitionedSpec
val result =
Source(elements)
- .mapAsyncPartitioned(parallelism = 2)(extractPartition)(fun)
+ .mapAsyncPartitioned(parallelism = 2)(partitioner)(fun)
.runWith(Sink.seq)
.futureValue
@@ -396,7 +395,7 @@ class MapAsyncPartitionedSpec
an[IllegalArgumentException] shouldBe thrownBy {
Source(infiniteStream())
.mapAsyncPartitioned(
- parallelism = zeroOrNegativeParallelism)(extractPartition =
identity)(f = (_, _) => Future.unit)
+ parallelism = zeroOrNegativeParallelism)(partitioner = identity)(f
= (_, _) => Future.unit)
.runWith(Sink.ignore)
.futureValue
}
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
index f328d66fe0..4ce63a6cbf 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
@@ -23,102 +23,29 @@ import scala.util.control.{ NoStackTrace, NonFatal }
import scala.util.{ Failure, Success, Try }
import org.apache.pekko
+import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import pekko.stream.ActorAttributes.SupervisionStrategy
-import pekko.stream.Attributes.{ Name, SourceLocation }
-import pekko.stream.MapAsyncPartitioned._
-import pekko.stream.scaladsl.{ Flow, FlowWithContext, Source,
SourceWithContext }
import pekko.stream.stage._
+import pekko.util.OptionVal
+/**
+ * Internal API
+ */
+@InternalApi
private[stream] object MapAsyncPartitioned {
+ private val NotYetThere = Failure(new Exception with NoStackTrace)
- private def extractPartitionWithCtx[In, Ctx, Partition](extract: In =>
Partition)(tuple: (In, Ctx)): Partition =
- extract(tuple._1)
-
- private def fWithCtx[In, Out, Ctx, Partition](f: (In, Partition) =>
Future[Out])(tuple: (In, Ctx),
- partition: Partition): Future[(Out, Ctx)] =
- f(tuple._1, partition).map(_ -> tuple._2)(ExecutionContexts.parasitic)
-
- def mapSourceOrdered[In, Out, Partition, Mat](source: Source[In, Mat],
parallelism: Int)(
- extractPartition: In => Partition)(
- f: (In, Partition) => Future[Out]): Source[Out, Mat] =
- source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput =
true, parallelism, extractPartition, f))
-
- def mapSourceUnordered[In, Out, Partition, Mat](source: Source[In, Mat],
parallelism: Int)(
- extractPartition: In => Partition)(
- f: (In, Partition) => Future[Out]): Source[Out, Mat] =
- source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput =
false, parallelism, extractPartition, f))
-
- def mapSourceWithContextOrdered[In, Ctx, T, Partition, Mat](flow:
SourceWithContext[In, Ctx, Mat], parallelism: Int)(
- extractPartition: In => Partition)(
- f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] =
- flow.via(
- new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition](
- orderedOutput = true,
- parallelism,
- extractPartitionWithCtx(extractPartition),
- fWithCtx[In, T, Ctx, Partition](f)))
-
- def mapSourceWithContextUnordered[In, Ctx, T, Partition, Mat](flow:
SourceWithContext[In, Ctx, Mat],
- parallelism: Int)(extractPartition: In => Partition)(
- f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] =
- flow.via(
- new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition](
- orderedOutput = false,
- parallelism,
- extractPartitionWithCtx(extractPartition),
- fWithCtx[In, T, Ctx, Partition](f)))
-
- def mapFlowOrdered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat],
parallelism: Int)(
- extractPartition: Out => Partition)(
- f: (Out, Partition) => Future[T]): Flow[In, T, Mat] =
- flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = true,
parallelism, extractPartition,
- f))
-
- def mapFlowUnordered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat],
parallelism: Int)(
- extractPartition: Out => Partition)(
- f: (Out, Partition) => Future[T]): Flow[In, T, Mat] =
- flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = false,
parallelism,
- extractPartition, f))
-
- def mapFlowWithContextOrdered[In, Out, CtxIn, CtxOut, T, Partition, Mat](
- flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: Int)(
- extractPartition: Out => Partition)(
- f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut,
Mat] =
- flow.via(
- new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition](
- orderedOutput = true,
- parallelism,
- extractPartitionWithCtx(extractPartition),
- fWithCtx[Out, T, CtxOut, Partition](f)))
-
- def mapFlowWithContextUnordered[In, Out, CtxIn, CtxOut, T, Partition, Mat](
- flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism:
Int)(extractPartition: Out => Partition)(
- f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut,
Mat] =
- flow.via(
- new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition](
- orderedOutput = false,
- parallelism,
- extractPartitionWithCtx(extractPartition),
- fWithCtx[Out, T, CtxOut, Partition](f)))
-
- private[stream] val NotYetThere: Failure[Nothing] = Failure(new Exception
with NoStackTrace)
-
- private[stream] final class Holder[In, Out](
- val in: In,
- var out: Try[Out],
- callback: AsyncCallback[Holder[In, Out]]) extends (Try[Out] => Unit) {
-
- // To support both fail-fast when the supervision directive is Stop
- // and not calling the decider multiple times (#23888) we need to cache
the decider result and re-use that
- private var cachedSupervisionDirective: Option[Supervision.Directive] =
None
+ private final class Holder[In, Out](val in: In, var out: Try[Out], val cb:
AsyncCallback[Holder[In, Out]]) extends (
+ Try[Out] => Unit) {
+ private var cachedSupervisionDirective: OptionVal[Supervision.Directive] =
OptionVal.None
def supervisionDirectiveFor(decider: Supervision.Decider, ex: Throwable):
Supervision.Directive = {
cachedSupervisionDirective match {
- case Some(d) => d
+ case OptionVal.Some(d) => d
case _ =>
val d = decider(ex)
- cachedSupervisionDirective = Some(d)
+ cachedSupervisionDirective = OptionVal.Some(d)
d
}
}
@@ -128,27 +55,32 @@ private[stream] object MapAsyncPartitioned {
override def apply(t: Try[Out]): Unit = {
setOut(t)
- callback.invoke(this)
+ cb.invoke(this)
}
+
+ override def toString = s"Holder($in, $out)"
}
}
-private[stream] class MapAsyncPartitioned[In, Out, Partition](
- orderedOutput: Boolean,
+/**
+ * Internal API
+ */
+@InternalApi
+private[stream] final class MapAsyncPartitioned[In, Out, Partition](
parallelism: Int,
- extractPartition: In => Partition,
+ orderedOutput: Boolean,
+ partitioner: In => Partition,
f: (In, Partition) => Future[Out]) extends GraphStage[FlowShape[In, Out]] {
+ require(parallelism >= 1, "parallelism must be at least 1")
+ require(partitioner != null, "partitioner function should not be null")
+ require(f != null, "f function should not be null.")
+ import MapAsyncPartitioned._
- if (parallelism < 1) throw new IllegalArgumentException("parallelism must be
at least 1")
-
- private val in = Inlet[In]("MapAsyncPartitionOrdered.in")
- private val out = Outlet[Out]("MapAsyncPartitionOrdered.out")
+ private val in = Inlet[In]("MapAsyncPartitioned.in")
+ private val out = Outlet[Out]("MapAsyncPartitioned.out")
override val shape: FlowShape[In, Out] = FlowShape(in, out)
- override def initialAttributes: Attributes =
- Attributes(Name("MapAsyncPartitionOrdered")) and
SourceLocation.forLambda(f)
-
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
private val contextPropagation = pekko.stream.impl.ContextPropagation()
@@ -191,13 +123,12 @@ private[stream] class MapAsyncPartitioned[In, Out,
Partition](
buffer = mutable.Queue()
}
- override def onPull(): Unit =
- pushNextIfPossible()
+ override def onPull(): Unit = pushNextIfPossible()
override def onPush(): Unit = {
try {
val element = grab(in)
- val partition = extractPartition(element)
+ val partition = partitioner(element)
val wrappedInput = new Contextual(
contextPropagation.currentContext(),
@@ -217,8 +148,7 @@ private[stream] class MapAsyncPartitioned[In, Out,
Partition](
pullIfNeeded()
}
- override def onUpstreamFinish(): Unit =
- if (idle()) completeStage()
+ override def onUpstreamFinish(): Unit = if (idle()) completeStage()
private def processElement(partition: Partition, wrappedInput:
Contextual[Holder[In, Out]]): Unit = {
import wrappedInput.{ element => holder }
@@ -289,7 +219,7 @@ private[stream] class MapAsyncPartitioned[In, Out,
Partition](
buffer = buffer.filter { case (partition, wrappedInput) =>
import wrappedInput.{ element => holder }
- if ((holder.out eq MapAsyncPartitioned.NotYetThere) ||
!isAvailable(out)) {
+ if ((holder.out eq NotYetThere) || !isAvailable(out)) {
true
} else {
partitionsInProgress -= partition
@@ -321,12 +251,14 @@ private[stream] class MapAsyncPartitioned[In, Out,
Partition](
}
private def drainQueue(): Unit = {
- buffer.foreach {
- case (partition, wrappedInput) =>
- if (canStartNextElement(partition)) {
- wrappedInput.resume()
- processElement(partition, wrappedInput)
- }
+ if (buffer.nonEmpty) {
+ buffer.foreach {
+ case (partition, wrappedInput) =>
+ if (canStartNextElement(partition)) {
+ wrappedInput.resume()
+ processElement(partition, wrappedInput)
+ }
+ }
}
}
@@ -335,11 +267,10 @@ private[stream] class MapAsyncPartitioned[In, Out,
Partition](
else if (buffer.size < parallelism && !hasBeenPulled(in)) tryPull(in)
// else already pulled and waiting for next element
- private def idle(): Boolean =
- buffer.isEmpty
+ private def idle(): Boolean = buffer.isEmpty
private def canStartNextElement(partition: Partition): Boolean =
- !partitionsInProgress(partition) && partitionsInProgress.size <
parallelism
+ !partitionsInProgress.contains(partition) && partitionsInProgress.size
< parallelism
setHandlers(in, out, this)
}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index 4a36c8aa14..6b56a08e13 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -40,6 +40,8 @@ import pekko.stream.Attributes._
val mapError = name("mapError")
val mapAsync = name("mapAsync")
val mapAsyncUnordered = name("mapAsyncUnordered")
+ val mapAsyncPartition = name("mapAsyncPartition")
+ val mapAsyncPartitionUnordered = name("mapAsyncPartitionUnordered")
val ask = name("ask")
val grouped = name("grouped")
val groupedWithin = name("groupedWithin")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 1d1c8f7854..0b2d60c58e 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -842,30 +842,68 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
/**
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an
additional
* partition step before the transform step. The transform function receives
the an individual
- * stream entry and the calculated partition value for that entry.
+ * stream entry and the calculated partition value for that entry. The max
parallelism of per partition is 1.
+ *
+ * The function `partitioner` is always invoked on the elements in the order
they arrive.
+ * The function `f` is always invoked on the elements which in the same
partition in the order they arrive.
+ *
+ * If the function `partitioner` or `f` throws an exception or if the
[[CompletionStage]] is completed
+ * with failure and the supervision decision is
[[pekko.stream.Supervision.Stop]]
+ * the stream will be completed with failure, otherwise the stream continues
and the current element is dropped.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the Future returned by the provided function finishes
for the next element in sequence
+ *
+ * '''Backpressures when''' the number of futures reaches the configured
parallelism and the downstream
+ * backpressures
+ *
+ * '''Completes when''' upstream completes and all futures have been
completed and all elements have been emitted
+ *
+ * '''Cancels when''' downstream cancels
*
* @since 1.1.0
* @see [[#mapAsync]]
* @see [[#mapAsyncPartitionedUnordered]]
*/
- def mapAsyncPartitioned[T, P](parallelism: Int,
- extractPartition: function.Function[Out, P],
+ def mapAsyncPartitioned[T, P](
+ parallelism: Int,
+ partitioner: function.Function[Out, P],
f: function.Function2[Out, P, CompletionStage[T]]): Flow[In, T, Mat] =
- MapAsyncPartitioned.mapFlowOrdered(delegate,
parallelism)(extractPartition(_))(f(_, _).asScala).asJava
+ new Flow(delegate.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_,
_).asScala))
/**
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]]
but with an additional
* partition step before the transform step. The transform function receives
the an individual
- * stream entry and the calculated partition value for that entry.
+ * stream entry and the calculated partition value for that entry.The max
parallelism of per partition is 1.
+ *
+ * The function `partitioner` is always invoked on the elements in the order
they arrive.
+ * The function `f` is always invoked on the elements which in the same
partition in the order they arrive.
+ *
+ * If the function `partitioner` or `f` throws an exception or if the
[[CompletionStage]] is completed
+ * with failure and the supervision decision is
[[pekko.stream.Supervision.Stop]]
+ * the stream will be completed with failure, otherwise the stream continues
and the current element is dropped.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the Future returned by the provided function finishes
and downstream available.
+ *
+ * '''Backpressures when''' the number of futures reaches the configured
parallelism and the downstream
+ * backpressures
+ *
+ * '''Completes when''' upstream completes and all futures have been
completed and all elements have been emitted
+ *
+ * '''Cancels when''' downstream cancels
*
* @since 1.1.0
* @see [[#mapAsyncUnordered]]
* @see [[#mapAsyncPartitioned]]
*/
- def mapAsyncPartitionedUnordered[T, P](parallelism: Int,
- extractPartition: function.Function[Out, P],
+ def mapAsyncPartitionedUnordered[T, P](
+ parallelism: Int,
+ partitioner: function.Function[Out, P],
f: function.Function2[Out, P, CompletionStage[T]]): Flow[In, T, Mat] =
- MapAsyncPartitioned.mapFlowUnordered(delegate,
parallelism)(extractPartition(_))(f(_, _).asScala).asJava
+ new
Flow(delegate.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_,
_).asScala))
/**
* Transform this stream by applying the given function to each of the
elements
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala
index 84736777db..44dbeaaead 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala
@@ -173,39 +173,40 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
def map[Out2](f: function.Function[Out, Out2]): FlowWithContext[In, CtxIn,
Out2, CtxOut, Mat] =
viaScala(_.map(f.apply))
+ /**
+ * Context-preserving variant of [[pekko.stream.javadsl.Flow.mapAsync]].
+ *
+ * @see [[pekko.stream.javadsl.Flow.mapAsync]]
+ */
def mapAsync[Out2](
parallelism: Int,
f: function.Function[Out, CompletionStage[Out2]]): FlowWithContext[In,
CtxIn, Out2, CtxOut, Mat] =
viaScala(_.mapAsync[Out2](parallelism)(o => f.apply(o).asScala))
/**
- * Transforms this stream. Works very similarly to [[#mapAsync]] but with an
additional
- * partition step before the transform step. The transform function receives
the an individual
- * stream entry and the calculated partition value for that entry.
+ * Context-preserving variant of
[[pekko.stream.javadsl.Flow.mapAsyncPartitioned]].
*
* @since 1.1.0
- * @see [[#mapAsync]]
- * @see [[#mapAsyncPartitionedUnordered]]
+ * @see [[pekko.stream.javadsl.Flow.mapAsyncPartitioned]]
*/
- def mapAsyncPartitioned[Out2, P](parallelism: Int,
- extractPartition: function.Function[Out, P],
+ def mapAsyncPartitioned[Out2, P](
+ parallelism: Int,
+ partitioner: function.Function[Out, P],
f: function.Function2[Out, P, CompletionStage[Out2]]):
FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = {
- viaScala(_.mapAsyncPartitioned(parallelism)(extractPartition(_))(f(_,
_).asScala))
+ viaScala(_.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_,
_).asScala))
}
/**
- * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]]
but with an additional
- * partition step before the transform step. The transform function receives
the an individual
- * stream entry and the calculated partition value for that entry.
+ * Context-preserving variant of
[[pekko.stream.javadsl.Flow.mapAsyncPartitionedUnordered]].
*
* @since 1.1.0
- * @see [[#mapAsyncUnordered]]
- * @see [[#mapAsyncPartitioned]]
+ * @see [[pekko.stream.javadsl.Flow.mapAsyncPartitionedUnordered]]
*/
- def mapAsyncPartitionedUnordered[Out2, P](parallelism: Int,
- extractPartition: function.Function[Out, P],
+ def mapAsyncPartitionedUnordered[Out2, P](
+ parallelism: Int,
+ partitioner: function.Function[Out, P],
f: function.Function2[Out, P, CompletionStage[Out2]]):
FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = {
-
viaScala(_.mapAsyncPartitionedUnordered(parallelism)(extractPartition(_))(f(_,
_).asScala))
+ viaScala(_.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_,
_).asScala))
}
/**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index b87c5399c7..ae01282622 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2496,32 +2496,68 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
/**
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an
additional
* partition step before the transform step. The transform function receives
the an individual
- * stream entry and the calculated partition value for that entry.
+ * stream entry and the calculated partition value for that entry. The max
parallelism of per partition is 1.
+ *
+ * The function `partitioner` is always invoked on the elements in the order
they arrive.
+ * The function `f` is always invoked on the elements which in the same
partition in the order they arrive.
+ *
+ * If the function `partitioner` or `f` throws an exception or if the
[[CompletionStage]] is completed
+ * with failure and the supervision decision is
[[pekko.stream.Supervision.Stop]]
+ * the stream will be completed with failure, otherwise the stream continues
and the current element is dropped.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the Future returned by the provided function finishes
for the next element in sequence
+ *
+ * '''Backpressures when''' the number of futures reaches the configured
parallelism and the downstream
+ * backpressures
+ *
+ * '''Completes when''' upstream completes and all futures have been
completed and all elements have been emitted
+ *
+ * '''Cancels when''' downstream cancels
*
* @since 1.1.0
* @see [[#mapAsync]]
* @see [[#mapAsyncPartitionedUnordered]]
*/
- def mapAsyncPartitioned[T, P](parallelism: Int,
- extractPartition: function.Function[Out, P],
+ def mapAsyncPartitioned[T, P](
+ parallelism: Int,
+ partitioner: function.Function[Out, P],
f: function.Function2[Out, P, CompletionStage[T]]): javadsl.Source[T,
Mat] =
- MapAsyncPartitioned.mapSourceOrdered(delegate,
parallelism)(extractPartition(_))(f(_,
- _).asScala).asJava
+ new Source(delegate.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_,
_).asScala))
/**
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]]
but with an additional
* partition step before the transform step. The transform function receives
the an individual
- * stream entry and the calculated partition value for that entry.
+ * stream entry and the calculated partition value for that entry.The max
parallelism of per partition is 1.
+ *
+ * The function `partitioner` is always invoked on the elements in the order
they arrive.
+ * The function `f` is always invoked on the elements which in the same
partition in the order they arrive.
+ *
+ * If the function `partitioner` or `f` throws an exception or if the
[[CompletionStage]] is completed
+ * with failure and the supervision decision is
[[pekko.stream.Supervision.Stop]]
+ * the stream will be completed with failure, otherwise the stream continues
and the current element is dropped.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the Future returned by the provided function finishes
and downstream available.
+ *
+ * '''Backpressures when''' the number of futures reaches the configured
parallelism and the downstream
+ * backpressures
+ *
+ * '''Completes when''' upstream completes and all futures have been
completed and all elements have been emitted
+ *
+ * '''Cancels when''' downstream cancels
*
* @since 1.1.0
* @see [[#mapAsyncUnordered]]
* @see [[#mapAsyncPartitioned]]
*/
- def mapAsyncPartitionedUnordered[T, P](parallelism: Int,
- extractPartition: function.Function[Out, P],
+ def mapAsyncPartitionedUnordered[T, P](
+ parallelism: Int,
+ partitioner: function.Function[Out, P],
f: function.Function2[Out, P, CompletionStage[T]]): javadsl.Source[T,
Mat] =
- MapAsyncPartitioned.mapSourceUnordered(delegate,
parallelism)(extractPartition(_))(f(_,
- _).asScala).asJava
+ new
Source(delegate.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_,
_).asScala))
/**
* Transform this stream by applying the given function to each of the
elements
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala
index 687c01fc05..8b77a813d5 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala
@@ -169,44 +169,40 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate:
scaladsl.SourceWithCon
def map[Out2](f: function.Function[Out, Out2]): SourceWithContext[Out2, Ctx,
Mat] =
viaScala(_.map(f.apply))
+ /**
+ * Context-preserving variant of [[pekko.stream.javadsl.Source.mapAsync]].
+ *
+ * @see [[pekko.stream.javadsl.Source.mapAsync]]
+ */
def mapAsync[Out2](
parallelism: Int,
f: function.Function[Out, CompletionStage[Out2]]):
SourceWithContext[Out2, Ctx, Mat] =
viaScala(_.mapAsync[Out2](parallelism)(o => f.apply(o).asScala))
/**
- * Transforms this stream. Works very similarly to [[#mapAsync]] but with an
additional
- * partition step before the transform step. The transform function receives
the an individual
- * stream entry and the calculated partition value for that entry.
+ * Context-preserving variant of
[[pekko.stream.javadsl.Source.mapAsyncPartitioned]].
*
* @since 1.1.0
- * @see [[#mapAsync]]
- * @see [[#mapAsyncPartitionedUnordered]]
+ * @see [[pekko.stream.javadsl.Source.mapAsyncPartitioned]]
*/
- def mapAsyncPartitioned[Out2, P](parallelism: Int,
- extractPartition: function.Function[Out, P],
+ def mapAsyncPartitioned[Out2, P](
+ parallelism: Int,
+ partitioner: function.Function[Out, P],
f: function.Function2[Out, P, CompletionStage[Out2]]):
SourceWithContext[Out2, Ctx, Mat] = {
- MapAsyncPartitioned.mapSourceWithContextOrdered(delegate,
parallelism)(extractPartition(_))(f(_,
- _).asScala)
- .asJava
+ viaScala(_.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_,
_).asScala))
}
/**
- * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]]
but with an additional
- * partition step before the transform step. The transform function receives
the an individual
- * stream entry and the calculated partition value for that entry.
+ * Context-preserving variant of
[[pekko.stream.javadsl.Source.mapAsyncPartitionedUnordered]].
*
* @since 1.1.0
- * @see [[#mapAsyncUnordered]]
- * @see [[#mapAsyncPartitioned]]
+ * @see [[pekko.stream.javadsl.Source.mapAsyncPartitionedUnordered]]
*/
- def mapAsyncPartitionedUnordered[Out2, P](parallelism: Int,
- extractPartition: function.Function[Out, P],
- f: function.Function2[Out, P, CompletionStage[Out2]]):
SourceWithContext[Out2, Ctx, Mat] = {
- MapAsyncPartitioned.mapSourceWithContextUnordered(delegate,
parallelism)(extractPartition(_))(f(_,
- _).asScala)
- .asJava
- }
+ def mapAsyncPartitionedUnordered[Out2, P](
+ parallelism: Int,
+ partitioner: function.Function[Out, P],
+ f: function.Function2[Out, P, CompletionStage[Out2]]):
SourceWithContext[Out2, Ctx, Mat] =
+ viaScala(_.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_,
_).asScala))
/**
* Context-preserving variant of [[pekko.stream.javadsl.Source.mapConcat]].
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 1b6309ff83..a4b2042ec0 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -348,6 +348,72 @@ class SubFlow[In, Out, Mat](
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out,
CompletionStage[T]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.mapAsyncUnordered(parallelism)(x => f(x).asScala))
+ /**
+ * Transforms this stream. Works very similarly to [[#mapAsync]] but with an
additional
+ * partition step before the transform step. The transform function receives
the an individual
+ * stream entry and the calculated partition value for that entry. The max
parallelism of per partition is 1.
+ *
+ * The function `partitioner` is always invoked on the elements in the order
they arrive.
+ * The function `f` is always invoked on the elements which in the same
partition in the order they arrive.
+ *
+ * If the function `partitioner` or `f` throws an exception or if the
[[CompletionStage]] is completed
+ * with failure and the supervision decision is
[[pekko.stream.Supervision.Stop]]
+ * the stream will be completed with failure, otherwise the stream continues
and the current element is dropped.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the Future returned by the provided function finishes
for the next element in sequence
+ *
+ * '''Backpressures when''' the number of futures reaches the configured
parallelism and the downstream
+ * backpressures
+ *
+ * '''Completes when''' upstream completes and all futures have been
completed and all elements have been emitted
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.1.0
+ * @see [[#mapAsync]]
+ * @see [[#mapAsyncPartitionedUnordered]]
+ */
+ def mapAsyncPartitioned[T, P](
+ parallelism: Int,
+ partitioner: function.Function[Out, P],
+ f: function.Function2[Out, P, CompletionStage[T]]): SubFlow[In, T, Mat] =
+ new SubFlow(delegate.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_,
_).asScala))
+
+ /**
+ * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]]
but with an additional
+ * partition step before the transform step. The transform function receives
the an individual
+ * stream entry and the calculated partition value for that entry.The max
parallelism of per partition is 1.
+ *
+ * The function `partitioner` is always invoked on the elements in the order
they arrive.
+ * The function `f` is always invoked on the elements which in the same
partition in the order they arrive.
+ *
+ * If the function `partitioner` or `f` throws an exception or if the
[[CompletionStage]] is completed
+ * with failure and the supervision decision is
[[pekko.stream.Supervision.Stop]]
+ * the stream will be completed with failure, otherwise the stream continues
and the current element is dropped.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the Future returned by the provided function finishes
and downstream available.
+ *
+ * '''Backpressures when''' the number of futures reaches the configured
parallelism and the downstream
+ * backpressures
+ *
+ * '''Completes when''' upstream completes and all futures have been
completed and all elements have been emitted
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.1.0
+ * @see [[#mapAsyncUnordered]]
+ * @see [[#mapAsyncPartitioned]]
+ */
+ def mapAsyncPartitionedUnordered[T, P](
+ parallelism: Int,
+ partitioner: function.Function[Out, P],
+ f: function.Function2[Out, P, CompletionStage[T]]): SubFlow[In, T, Mat] =
+ new
SubFlow(delegate.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_,
_).asScala))
+
/**
* Only pass on those elements that satisfy the given predicate.
*
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index f6b58ee630..e0f85c86a1 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -339,6 +339,72 @@ class SubSource[Out, Mat](
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out,
CompletionStage[T]]): SubSource[T, Mat] =
new SubSource(delegate.mapAsyncUnordered(parallelism)(x => f(x).asScala))
+ /**
+ * Transforms this stream. Works very similarly to [[#mapAsync]] but with an
additional
+ * partition step before the transform step. The transform function receives
the an individual
+ * stream entry and the calculated partition value for that entry. The max
parallelism of per partition is 1.
+ *
+ * The function `partitioner` is always invoked on the elements in the order
they arrive.
+ * The function `f` is always invoked on the elements which in the same
partition in the order they arrive.
+ *
+ * If the function `partitioner` or `f` throws an exception or if the
[[CompletionStage]] is completed
+ * with failure and the supervision decision is
[[pekko.stream.Supervision.Stop]]
+ * the stream will be completed with failure, otherwise the stream continues
and the current element is dropped.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the Future returned by the provided function finishes
for the next element in sequence
+ *
+ * '''Backpressures when''' the number of futures reaches the configured
parallelism and the downstream
+ * backpressures
+ *
+ * '''Completes when''' upstream completes and all futures have been
completed and all elements have been emitted
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.1.0
+ * @see [[#mapAsync]]
+ * @see [[#mapAsyncPartitionedUnordered]]
+ */
+ def mapAsyncPartitioned[T, P](
+ parallelism: Int,
+ partitioner: function.Function[Out, P],
+ f: function.Function2[Out, P, CompletionStage[T]]): SubSource[T, Mat] =
+ new
SubSource(delegate.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_,
_).asScala))
+
+ /**
+ * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]]
but with an additional
+ * partition step before the transform step. The transform function receives
the an individual
+ * stream entry and the calculated partition value for that entry.The max
parallelism of per partition is 1.
+ *
+ * The function `partitioner` is always invoked on the elements in the order
they arrive.
+ * The function `f` is always invoked on the elements which in the same
partition in the order they arrive.
+ *
+ * If the function `partitioner` or `f` throws an exception or if the
[[CompletionStage]] is completed
+ * with failure and the supervision decision is
[[pekko.stream.Supervision.Stop]]
+ * the stream will be completed with failure, otherwise the stream continues
and the current element is dropped.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the Future returned by the provided function finishes
and downstream available.
+ *
+ * '''Backpressures when''' the number of futures reaches the configured
parallelism and the downstream
+ * backpressures
+ *
+ * '''Completes when''' upstream completes and all futures have been
completed and all elements have been emitted
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.1.0
+ * @see [[#mapAsyncUnordered]]
+ * @see [[#mapAsyncPartitioned]]
+ */
+ def mapAsyncPartitionedUnordered[T, P](
+ parallelism: Int,
+ partitioner: function.Function[Out, P],
+ f: function.Function2[Out, P, CompletionStage[T]]): SubSource[T, Mat] =
+ new
SubSource(delegate.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_,
_).asScala))
+
/**
* Only pass on those elements that satisfy the given predicate.
*
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 42b29030d7..ec1586d831 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -163,36 +163,6 @@ final class Flow[-In, +Out, +Mat](
override def mapMaterializedValue[Mat2](f: Mat => Mat2): ReprMat[Out, Mat2] =
new Flow(traversalBuilder.transformMat(f), shape)
- /**
- * Transforms this stream. Works very similarly to [[#mapAsync]] but with an
additional
- * partition step before the transform step. The transform function receives
the an individual
- * stream entry and the calculated partition value for that entry.
- *
- * @since 1.1.0
- * @see [[#mapAsync]]
- * @see [[#mapAsyncPartitionedUnordered]]
- */
- def mapAsyncPartitioned[T, P](parallelism: Int)(
- extractPartition: Out => P)(
- f: (Out, P) => Future[T]): Flow[In, T, Mat] = {
- MapAsyncPartitioned.mapFlowOrdered(this, parallelism)(extractPartition)(f)
- }
-
- /**
- * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]]
but with an additional
- * partition step before the transform step. The transform function receives
the an individual
- * stream entry and the calculated partition value for that entry.
- *
- * @since 1.1.0
- * @see [[#mapAsyncUnordered]]
- * @see [[#mapAsyncPartitioned]]
- */
- def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
- extractPartition: Out => P)(
- f: (Out, P) => Future[T]): Flow[In, T, Mat] = {
- MapAsyncPartitioned.mapFlowUnordered(this,
parallelism)(extractPartition)(f)
- }
-
/**
* Materializes this [[Flow]], immediately returning (1) its materialized
value, and (2) a newly materialized [[Flow]].
* The returned flow is partial materialized and do not support multiple
times materialization.
@@ -1173,6 +1143,81 @@ trait FlowOps[+Out, +Mat] {
*/
def mapAsyncUnordered[T](parallelism: Int)(f: Out => Future[T]): Repr[T] =
via(MapAsyncUnordered(parallelism, f))
+ /**
+ * Transforms this stream. Works very similarly to [[#mapAsync]] but with an
additional
+ * partition step before the transform step. The transform function receives
the an individual
+ * stream entry and the calculated partition value for that entry. The max
parallelism of per partition is 1.
+ *
+ * The function `partitioner` is always invoked on the elements in the order
they arrive.
+ * The function `f` is always invoked on the elements which in the same
partition in the order they arrive.
+ *
+ * If the function `partitioner` or `f` throws an exception or if the
[[Future]] is completed
+ * with failure and the supervision decision is
[[pekko.stream.Supervision.Stop]]
+ * the stream will be completed with failure, otherwise the stream continues
and the current element is dropped.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the Future returned by the provided function finishes
for the next element in sequence
+ *
+ * '''Backpressures when''' the number of futures reaches the configured
parallelism and the downstream
+ * backpressures
+ *
+ * '''Completes when''' upstream completes and all futures have been
completed and all elements have been emitted
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.1.0
+ * @see [[#mapAsync]]
+ * @see [[#mapAsyncPartitionedUnordered]]
+ */
+ def mapAsyncPartitioned[T, P](parallelism: Int)(
+ partitioner: Out => P)(
+ f: (Out, P) => Future[T]): Repr[T] = {
+ (if (parallelism == 1) {
+ via(MapAsyncUnordered(1, elem => f(elem, partitioner(elem))))
+ } else {
+ via(new MapAsyncPartitioned(parallelism, orderedOutput = true,
partitioner, f))
+ })
+ .withAttributes(DefaultAttributes.mapAsyncPartition and
SourceLocation.forLambda(f))
+ }
+
+ /**
+ * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]]
but with an additional
+ * partition step before the transform step. The transform function receives
the an individual
+ * stream entry and the calculated partition value for that entry.The max
parallelism of per partition is 1.
+ *
+ * The function `partitioner` is always invoked on the elements in the order
they arrive.
+ * The function `f` is always invoked on the elements which in the same
partition in the order they arrive.
+ *
+ * If the function `partitioner` or `f` throws an exception or if the
[[Future]] is completed
+ * with failure and the supervision decision is
[[pekko.stream.Supervision.Stop]]
+ * the stream will be completed with failure, otherwise the stream continues
and the current element is dropped.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the Future returned by the provided function finishes
and downstream available.
+ *
+ * '''Backpressures when''' the number of futures reaches the configured
parallelism and the downstream
+ * backpressures
+ *
+ * '''Completes when''' upstream completes and all futures have been
completed and all elements have been emitted
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.1.0
+ * @see [[#mapAsyncUnordered]]
+ * @see [[#mapAsyncPartitioned]]
+ */
+ def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
+ partitioner: Out => P)(
+ f: (Out, P) => Future[T]): Repr[T] = {
+ (if (parallelism == 1) {
+ via(MapAsyncUnordered(1, elem => f(elem, partitioner(elem))))
+ } else {
+ via(new MapAsyncPartitioned(parallelism, orderedOutput = false,
partitioner, f))
+ }).withAttributes(DefaultAttributes.mapAsyncPartitionUnordered and
SourceLocation.forLambda(f))
+ }
+
/**
* Use the `ask` pattern to send a request-reply message to the target `ref`
actor.
* If any of the asks times out it will fail the stream with a
[[pekko.pattern.AskTimeoutException]].
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala
index 1def589f37..8925afc47a 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala
@@ -14,7 +14,6 @@
package org.apache.pekko.stream.scaladsl
import scala.annotation.unchecked.uncheckedVariance
-import scala.concurrent.Future
import org.apache.pekko
import pekko.NotUsed
import pekko.japi.Pair
@@ -90,36 +89,6 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut,
+Mat](delegate: Flow[(In
def mapMaterializedValue[Mat2](f: Mat => Mat2): FlowWithContext[In, CtxIn,
Out, CtxOut, Mat2] =
new FlowWithContext(delegate.mapMaterializedValue(f))
- /**
- * Transforms this stream. Works very similarly to [[#mapAsync]] but with an
additional
- * partition step before the transform step. The transform function receives
the an individual
- * stream entry and the calculated partition value for that entry.
- *
- * @since 1.1.0
- * @see [[#mapAsync]]
- * @see [[#mapAsyncPartitionedUnordered]]
- */
- def mapAsyncPartitioned[T, P](parallelism: Int)(
- extractPartition: Out => P)(
- f: (Out, P) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] = {
- MapAsyncPartitioned.mapFlowWithContextOrdered(this,
parallelism)(extractPartition)(f)
- }
-
- /**
- * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]]
but with an additional
- * partition step before the transform step. The transform function receives
the an individual
- * stream entry and the calculated partition value for that entry.
- *
- * @since 1.1.0
- * @see [[#mapAsyncUnordered]]
- * @see [[#mapAsyncPartitioned]]
- */
- def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
- extractPartition: Out => P)(
- f: (Out, P) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] = {
- MapAsyncPartitioned.mapFlowWithContextUnordered(this,
parallelism)(extractPartition)(f)
- }
-
def asFlow: Flow[(In, CtxIn), (Out, CtxOut), Mat] = delegate
def asJava[JIn <: In, JCtxIn <: CtxIn, JOut >: Out, JCtxOut >: CtxOut, JMat
>: Mat]
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
index 370f49e520..6c12dd4344 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
@@ -113,6 +113,36 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
case (e, ctx) => f(e).map(o => (o, ctx))(ExecutionContexts.parasitic)
})
+ /**
+ * Context-preserving variant of
[[pekko.stream.scaladsl.FlowOps.mapAsyncPartitioned]].
+ *
+ * @since 1.1.0
+ * @see [[pekko.stream.scaladsl.FlowOps.mapAsyncPartitioned]]
+ */
+ def mapAsyncPartitioned[Out2, P](parallelism: Int)(
+ partitioner: Out => P)(
+ f: (Out, P) => Future[Out2]): Repr[Out2, Ctx] = {
+ via(flow[Out, Ctx].mapAsyncPartitioned(parallelism)(pair =>
partitioner(pair._1)) {
+ (pair, partition) =>
+ f(pair._1, partition).map((_, pair._2))(ExecutionContexts.parasitic)
+ })
+ }
+
+ /**
+ * Context-preserving variant of
[[pekko.stream.scaladsl.FlowOps.mapAsyncPartitionedUnordered]].
+ *
+ * @since 1.1.0
+ * @see [[pekko.stream.scaladsl.FlowOps.mapAsyncPartitionedUnordered]]
+ */
+ def mapAsyncPartitionedUnordered[Out2, P](parallelism: Int)(
+ partitioner: Out => P)(
+ f: (Out, P) => Future[Out2]): Repr[Out2, Ctx] = {
+ via(flow[Out, Ctx].mapAsyncPartitionedUnordered(parallelism)(pair =>
partitioner(pair._1)) {
+ (pair, partition) =>
+ f(pair._1, partition).map((_, pair._2))(ExecutionContexts.parasitic)
+ })
+ }
+
/**
* Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.collect]].
*
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 8635a42ad0..429276f36d 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -99,34 +99,6 @@ final class Source[+Out, +Mat](
override def mapMaterializedValue[Mat2](f: Mat => Mat2): ReprMat[Out, Mat2] =
new Source[Out, Mat2](traversalBuilder.transformMat(f.asInstanceOf[Any =>
Any]), shape)
- /**
- * Transforms this stream. Works very similarly to [[#mapAsync]] but with an
additional
- * partition step before the transform step. The transform function receives
the an individual
- * stream entry and the calculated partition value for that entry.
- *
- * @since 1.1.0
- * @see [[#mapAsync]]
- * @see [[#mapAsyncPartitionedUnordered]]
- */
- def mapAsyncPartitioned[T, P](parallelism: Int)(
- extractPartition: Out => P)(f: (Out, P) => Future[T]): Source[T, Mat] = {
- MapAsyncPartitioned.mapSourceOrdered(this,
parallelism)(extractPartition)(f)
- }
-
- /**
- * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]]
but with an additional
- * partition step before the transform step. The transform function receives
the an individual
- * stream entry and the calculated partition value for that entry.
- *
- * @since 1.1.0
- * @see [[#mapAsyncUnordered]]
- * @see [[#mapAsyncPartitioned]]
- */
- def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
- extractPartition: Out => P)(f: (Out, P) => Future[T]): Source[T, Mat] = {
- MapAsyncPartitioned.mapSourceUnordered(this,
parallelism)(extractPartition)(f)
- }
-
/**
* Materializes this Source, immediately returning (1) its materialized
value, and (2) a new Source
* that can be used to consume elements from the newly materialized Source.
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
index 47d2c14bed..5ce8bcd752 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
@@ -14,7 +14,6 @@
package org.apache.pekko.stream.scaladsl
import scala.annotation.unchecked.uncheckedVariance
-import scala.concurrent.Future
import org.apache.pekko
import pekko.stream._
@@ -78,34 +77,6 @@ final class SourceWithContext[+Out, +Ctx, +Mat]
private[stream] (delegate: Sourc
def mapMaterializedValue[Mat2](f: Mat => Mat2): SourceWithContext[Out, Ctx,
Mat2] =
new SourceWithContext(delegate.mapMaterializedValue(f))
- /**
- * Transforms this stream. Works very similarly to [[#mapAsync]] but with an
additional
- * partition step before the transform step. The transform function receives
the an individual
- * stream entry and the calculated partition value for that entry.
- *
- * @since 1.1.0
- * @see [[#mapAsync]]
- * @see [[#mapAsyncPartitionedUnordered]]
- */
- def mapAsyncPartitioned[T, P](parallelism: Int)(
- extractPartition: Out => P)(f: (Out, P) => Future[T]):
SourceWithContext[T, Ctx, Mat] = {
- MapAsyncPartitioned.mapSourceWithContextOrdered(this,
parallelism)(extractPartition)(f)
- }
-
- /**
- * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]]
but with an additional
- * partition step before the transform step. The transform function receives
the an individual
- * stream entry and the calculated partition value for that entry.
- *
- * @since 1.1.0
- * @see [[#mapAsyncUnordered]]
- * @see [[#mapAsyncPartitioned]]
- */
- def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
- extractPartition: Out => P)(f: (Out, P) => Future[T]):
SourceWithContext[T, Ctx, Mat] = {
- MapAsyncPartitioned.mapSourceWithContextUnordered(this,
parallelism)(extractPartition)(f)
- }
-
/**
* Connect this [[pekko.stream.scaladsl.SourceWithContext]] to a
[[pekko.stream.scaladsl.Sink]],
* concatenating the processing steps of both.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]