This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/1.3.x by this push:
     new b13f2b193b feat: Add fromOption operator (#2413) (#2415)
b13f2b193b is described below

commit b13f2b193bb9b7cb4990f397d3d2206f55e634a2
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Nov 2 16:05:04 2025 +0800

    feat: Add fromOption operator (#2413) (#2415)
    
    (cherry picked from commit c37cbc61920e3060dfbbfa797225b753893b65cb)
---
 .../paradox/stream/operators/Source/fromOption.md  | 26 ++++++++++++++++++++++
 docs/src/main/paradox/stream/operators/index.md    |  6 ++++-
 .../apache/pekko/stream/javadsl/SourceTest.java    | 17 ++++++++++++++
 .../pekko/stream/DslFactoriesConsistencySpec.scala |  1 +
 .../apache/pekko/stream/scaladsl/SourceSpec.scala  | 17 ++++++++++++++
 .../org/apache/pekko/stream/javadsl/Source.scala   |  8 +++++++
 .../org/apache/pekko/stream/scaladsl/Source.scala  | 10 +++++++++
 7 files changed, 84 insertions(+), 1 deletion(-)

diff --git a/docs/src/main/paradox/stream/operators/Source/fromOption.md 
b/docs/src/main/paradox/stream/operators/Source/fromOption.md
new file mode 100644
index 0000000000..770fb18cc0
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source/fromOption.md
@@ -0,0 +1,26 @@
+# Source.fromOption
+
+Create a `Source` from an @scala[`Option[T]`] @java[`Optional<T>`] value, 
emitting the value if it is present.
+
+@ref[Source operators](../index.md#source-operators)
+
+
+## Signature
+
+@apidoc[Source.fromOption](Source$) { }
+
+
+## Description
+
+Create a `Source` from an @scala[`Option[T]`] @java[`Optional<T>`] value, 
emitting the value if it is present.
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the value is present
+
+**completes** afterwards
+
+@@@
+
diff --git a/docs/src/main/paradox/stream/operators/index.md 
b/docs/src/main/paradox/stream/operators/index.md
index 73db178be4..0436088b04 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -22,6 +22,7 @@ These built-in sources are available from 
@scala[`org.apache.pekko.stream.scalad
 |Source|<a 
name="fromfuturesource"></a>@ref[fromFutureSource](Source/fromFutureSource.md)|Deprecated
 by @ref[`Source.futureSource`](Source/futureSource.md).|
 |Source|<a 
name="fromiterator"></a>@ref[fromIterator](Source/fromIterator.md)|Stream the 
values from an `Iterator`, requesting the next value when there is demand.|
 |Source|<a 
name="fromjavastream"></a>@ref[fromJavaStream](Source/fromJavaStream.md)|Stream 
the values from a Java 8 `Stream`, requesting the next value when there is 
demand.|
+|Source|<a name="fromoption"></a>@ref[fromOption](Source/fromOption.md)|Create 
a `Source` from an @scala[`Option[T]`] @java[`Optional<T>`] value, emitting the 
value if it is present.|
 |Source|<a 
name="frompublisher"></a>@ref[fromPublisher](Source/fromPublisher.md)|Integration
 with Reactive Streams, subscribes to a 
@javadoc[Publisher](java.util.concurrent.Flow.Publisher).|
 |Source|<a 
name="fromsourcecompletionstage"></a>@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Deprecated
 by @ref[`Source.completionStageSource`](Source/completionStageSource.md).|
 |Source|<a name="future"></a>@ref[future](Source/future.md)|Send the single 
value of the `Future` when it completes and there is demand.|
@@ -377,6 +378,7 @@ Flow operators to (de)compress.
 |Compression|<a 
name="deflate"></a>@ref[deflate](Compression/deflate.md)|Creates a flow that 
deflate-compresses a stream of ByteStrings. |
 |Compression|<a name="gunzip"></a>@ref[gunzip](Compression/gunzip.md)|Creates 
a flow that gzip-decompresses a stream of ByteStrings.  |
 |Compression|<a name="gzip"></a>@ref[gzip](Compression/gzip.md)|Creates a flow 
that gzip-compresses a stream of ByteStrings.  |
+|Compression|<a 
name="gzipdecompress"></a>@ref[gzipDecompress](Compression/gzipDecompress.md)|Creates
 a flow that gzip-decompresses a stream of ByteStrings.  |
 |Compression|<a 
name="inflate"></a>@ref[inflate](Compression/inflate.md)|Creates a flow that 
deflate-decompresses a stream of ByteStrings. |
 
 ## Error handling
@@ -498,6 +500,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 * [fromJavaStream](StreamConverters/fromJavaStream.md)
 * [fromMaterializer](Source-or-Flow/fromMaterializer.md)
 * [fromMaterializer](Sink/fromMaterializer.md)
+* [fromOption](Source/fromOption.md)
 * [fromOutputStream](StreamConverters/fromOutputStream.md)
 * [fromPath](FileIO/fromPath.md)
 * [fromPublisher](Source/fromPublisher.md)
@@ -516,8 +519,9 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 * [groupedWeighted](Source-or-Flow/groupedWeighted.md)
 * [groupedWeightedWithin](Source-or-Flow/groupedWeightedWithin.md)
 * [groupedWithin](Source-or-Flow/groupedWithin.md)
-* [gzipDecompress](Compression/gzipDecompress.md)
+* [gunzip](Compression/gunzip.md)
 * [gzip](Compression/gzip.md)
+* [gzipDecompress](Compression/gzipDecompress.md)
 * [head](Sink/head.md)
 * [headOption](Sink/headOption.md)
 * [idleTimeout](Source-or-Flow/idleTimeout.md)
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index 081f657820..affa189036 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -1870,4 +1870,21 @@ public class SourceTest extends StreamTest {
         .expectNext(0)
         .expectComplete();
   }
+
+  @Test
+  public void mustBeAbleToCreateFromOption() throws Exception {
+    final Integer value =
+        Source.fromOption(Optional.of(42))
+            .runWith(Sink.head(), system)
+            .toCompletableFuture()
+            .get(3, TimeUnit.SECONDS);
+    assertEquals((Integer) 42, value);
+    //
+    final Optional<Integer> empty =
+        Source.fromOption(Optional.<Integer>empty())
+            .runWith(Sink.headOption(), system)
+            .toCompletableFuture()
+            .get(3, TimeUnit.SECONDS);
+    assertEquals(Optional.empty(), empty);
+  }
 }
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
index 0d3de6261f..93defd9017 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
@@ -64,6 +64,7 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with 
Matchers {
       (classOf[scala.collection.Seq[_]],                   
classOf[java.util.List[_]]) ::
       (classOf[scala.collection.immutable.Seq[_]],         
classOf[java.util.List[_]]) ::
       (classOf[scala.collection.immutable.Set[_]],         
classOf[java.util.Set[_]]) ::
+      (classOf[scala.Option[_]],                           
classOf[java.util.Optional[_]]) ::
       (classOf[Boolean],                                   
classOf[pekko.stream.javadsl.AsPublisher]) ::
       (classOf[scala.Function0[_]],                        
classOf[pekko.japi.function.Creator[_]]) ::
       (classOf[scala.Function0[_]],                        
classOf[java.util.concurrent.Callable[_]]) ::
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index cb37d951e6..f81bc8f3e5 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -561,4 +561,21 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
         .expectComplete()
     }
   }
+
+  "Source from option" must {
+    "produce one element when Some" in {
+      Source.fromOption(Some(42))
+        .runWith(TestSink[Int]())
+        .request(1)
+        .expectNext(42)
+        .expectComplete()
+    }
+
+    "complete immediately when None" in {
+      Source.fromOption(None)
+        .runWith(TestSink[Int]())
+        .request(1)
+        .expectComplete()
+    }
+  }
 }
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index d2484f89a0..c860816154 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -203,6 +203,14 @@ object Source {
   def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new 
Source(scaladsl.Source.fromGraph(
     new ArraySource[T](array)))
 
+  /**
+   * Create a `Source` from an `Optional` value, emitting the value if it is 
present.
+   *
+   * @since 1.3.0
+   */
+  def fromOption[T](optional: Optional[T]): Source[T, NotUsed] =
+    if (optional.isPresent) single(optional.get()) else empty()
+
   /**
    * Creates [[Source]] that represents integer values in range 
''[start;end]'', step equals to 1.
    * It allows to create `Source` out of range as simply as on Scala `Source(1 
to N)`
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index e7f821cb87..306dde3ec3 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -480,6 +480,16 @@ object Source {
   def single[T](element: T): Source[T, NotUsed] =
     fromGraph(new GraphStages.SingleSource(element))
 
+  /**
+   * Create a `Source` from an `Option` value, emitting the value if it is 
defined.
+   *
+   * @since 1.3.0
+   */
+  def fromOption[T](option: Option[T]): Source[T, NotUsed] = option match {
+    case Some(value) => single(value)
+    case None        => empty
+  }
+
   /**
    * Create a `Source` that will continually emit the given element.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to