This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch 1.3.x-mapOption
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 2e84288ecf5730448aaa4a12f3bfd13c0f8b9d85
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Nov 2 16:05:22 2025 +0800

    feat: Add mapOption operator (#2414)
    
    (cherry picked from commit a08b52132f4fec119a484fb197a36c1e31d3c291)
---
 .../stream/operators/Source-or-Flow/mapOption.md   | 31 ++++++++++++++++++++++
 docs/src/main/paradox/stream/operators/index.md    |  2 ++
 .../scala/docs/stream/operators/MapOption.scala    | 29 ++++++++++++++++++++
 .../apache/pekko/stream/javadsl/SourceTest.java    | 11 ++++++++
 .../apache/pekko/stream/scaladsl/FlowSpec.scala    |  9 +++++++
 .../apache/pekko/stream/scaladsl/SourceSpec.scala  | 14 ++++++++++
 .../org/apache/pekko/stream/impl/Stages.scala      |  1 +
 .../org/apache/pekko/stream/javadsl/Flow.scala     | 22 +++++++++++++++
 .../org/apache/pekko/stream/javadsl/Source.scala   | 22 +++++++++++++++
 .../org/apache/pekko/stream/javadsl/SubFlow.scala  | 22 +++++++++++++++
 .../apache/pekko/stream/javadsl/SubSource.scala    | 22 +++++++++++++++
 .../org/apache/pekko/stream/scaladsl/Flow.scala    | 20 ++++++++++++++
 12 files changed, 205 insertions(+)

diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/mapOption.md 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapOption.md
new file mode 100644
index 0000000000..23a5dd1838
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapOption.md
@@ -0,0 +1,31 @@
+# mapOption
+
+Transform each element in the stream by calling a mapping function with it and 
emits the contained item if present.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Source.mapOption](Source) { 
scala="#mapOption[T](f:Out=&gt;scala.Option[T]):FlowOps.this.Repr[T]" 
java="#mapOption(org.apache.pekko.japi.function.Function)" }
+@apidoc[Flow.mapOption](Flow) { 
scala="#mapOption[T](f:Out=&gt;scala.Option[T]):FlowOps.this.Repr[T]" 
java="#mapOption(org.apache.pekko.japi.function.Function)" }
+
+## Description
+
+Transform each element in the stream by calling a mapping function with it and 
emits the contained item if present.
+
+## Examples
+
+Scala
+:  @@snip 
[Flow.scala](/docs/src/test/scala/docs/stream/operators/MapOption.scala) { 
#imports #mapOption }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the mapping function returns and element present
+
+**backpressures** when downstream backpressures
+
+**completes** when upstream completes
+
+@@@
diff --git a/docs/src/main/paradox/stream/operators/index.md 
b/docs/src/main/paradox/stream/operators/index.md
index 0436088b04..80ae73ef44 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -188,6 +188,7 @@ depending on being backpressured by downstream or not.
 |Source/Flow|<a 
name="logwithmarker"></a>@ref[logWithMarker](Source-or-Flow/logWithMarker.md)|Log
 elements flowing through the stream as well as completion and erroring.|
 |Source/Flow|<a name="map"></a>@ref[map](Source-or-Flow/map.md)|Transform each 
element in the stream by calling a mapping function with it and passing the 
returned value downstream.|
 |Source/Flow|<a 
name="mapconcat"></a>@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform 
each element into zero or more elements that are individually passed 
downstream.|
+|Source/Flow|<a 
name="mapoption"></a>@ref[mapOption](Source-or-Flow/mapOption.md)|Transform 
each element in the stream by calling a mapping function with it and emits the 
contained item if present.|
 |Source/Flow|<a 
name="mapwithresource"></a>@ref[mapWithResource](Source-or-Flow/mapWithResource.md)|Map
 elements with the help of a resource that can be opened, transform each 
element (in a blocking way) and closed.|
 |Source/Flow|<a 
name="materializeintosource"></a>@ref[materializeIntoSource](Source-or-Flow/materializeIntoSource.md)|Materializes
 this Graph, immediately returning its materialized values into a new Source.|
 |Source/Flow|<a 
name="optionalvia"></a>@ref[optionalVia](Source-or-Flow/optionalVia.md)|For a 
stream containing optional elements, transforms each element by applying the 
given `viaFlow` and passing the value downstream as an optional value.|
@@ -565,6 +566,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 * [mapAsyncUnordered](Source-or-Flow/mapAsyncUnordered.md)
 * [mapConcat](Source-or-Flow/mapConcat.md)
 * [mapError](Source-or-Flow/mapError.md)
+* [mapOption](Source-or-Flow/mapOption.md)
 * [mapWithResource](Source-or-Flow/mapWithResource.md)
 * [materializeIntoSource](Source-or-Flow/materializeIntoSource.md)
 * [maybe](Source/maybe.md)
diff --git a/docs/src/test/scala/docs/stream/operators/MapOption.scala 
b/docs/src/test/scala/docs/stream/operators/MapOption.scala
new file mode 100644
index 0000000000..8cbbd5ca3f
--- /dev/null
+++ b/docs/src/test/scala/docs/stream/operators/MapOption.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.stream.operators
+
+//#imports
+import org.apache.pekko
+import org.apache.pekko.NotUsed
+import org.apache.pekko.stream.scaladsl._
+
+//#imports
+
+object MapOption {
+
+  // #mapOption
+  val source: Source[Int, NotUsed] = Source(1 to 10)
+  val mapped: Source[String, NotUsed] = source.mapOption(elem => if (elem % 2 
== 0) Some(elem.toString) else None)
+  // #mapOption
+}
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index affa189036..05fbca4a69 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -1887,4 +1887,15 @@ public class SourceTest extends StreamTest {
             .get(3, TimeUnit.SECONDS);
     assertEquals(Optional.empty(), empty);
   }
+
+  @Test
+  public void mustBeAbleToMapOption() throws Exception {
+    final List<Integer> values =
+        Source.from(Arrays.asList(1, 2, 3, 4, 5))
+            .mapOption(i -> i % 2 == 0 ? Optional.of(i * 10) : 
Optional.empty())
+            .runWith(Sink.seq(), system)
+            .toCompletableFuture()
+            .get(3, TimeUnit.SECONDS);
+    assertEquals(Arrays.asList(20, 40), values);
+  }
 }
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
index 0364f130cb..699e66a8c6 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
@@ -645,6 +645,15 @@ class FlowSpec extends 
StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r
 
       source.runWith(Sink.head).futureValue should ===(List(2, 4, 6))
     }
+
+    "mapOption" in {
+      val flow = Flow[Int].mapOption {
+        case x if x % 2 == 0 => Some(x * 2)
+        case _               => None
+      }
+      val result = Source(1 to 5).via(flow).runWith(Sink.seq).futureValue
+      result should ===(Seq(4, 8))
+    }
   }
 
   object TestException extends RuntimeException with NoStackTrace
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index f81bc8f3e5..5c3a8f88ee 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -578,4 +578,18 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
         .expectComplete()
     }
   }
+
+  "Source mapOption" must {
+    "map and filter elements" in {
+      Source(1 to 5)
+        .mapOption { n =>
+          if (n % 2 == 0) Some(n * 10)
+          else None
+        }
+        .runWith(TestSink[Int]())
+        .request(5)
+        .expectNext(20, 40)
+        .expectComplete()
+    }
+  }
 }
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index 5e0a97b1f7..2d4bd30c98 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -30,6 +30,7 @@ import pekko.stream.Attributes._
 
     // stage specific default attributes
     val map = name("map")
+    val mapOption = name("mapOption")
     val contramap = name("contramap")
     val dimap = name("dimap")
     val log = name("log")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 49474b642d..24e6359300 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -37,6 +37,7 @@ import pekko.japi.Pair
 import pekko.japi.function
 import pekko.japi.function.Creator
 import pekko.stream.{ javadsl, _ }
+import pekko.stream.impl.Stages.DefaultAttributes
 import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
 import pekko.util.ConstantFun
 import pekko.util.FutureConverters._
@@ -748,6 +749,27 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, 
Out, Mat]) extends Gr
   def map[T](f: function.Function[Out, T]): javadsl.Flow[In, T, Mat] =
     new Flow(delegate.map(f.apply))
 
+  /**
+   * Transform each input element into an `Optional` of output element.
+   * If the mapping function returns `Optional.empty()`, the element is 
filtered out.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Emits when''' the mapping function returns `Optional`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def mapOption[T](f: function.Function[Out, Optional[T]]): javadsl.Flow[In, 
T, Mat] =
+    new Flow(delegate.map(f(_)).collect {
+      case e if e.isPresent => e.get()
+    }.addAttributes(DefaultAttributes.mapOption))
+
   /**
    * This is a simplified version of `wireTap(Sink)` that takes only a simple 
procedure.
    * Elements will be passed into this "side channel" function, and any of its 
results will be ignored.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index c860816154..c97b5ad8f0 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -35,6 +35,7 @@ import pekko.japi.{ function, JavaPartialFunction, Pair }
 import pekko.japi.function.Creator
 import pekko.stream._
 import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava 
}
+import pekko.stream.impl.Stages.DefaultAttributes
 import pekko.stream.impl.fusing.{ ArraySource, StatefulMapConcat, 
ZipWithIndexJava }
 import pekko.util.{ unused, _ }
 import pekko.util.FutureConverters._
@@ -2277,6 +2278,27 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
   def map[T](f: function.Function[Out, T]): javadsl.Source[T, Mat] =
     new Source(delegate.map(f.apply))
 
+  /**
+   * Transform each input element into an `Optional` of output element.
+   * If the mapping function returns `Optional.empty()`, the element is 
filtered out.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Emits when''' the mapping function returns `Optional`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def mapOption[T](f: function.Function[Out, Optional[T]]): javadsl.Source[T, 
Mat] =
+    new Source(delegate.map(f(_)).collect {
+      case e if e.isPresent => e.get()
+    }.addAttributes(DefaultAttributes.mapOption))
+
   /**
    * This is a simplified version of `wireTap(Sink)` that takes only a simple 
procedure.
    * Elements will be passed into this "side channel" function, and any of its 
results will be ignored.
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index c5aa721850..9127e9323f 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -29,6 +29,7 @@ import pekko.NotUsed
 import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
 import pekko.japi.{ function, Pair }
 import pekko.stream._
+import pekko.stream.impl.Stages.DefaultAttributes
 import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
 import pekko.util.ConstantFun
 import pekko.util.FutureConverters._
@@ -157,6 +158,27 @@ class SubFlow[In, Out, Mat](
   def map[T](f: function.Function[Out, T]): SubFlow[In, T, Mat] =
     new SubFlow(delegate.map(f.apply))
 
+  /**
+   * Transform each input element into an `Optional` of output element.
+   * If the mapping function returns `Optional.empty()`, the element is 
filtered out.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Emits when''' the mapping function returns `Optional`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def mapOption[T](f: function.Function[Out, Optional[T]]): SubFlow[In, T, 
Mat] =
+    new SubFlow(delegate.map(f(_)).collect {
+      case e if e.isPresent => e.get()
+    }.addAttributes(DefaultAttributes.mapOption))
+
   /**
    * This is a simplified version of `wireTap(Sink)` that takes only a simple 
procedure.
    * Elements will be passed into this "side channel" function, and any of its 
results will be ignored.
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index 66eb0b2b04..0f798dc2dd 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -29,6 +29,7 @@ import pekko.NotUsed
 import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
 import pekko.japi.{ function, Pair }
 import pekko.stream._
+import pekko.stream.impl.Stages.DefaultAttributes
 import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
 import pekko.util.ConstantFun
 import pekko.util.FutureConverters._
@@ -148,6 +149,27 @@ class SubSource[Out, Mat](
   def map[T](f: function.Function[Out, T]): SubSource[T, Mat] =
     new SubSource(delegate.map(f.apply))
 
+  /**
+   * Transform each input element into an `Optional` of output element.
+   * If the mapping function returns `Optional.empty()`, the element is 
filtered out.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Emits when''' the mapping function returns `Optional`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def mapOption[T](f: function.Function[Out, Optional[T]]): SubSource[T, Mat] =
+    new SubSource(delegate.map(f(_)).collect {
+      case e if e.isPresent => e.get()
+    }.addAttributes(DefaultAttributes.mapOption))
+
   /**
    * This is a simplified version of `wireTap(Sink)` that takes only a simple 
procedure.
    * Elements will be passed into this "side channel" function, and any of its 
results will be ignored.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 463ecaf411..e2fdbbf031 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -1156,6 +1156,26 @@ trait FlowOps[+Out, +Mat] {
    */
   def map[T](f: Out => T): Repr[T] = via(Map(f))
 
+  /**
+   * Transform each input element into an `Option` of output element.
+   * If the function returns `Some(value)`, that value is emitted downstream.
+   * If the function returns `None`, no element is emitted downstream.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Emits when''' the mapping function returns `Some(value)`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def mapOption[T](f: Out => Option[T]): Repr[T] =
+    map(f).collect { case Some(value) => value 
}.addAttributes(DefaultAttributes.mapOption)
+
   /**
    * This is a simplified version of `wireTap(Sink)` that takes only a simple 
function.
    * Elements will be passed into this "side channel" function, and any of its 
results will be ignored.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to