This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/1.3.x by this push:
new 5f2fd5d564 feat: Add Sink#source (#2250) (#2334)
5f2fd5d564 is described below
commit 5f2fd5d56445e0234a8c6f1e8283b92b84a7fb2d
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Oct 20 01:02:38 2025 +0800
feat: Add Sink#source (#2250) (#2334)
* feat: Add Sink#source (#2250)
* feat: Add Sink#source
(cherry picked from commit a579679445e9d47eafff7cfde62a0c2bc7268ee2)
* Update @since annotation for source method
---------
Co-authored-by: PJ Fanning <[email protected]>
---
.../main/paradox/stream/operators/Sink/source.md | 29 +++++
docs/src/main/paradox/stream/operators/index.md | 2 +
.../org/apache/pekko/stream/javadsl/SinkTest.java | 11 ++
.../pekko/stream/scaladsl/SourceSinkSpec.scala | 87 +++++++++++++++
.../org/apache/pekko/stream/impl/Stages.scala | 1 +
.../pekko/stream/impl/fusing/SourceSink.scala | 121 +++++++++++++++++++++
.../org/apache/pekko/stream/javadsl/Sink.scala | 14 +++
.../org/apache/pekko/stream/scaladsl/Sink.scala | 17 ++-
8 files changed, 281 insertions(+), 1 deletion(-)
diff --git a/docs/src/main/paradox/stream/operators/Sink/source.md
b/docs/src/main/paradox/stream/operators/Sink/source.md
new file mode 100644
index 0000000000..6dc6f751e9
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Sink/source.md
@@ -0,0 +1,29 @@
+# Sink.source
+
+A `Sink` that materializes this `Sink` itself as a `Source`; the returned `Source` supports only a single subscriber.
+
+@ref[Sink operators](../index.md#sink-operators)
+
+## Signature
+
+@apidoc[Sink.source](Sink$) { scala="#source[T]:org.apache.pekko.stream.scaladsl.Sink[T,org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.NotUsed]]" java="#source()" }
+
+
+## Description
+
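+A `Sink` that materializes this `Sink` itself as a `Source`; the returned `Source` supports only a single subscriber.
+Every element that reaches the `Sink` is emitted by the materialized `Source`, and upstream completion or failure is propagated to it.
+The `Source` has to be materialized within the stream subscription timeout (see `ActorAttributes.StreamSubscriptionTimeout`); in `cancel` termination mode the upstream is cancelled on timeout and a later materialization of the `Source` fails.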
+
+Use `BroadcastHub.sink` if you need a `Source` that allows multiple
subscribers.
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**cancels** when the materialized `Source` is cancelled, or when the `Source` is not subscribed to within the stream subscription timeout (in `cancel` termination mode).
+
+**backpressures** when the materialized `Source` backpressures or is not yet ready to receive elements.
+
+@@@
+
+
diff --git a/docs/src/main/paradox/stream/operators/index.md
b/docs/src/main/paradox/stream/operators/index.md
index d58bd65f91..62169a4d7b 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -90,6 +90,7 @@ These built-in sinks are available from
@scala[`org.apache.pekko.stream.scaladsl
|Sink|<a name="reduce"></a>@ref[reduce](Sink/reduce.md)|Apply a reduction
function on the incoming elements and pass the result to the next invocation.|
|Sink|<a name="seq"></a>@ref[seq](Sink/seq.md)|Collect values emitted from the
stream into a collection.|
|Sink|<a name="setup"></a>@ref[setup](Sink/setup.md)|Defer the creation of a
`Sink` until materialization and access `ActorMaterializer` and `Attributes`|
+|Sink|<a name="source"></a>@ref[source](Sink/source.md)|A `Sink` that materializes this `Sink` itself as a `Source`; the returned `Source` supports only a single subscriber.|
|Sink|<a name="takelast"></a>@ref[takeLast](Sink/takeLast.md)|Collect the last
`n` values emitted from the stream into a collection.|
## Additional Sink and Source converters
@@ -602,6 +603,7 @@ For more background see the @ref[Error Handling in
Streams](../stream-error.md)
* [single](Source/single.md)
* [sink](PubSub/sink.md)
* [sliding](Source-or-Flow/sliding.md)
+* [source](Sink/source.md)
* [source](PubSub/source.md)
* [splitAfter](Source-or-Flow/splitAfter.md)
* [splitWhen](Source-or-Flow/splitWhen.md)
diff --git
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
index 1495ea5e70..5f934ed55c 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
@@ -270,4 +270,15 @@ public class SinkTest extends StreamTest {
CompletionStage<Long> cs = Source.range(1, 10).runWith(Sink.count(),
system);
Assert.assertEquals(10, cs.toCompletableFuture().join().longValue());
}
+
+  @Test
+  public void mustBeAbleToUseSinkAsSource() throws Exception {
+    // Materialize Sink.source(), then drain the single-subscriber Source it yields into a list.
+    final CompletionStage<List<Integer>> collected =
+        Source.range(1, 10).runWith(Sink.source(), system).runWith(Sink.seq(), system);
+    final List<Integer> result = collected.toCompletableFuture().get(1, TimeUnit.SECONDS);
+    assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), result);
+  }
}
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSinkSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSinkSpec.scala
new file mode 100644
index 0000000000..73f728bbdb
--- /dev/null
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSinkSpec.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko
+import pekko.stream.{ Attributes, StreamSubscriptionTimeoutTerminationMode }
+import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
+import pekko.stream.testkit.StreamSpec
+import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
+
+class SourceSinkSpec extends StreamSpec("""
+    pekko.stream.materializer.initial-input-buffer-size = 2
+  """) {
+
+  // Specs for Sink.source: a Sink whose materialized value is a single-subscriber Source.
+  "Sink.source" must {
+    "Can be used as a Source" in {
+      val s = Source(1 to 6).runWith(Sink.source)
+      s.runWith(Sink.seq).futureValue should be(1 to 6)
+    }
+
+    "Can complete when upstream completes without elements" in {
+      val s = Source.empty.runWith(Sink.source)
+      s.runWith(Sink.seq).futureValue should be(Nil)
+    }
+
+    "Can cancel upstream when downstream cancels" in {
+      val (pub, source) = TestSource.probe[Int]
+        .toMat(Sink.source)(Keep.both)
+        .run()
+      val sub = source.runWith(TestSink.probe[Int])
+      pub.ensureSubscription()
+      sub.ensureSubscription()
+      sub.cancel()
+      pub.expectCancellation()
+    }
+
+    "Can timeout when no subscription" in {
+      import scala.concurrent.duration._
+      val (pub, source) = TestSource.probe[Int]
+        .toMat(Sink.source)(Keep.both)
+        .addAttributes(Attributes(
+          StreamSubscriptionTimeout(
+            2.seconds,
+            StreamSubscriptionTimeoutTerminationMode.cancel)))
+        .run()
+      // Nobody materializes `source` within the timeout, so the stage cancels its upstream.
+      pub.expectCancellation()
+      // The stage times out the sub-source before cancelling upstream, so a late
+      // materialization fails right away; no sleep is needed to coordinate.
+      val sub = source.runWith(TestSink.probe[Int])
+      sub.expectSubscription()
+      sub.expectError()
+    }
+
+    "Can backpressure" in {
+      Source.iterate(1)(_ => true, _ + 1)
+        .runWith(Sink.source).runWith(TestSink.probe[Int])
+        .request(3)
+        .expectNext(1, 2, 3)
+        .request(2)
+        .expectNext(4, 5)
+        .cancel()
+    }
+
+    "Can use with mapMaterializedValue" in {
+      val sink = Sink.source[Int].mapMaterializedValue(_.runWith(Sink.seq))
+      Source(1 to 5)
+        .runWith(sink)
+        .futureValue should be(1 to 5)
+    }
+  }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index 9c7c1d56e1..58ea8b5bd5 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -156,6 +156,7 @@ import pekko.stream.Attributes._
val seqSink = name("seqSink")
val countSink = name("countSink")
val publisherSink = name("publisherSink")
+ // Default name attribute of the SourceSink stage that backs Sink.source.
+ val sourceSink = name("sourceSink")
val fanoutPublisherSink = name("fanoutPublisherSink")
val ignoreSink = name("ignoreSink")
val neverSink = name("neverSink")
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/SourceSink.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/SourceSink.scala
new file mode 100644
index 0000000000..db58cc93e6
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/SourceSink.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import org.apache.pekko
+import org.apache.pekko.stream.impl.Stages.DefaultAttributes
+import pekko.NotUsed
+import pekko.annotation.InternalApi
+import pekko.stream.{ ActorAttributes, Attributes, Inlet, SinkShape,
StreamSubscriptionTimeoutTerminationMode }
+import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
+import pekko.stream.scaladsl.Source
+import pekko.stream.stage.{
+ GraphStageLogic,
+ GraphStageWithMaterializedValue,
+ InHandler,
+ OutHandler,
+ TimerGraphStageLogic
+}
+
+/**
+ * INTERNAL API
+ *
+ * Sink stage whose materialized value is a single-subscriber `Source` that emits every element
+ * received by this sink; it backs `Sink.source`.
+ *
+ * The returned `Source` has to be materialized and pulled within the
+ * `ActorAttributes.StreamSubscriptionTimeout` of this stage. What happens on timeout depends on the
+ * configured `StreamSubscriptionTimeoutTerminationMode`:
+ * - cancel: the sub-source is timed out and the stage completes, cancelling its upstream;
+ * - warn: a warning is logged and the stage keeps waiting;
+ * - noop: nothing happens.
+ */
+@InternalApi private[pekko] object SourceSink
+    extends GraphStageWithMaterializedValue[SinkShape[Any], Source[Any, NotUsed]] {
+  private val SubscriptionTimerKey = "SubstreamSubscriptionTimerKey"
+  private val in = Inlet[Any]("sourceSink.in")
+  override val shape = SinkShape(in)
+
+  override def toString: String = "SourceSink"
+  override protected def initialAttributes: Attributes = DefaultAttributes.sourceSink
+
+  override def createLogicAndMaterializedValue(
+      inheritedAttributes: Attributes): (GraphStageLogic, Source[Any, NotUsed]) = {
+
+    /**
+     * NOTE: with the current Pekko Streams implementation the piping requires two
+     * materializations (this sink, then the returned source), so even though the sink can be
+     * treated as a source there is still an asynchronous boundary between them.
+     *
+     * Ideally a materializer optimization would purge this stage and connect the upstream
+     * directly to the downstream.
+     */
+    object logic extends TimerGraphStageLogic(shape) with InHandler with OutHandler { self =>
+      val sinkSource = new SubSourceOutlet[Any]("sinkSource")
+
+      // Handler active until the first pull from the sub-source: it stops the subscription
+      // timer, pulls upstream and then hands over to `self` for the rest of the stream.
+      private def subHandler(): OutHandler = new OutHandler {
+        override def onPull(): Unit = {
+          setKeepGoing(false)
+          cancelTimer(SubscriptionTimerKey)
+          pull(in)
+          sinkSource.setHandler(self)
+        }
+        override def onDownstreamFinish(cause: Throwable): Unit = self.onDownstreamFinish(cause)
+      }
+
+      override def preStart(): Unit = {
+        sinkSource.setHandler(subHandler())
+        // Keep the stage alive while waiting for the returned Source to be materialized.
+        setKeepGoing(true)
+        val timeout = inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
+        scheduleOnce(SubscriptionTimerKey, timeout)
+      }
+
+      // Fires only when the sub-source was not pulled before the subscription timeout elapsed.
+      override protected def onTimer(timerKey: Any): Unit = {
+        val StreamSubscriptionTimeout(timeout, mode) =
+          inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout]
+
+        mode match {
+          case StreamSubscriptionTimeoutTerminationMode.CancelTermination =>
+            sinkSource.timeout(timeout)
+            if (sinkSource.isClosed)
+              completeStage()
+          case StreamSubscriptionTimeoutTerminationMode.NoopTermination =>
+          // do nothing
+          case StreamSubscriptionTimeoutTerminationMode.WarnTermination =>
+            interpreter.materializer.logger.warning(
+              "Substream subscription timeout triggered after {} in SourceSink.",
+              timeout)
+        }
+      }
+
+      override def onPush(): Unit = sinkSource.push(grab(in))
+      override def onPull(): Unit = pull(in)
+
+      // Propagate completion to the sub-source (if still open) and stop this stage.
+      override def onUpstreamFinish(): Unit = {
+        if (!sinkSource.isClosed) {
+          sinkSource.complete()
+        }
+        completeStage()
+      }
+
+      // Hand the failure to the sub-source when it can still receive it; otherwise fail the stage.
+      override def onUpstreamFailure(ex: Throwable): Unit =
+        if (!sinkSource.isClosed) {
+          sinkSource.fail(ex)
+          completeStage()
+        } else failStage(ex)
+
+      override def onDownstreamFinish(cause: Throwable): Unit = {
+        // cancel upstream only if the substream was cancelled
+        if (!isClosed(in)) cancelStage(cause)
+      }
+
+      setHandler(in, this)
+    }
+
+    (logic, Source.fromGraph(logic.sinkSource.source))
+  }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index de3a6bdd63..ead3f1a28f 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -214,6 +214,20 @@ object Sink {
def asPublisher[T](fanout: AsPublisher): Sink[T, Publisher[T]] =
new Sink(scaladsl.Sink.asPublisher(fanout == AsPublisher.WITH_FANOUT))
+  /**
+   * Creates a `Sink` whose materialized value is a `Source` emitting every element that reaches
+   * this `Sink`. The returned `Source` is a "live view" onto the `Sink` and only supports a single
+   * `Subscriber`; it has to be materialized within the stream subscription timeout
+   * (see `ActorAttributes.StreamSubscriptionTimeout`).
+   *
+   * Use [[BroadcastHub#sink]] if you need a `Source` that allows multiple subscribers.
+   *
+   * Note: even if the `Source` is directly connected to the `Sink`, there is still an asynchronous
+   * boundary between them; performance may be improved in the future.
+   *
+   * @since 1.3.0
+   */
+  def source[T](): Sink[T, Source[T, NotUsed]] =
+    new Sink[T, Source[T, NotUsed]](scaladsl.Sink.source[T].mapMaterializedValue(_.asJava))
+
/**
* A `Sink` that will invoke the given procedure for each received element.
The sink is materialized
* into a [[java.util.concurrent.CompletionStage]] which will be completed
with `Success` when reaching the
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
index 5e5197058b..4863dc52a4 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
@@ -27,7 +27,7 @@ import pekko.dispatch.ExecutionContexts
import pekko.stream._
import pekko.stream.impl._
import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.{ CountSink, GraphStages }
+import pekko.stream.impl.fusing.{ CountSink, GraphStages, SourceSink }
import pekko.stream.stage._
import pekko.util.ccompat._
@@ -323,6 +323,21 @@ object Sink {
if (fanout) new
FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink,
shape("FanoutPublisherSink"))
else new PublisherSink[T](DefaultAttributes.publisherSink,
shape("PublisherSink")))
+  // One shared stage instance: SourceSink builds fresh logic on every materialization, so it can
+  // be reused; the element type is erased to Any here and re-applied by the cast in `source`.
+  private[this] val _sourceSink = fromGraph[Any, Source[Any, NotUsed]](SourceSink)
+
+  /**
+   * Creates a `Sink` whose materialized value is a `Source` emitting every element that reaches
+   * this `Sink`. The returned `Source` is a "live view" onto the `Sink` and only supports a single
+   * `Subscriber`; it has to be materialized within the stream subscription timeout
+   * (see `ActorAttributes.StreamSubscriptionTimeout`).
+   *
+   * Use [[BroadcastHub#sink]] if you need a `Source` that allows multiple subscribers.
+   *
+   * Note: even if the `Source` is directly connected to the `Sink`, there is still an asynchronous
+   * boundary between them; performance may be improved in the future.
+   *
+   * @since 1.3.0
+   */
+  def source[T]: Sink[T, Source[T, NotUsed]] =
+    _sourceSink.asInstanceOf[Sink[T, Source[T, NotUsed]]]
+
/**
* A `Sink` that will consume the stream and discard the elements.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]