This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch 1.3.x-mapOption in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 2e84288ecf5730448aaa4a12f3bfd13c0f8b9d85 Author: He-Pin(kerr) <[email protected]> AuthorDate: Sun Nov 2 16:05:22 2025 +0800 feat: Add mapOption operator (#2414) (cherry picked from commit a08b52132f4fec119a484fb197a36c1e31d3c291) --- .../stream/operators/Source-or-Flow/mapOption.md | 31 ++++++++++++++++++++++ docs/src/main/paradox/stream/operators/index.md | 2 ++ .../scala/docs/stream/operators/MapOption.scala | 29 ++++++++++++++++++++ .../apache/pekko/stream/javadsl/SourceTest.java | 11 ++++++++ .../apache/pekko/stream/scaladsl/FlowSpec.scala | 9 +++++++ .../apache/pekko/stream/scaladsl/SourceSpec.scala | 14 ++++++++++ .../org/apache/pekko/stream/impl/Stages.scala | 1 + .../org/apache/pekko/stream/javadsl/Flow.scala | 22 +++++++++++++++ .../org/apache/pekko/stream/javadsl/Source.scala | 22 +++++++++++++++ .../org/apache/pekko/stream/javadsl/SubFlow.scala | 22 +++++++++++++++ .../apache/pekko/stream/javadsl/SubSource.scala | 22 +++++++++++++++ .../org/apache/pekko/stream/scaladsl/Flow.scala | 20 ++++++++++++++ 12 files changed, 205 insertions(+) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/mapOption.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapOption.md new file mode 100644 index 0000000000..23a5dd1838 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapOption.md @@ -0,0 +1,31 @@ +# mapOption + +Transform each element in the stream by calling a mapping function with it and emits the contained item if present. 
+ +@ref[Simple operators](../index.md#simple-operators) + +## Signature + +@apidoc[Source.mapOption](Source) { scala="#mapOption[T](f:Out=>scala.Option[T]):FlowOps.this.Repr[T]" java="#mapOption(org.apache.pekko.japi.function.Function)" } +@apidoc[Flow.mapOption](Flow) { scala="#mapOption[T](f:Out=>scala.Option[T]):FlowOps.this.Repr[T]" java="#mapOption(org.apache.pekko.japi.function.Function)" } + +## Description + +Transform each element in the stream by calling a mapping function with it and emits the contained item if present. + +## Examples + +Scala +: @@snip [Flow.scala](/docs/src/test/scala/docs/stream/operators/MapOption.scala) { #imports #mapOption } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the mapping function returns and the contained element is present + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 0436088b04..80ae73ef44 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -188,6 +188,7 @@ depending on being backpressured by downstream or not. 
|Source/Flow|<a name="logwithmarker"></a>@ref[logWithMarker](Source-or-Flow/logWithMarker.md)|Log elements flowing through the stream as well as completion and erroring.| |Source/Flow|<a name="map"></a>@ref[map](Source-or-Flow/map.md)|Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.| |Source/Flow|<a name="mapconcat"></a>@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.| +|Source/Flow|<a name="mapoption"></a>@ref[mapOption](Source-or-Flow/mapOption.md)|Transform each element in the stream by calling a mapping function with it and emits the contained item if present.| |Source/Flow|<a name="mapwithresource"></a>@ref[mapWithResource](Source-or-Flow/mapWithResource.md)|Map elements with the help of a resource that can be opened, transform each element (in a blocking way) and closed.| |Source/Flow|<a name="materializeintosource"></a>@ref[materializeIntoSource](Source-or-Flow/materializeIntoSource.md)|Materializes this Graph, immediately returning its materialized values into a new Source.| |Source/Flow|<a name="optionalvia"></a>@ref[optionalVia](Source-or-Flow/optionalVia.md)|For a stream containing optional elements, transforms each element by applying the given `viaFlow` and passing the value downstream as an optional value.| @@ -565,6 +566,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [mapAsyncUnordered](Source-or-Flow/mapAsyncUnordered.md) * [mapConcat](Source-or-Flow/mapConcat.md) * [mapError](Source-or-Flow/mapError.md) +* [mapOption](Source-or-Flow/mapOption.md) * [mapWithResource](Source-or-Flow/mapWithResource.md) * [materializeIntoSource](Source-or-Flow/materializeIntoSource.md) * [maybe](Source/maybe.md) diff --git a/docs/src/test/scala/docs/stream/operators/MapOption.scala b/docs/src/test/scala/docs/stream/operators/MapOption.scala new file mode 100644 index 
0000000000..8cbbd5ca3f --- /dev/null +++ b/docs/src/test/scala/docs/stream/operators/MapOption.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com> + */ + +package docs.stream.operators + +//#imports +import org.apache.pekko +import org.apache.pekko.NotUsed +import org.apache.pekko.stream.scaladsl._ + +//#imports + +object MapOption { + + // #mapOption + val source: Source[Int, NotUsed] = Source(1 to 10) + val mapped: Source[String, NotUsed] = source.mapOption(elem => if (elem % 2 == 0) Some(elem.toString) else None) + // #mapOption +} diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index affa189036..05fbca4a69 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -1887,4 +1887,15 @@ public class SourceTest extends StreamTest { .get(3, TimeUnit.SECONDS); assertEquals(Optional.empty(), empty); } + + @Test + public void mustBeAbleToMapOption() throws Exception { + final List<Integer> values = + Source.from(Arrays.asList(1, 2, 3, 4, 5)) + .mapOption(i -> i % 2 == 0 ? 
Optional.of(i * 10) : Optional.empty()) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + assertEquals(Arrays.asList(20, 40), values); + } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala index 0364f130cb..699e66a8c6 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala @@ -645,6 +645,15 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r source.runWith(Sink.head).futureValue should ===(List(2, 4, 6)) } + + "mapOption" in { + val flow = Flow[Int].mapOption { + case x if x % 2 == 0 => Some(x * 2) + case _ => None + } + val result = Source(1 to 5).via(flow).runWith(Sink.seq).futureValue + result should ===(Seq(4, 8)) + } } object TestException extends RuntimeException with NoStackTrace diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala index f81bc8f3e5..5c3a8f88ee 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala @@ -578,4 +578,18 @@ class SourceSpec extends StreamSpec with DefaultTimeout { .expectComplete() } } + + "Source mapOption" must { + "map and filter elements" in { + Source(1 to 5) + .mapOption { n => + if (n % 2 == 0) Some(n * 10) + else None + } + .runWith(TestSink[Int]()) + .request(5) + .expectNext(20, 40) + .expectComplete() + } + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index 5e0a97b1f7..2d4bd30c98 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -30,6 +30,7 @@ import pekko.stream.Attributes._ // stage specific default attributes val map = name("map") + val mapOption = name("mapOption") val contramap = name("contramap") val dimap = name("dimap") val log = name("log") diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 49474b642d..24e6359300 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -37,6 +37,7 @@ import pekko.japi.Pair import pekko.japi.function import pekko.japi.function.Creator import pekko.stream.{ javadsl, _ } +import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava } import pekko.util.ConstantFun import pekko.util.FutureConverters._ @@ -748,6 +749,27 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def map[T](f: function.Function[Out, T]): javadsl.Flow[In, T, Mat] = new Flow(delegate.map(f.apply)) + /** + * Transform each input element into an `Optional` of output element. + * If the mapping function returns `Optional.empty()`, the element is filtered out. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the mapping function returns a non-empty `Optional` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def mapOption[T](f: function.Function[Out, Optional[T]]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.map(f(_)).collect { + case e if e.isPresent => e.get() + }.addAttributes(DefaultAttributes.mapOption)) + /** * This is a simplified version of `wireTap(Sink)` that takes only a simple procedure. 
* Elements will be passed into this "side channel" function, and any of its results will be ignored. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index c860816154..c97b5ad8f0 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -35,6 +35,7 @@ import pekko.japi.{ function, JavaPartialFunction, Pair } import pekko.japi.function.Creator import pekko.stream._ import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava } +import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.impl.fusing.{ ArraySource, StatefulMapConcat, ZipWithIndexJava } import pekko.util.{ unused, _ } import pekko.util.FutureConverters._ @@ -2277,6 +2278,27 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def map[T](f: function.Function[Out, T]): javadsl.Source[T, Mat] = new Source(delegate.map(f.apply)) + /** + * Transform each input element into an `Optional` of output element. + * If the mapping function returns `Optional.empty()`, the element is filtered out. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the mapping function returns a non-empty `Optional` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def mapOption[T](f: function.Function[Out, Optional[T]]): javadsl.Source[T, Mat] = + new Source(delegate.map(f(_)).collect { + case e if e.isPresent => e.get() + }.addAttributes(DefaultAttributes.mapOption)) + /** * This is a simplified version of `wireTap(Sink)` that takes only a simple procedure. * Elements will be passed into this "side channel" function, and any of its results will be ignored. 
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index c5aa721850..9127e9323f 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -29,6 +29,7 @@ import pekko.NotUsed import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import pekko.japi.{ function, Pair } import pekko.stream._ +import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava } import pekko.util.ConstantFun import pekko.util.FutureConverters._ @@ -157,6 +158,27 @@ class SubFlow[In, Out, Mat]( def map[T](f: function.Function[Out, T]): SubFlow[In, T, Mat] = new SubFlow(delegate.map(f.apply)) + /** + * Transform each input element into an `Optional` of output element. + * If the mapping function returns `Optional.empty()`, the element is filtered out. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the mapping function returns a non-empty `Optional` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def mapOption[T](f: function.Function[Out, Optional[T]]): SubFlow[In, T, Mat] = + new SubFlow(delegate.map(f(_)).collect { + case e if e.isPresent => e.get() + }.addAttributes(DefaultAttributes.mapOption)) + /** * This is a simplified version of `wireTap(Sink)` that takes only a simple procedure. * Elements will be passed into this "side channel" function, and any of its results will be ignored. 
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 66eb0b2b04..0f798dc2dd 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -29,6 +29,7 @@ import pekko.NotUsed import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import pekko.japi.{ function, Pair } import pekko.stream._ +import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava } import pekko.util.ConstantFun import pekko.util.FutureConverters._ @@ -148,6 +149,27 @@ class SubSource[Out, Mat]( def map[T](f: function.Function[Out, T]): SubSource[T, Mat] = new SubSource(delegate.map(f.apply)) + /** + * Transform each input element into an `Optional` of output element. + * If the mapping function returns `Optional.empty()`, the element is filtered out. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the mapping function returns a non-empty `Optional` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def mapOption[T](f: function.Function[Out, Optional[T]]): SubSource[T, Mat] = + new SubSource(delegate.map(f(_)).collect { + case e if e.isPresent => e.get() + }.addAttributes(DefaultAttributes.mapOption)) + /** * This is a simplified version of `wireTap(Sink)` that takes only a simple procedure. * Elements will be passed into this "side channel" function, and any of its results will be ignored. 
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 463ecaf411..e2fdbbf031 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1156,6 +1156,26 @@ trait FlowOps[+Out, +Mat] { */ def map[T](f: Out => T): Repr[T] = via(Map(f)) + /** + * Transform each input element into an `Option` of output element. + * If the function returns `Some(value)`, that value is emitted downstream. + * If the function returns `None`, no element is emitted downstream. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the mapping function returns `Some(value)` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def mapOption[T](f: Out => Option[T]): Repr[T] = + map(f).collect { case Some(value) => value }.addAttributes(DefaultAttributes.mapOption) + /** * This is a simplified version of `wireTap(Sink)` that takes only a simple function. * Elements will be passed into this "side channel" function, and any of its results will be ignored. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
