This is an automated email from the ASF dual-hosted git repository.
gregm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 66d7dc1946 Add Sink.eagerFutureSink to avoid
NeverMaterializedException on empty streams (#2684)
66d7dc1946 is described below
commit 66d7dc19464c1e082e759f0595cd8976249b7b00
Author: Greg Methvin <[email protected]>
AuthorDate: Tue Mar 10 12:03:47 2026 -0700
Add Sink.eagerFutureSink to avoid NeverMaterializedException on empty
streams (#2684)
---
.../stream/operators/Sink/completionStageSink.md | 8 +
.../operators/Sink/eagerCompletionStageSink.md | 33 ++++
.../stream/operators/Sink/eagerFutureSink.md | 37 ++++
.../paradox/stream/operators/Sink/futureSink.md | 6 +
.../operators/Sink/lazyCompletionStageSink.md | 5 +-
.../stream/operators/Sink/lazyFutureSink.md | 4 +-
docs/src/main/paradox/stream/operators/index.md | 4 +
.../pekko/stream/DslFactoriesConsistencySpec.scala | 1 +
.../stream/scaladsl/EagerFutureSinkSpec.scala | 193 +++++++++++++++++++++
.../scala/org/apache/pekko/stream/impl/Sinks.scala | 128 ++++++++++++++
.../org/apache/pekko/stream/impl/Stages.scala | 1 +
.../org/apache/pekko/stream/javadsl/Sink.scala | 20 +++
.../org/apache/pekko/stream/scaladsl/Sink.scala | 18 ++
13 files changed, 456 insertions(+), 2 deletions(-)
diff --git a/docs/src/main/paradox/stream/operators/Sink/completionStageSink.md
b/docs/src/main/paradox/stream/operators/Sink/completionStageSink.md
index 186c187e6d..16a1b2f5eb 100644
--- a/docs/src/main/paradox/stream/operators/Sink/completionStageSink.md
+++ b/docs/src/main/paradox/stream/operators/Sink/completionStageSink.md
@@ -10,6 +10,14 @@ Streams the elements to the given future sink once it
successfully completes.
Streams the elements through the given future flow once it successfully
completes.
If the future fails the stream is failed.
+`completionStageSink` uses the same lazy materialization semantics as
+@ref:[lazyCompletionStageSink](lazyCompletionStageSink.md): the nested sink is
not materialized until the first
+upstream element arrives. If the stream completes before the first element,
the materialized value fails with
+`org.apache.pekko.stream.NeverMaterializedException`.
+
+If you want this to work for empty streams as well, use
+@ref:[eagerCompletionStageSink](eagerCompletionStageSink.md).
+
## Reactive Streams semantics
@@@div { .callout }
diff --git
a/docs/src/main/paradox/stream/operators/Sink/eagerCompletionStageSink.md
b/docs/src/main/paradox/stream/operators/Sink/eagerCompletionStageSink.md
new file mode 100644
index 0000000000..cc31625894
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Sink/eagerCompletionStageSink.md
@@ -0,0 +1,33 @@
+# Sink.eagerCompletionStageSink
+
+Materializes the inner sink when the future completes, even if no elements
have arrived yet.
+
+@ref[Sink operators](../index.md#sink-operators)
+
+
+## Description
+
+Turn a `CompletionStage<Sink>` into a Sink that will consume the values of the
source when the future completes
+successfully. If the `CompletionStage` is completed with a failure the stream
is failed.
+
+Unlike @ref:[completionStageSink](completionStageSink.md) and
@ref:[lazyCompletionStageSink](lazyCompletionStageSink.md), this operator
materializes the inner sink as soon as the future
+completes, even if no elements have arrived yet. This means empty streams
complete normally rather than failing
+with `NeverMaterializedException`. At most one element that arrives before the
future completes is buffered.
+
+The materialized future value is completed with the materialized value of the
inner sink once it has been
+materialized, or failed if the `CompletionStage` itself fails or if
materialization of the inner sink fails.
+Upstream failures or downstream cancellations that occur before the inner sink
is materialized are propagated
+through the inner sink rather than failing the materialized value directly.
+
+See also @ref:[completionStageSink](completionStageSink.md),
@ref:[lazyCompletionStageSink](lazyCompletionStageSink.md).
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**cancels** if the future fails or if the created sink cancels
+
+**backpressures** when initialized and when created sink backpressures
+
+@@@
+
diff --git a/docs/src/main/paradox/stream/operators/Sink/eagerFutureSink.md
b/docs/src/main/paradox/stream/operators/Sink/eagerFutureSink.md
new file mode 100644
index 0000000000..e481ff0044
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Sink/eagerFutureSink.md
@@ -0,0 +1,37 @@
+# Sink.eagerFutureSink
+
+Materializes the inner sink when the future completes, even if no elements
have arrived yet.
+
+@ref[Sink operators](../index.md#sink-operators)
+
+## Signature
+
+@apidoc[Sink.eagerFutureSink](Sink$) {
scala="#eagerFutureSink[T,M](future:scala.concurrent.Future[org.apache.pekko.stream.scaladsl.Sink[T,M]]):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[M]]"
}
+
+
+## Description
+
+Turn a `Future[Sink]` into a Sink that will consume the values of the source
when the future completes
+successfully. If the `Future` is completed with a failure the stream is failed.
+
+Unlike @ref:[futureSink](futureSink.md) and
@ref:[lazyFutureSink](lazyFutureSink.md), this operator materializes the inner
sink as soon as the future
+completes, even if no elements have arrived yet. This means empty streams
complete normally rather than failing
+with `NeverMaterializedException`. At most one element that arrives before the
future completes is buffered.
+
+The materialized future value is completed with the materialized value of the
inner sink once it has been
+materialized, or failed if the future itself fails or if materialization of
the inner sink fails. Upstream
+failures or downstream cancellations that occur before the inner sink is
materialized are propagated through
+the inner sink rather than failing the materialized value directly.
+
+See also @ref:[futureSink](futureSink.md),
@ref:[lazyFutureSink](lazyFutureSink.md).
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**cancels** if the future fails or if the created sink cancels
+
+**backpressures** when initialized and when created sink backpressures
+
+@@@
+
diff --git a/docs/src/main/paradox/stream/operators/Sink/futureSink.md
b/docs/src/main/paradox/stream/operators/Sink/futureSink.md
index 59b50afeb3..45e54fab76 100644
--- a/docs/src/main/paradox/stream/operators/Sink/futureSink.md
+++ b/docs/src/main/paradox/stream/operators/Sink/futureSink.md
@@ -14,6 +14,12 @@ Streams the elements to the given future sink once it
successfully completes.
Streams the elements through the given future flow once it successfully
completes.
If the future fails the stream is failed.
+`futureSink` uses the same lazy materialization semantics as
@ref:[lazyFutureSink](lazyFutureSink.md): the nested sink
+is not materialized until the first upstream element arrives. If the stream
completes before the first element, the
+materialized value fails with
`org.apache.pekko.stream.NeverMaterializedException`.
+
+If you want this to work for empty streams as well, use
@ref:[eagerFutureSink](eagerFutureSink.md).
+
## Reactive Streams semantics
@@@div { .callout }
diff --git
a/docs/src/main/paradox/stream/operators/Sink/lazyCompletionStageSink.md
b/docs/src/main/paradox/stream/operators/Sink/lazyCompletionStageSink.md
index 5931ab4d25..14b66218e1 100644
--- a/docs/src/main/paradox/stream/operators/Sink/lazyCompletionStageSink.md
+++ b/docs/src/main/paradox/stream/operators/Sink/lazyCompletionStageSink.md
@@ -16,7 +16,10 @@ and failed with a
`org.apache.pekko.stream.NeverMaterializedException` if the st
Can be combined with @ref:[prefixAndTail](../Source-or-Flow/prefixAndTail.md)
to base the sink on the first element.
-See also @ref:[lazySink](lazySink.md).
+If you need empty streams to complete normally, use
+@ref:[eagerCompletionStageSink](eagerCompletionStageSink.md).
+
+See also @ref:[lazySink](lazySink.md),
@ref:[completionStageSink](completionStageSink.md).
## Reactive Streams semantics
diff --git a/docs/src/main/paradox/stream/operators/Sink/lazyFutureSink.md
b/docs/src/main/paradox/stream/operators/Sink/lazyFutureSink.md
index b905c28672..0be83b8005 100644
--- a/docs/src/main/paradox/stream/operators/Sink/lazyFutureSink.md
+++ b/docs/src/main/paradox/stream/operators/Sink/lazyFutureSink.md
@@ -20,7 +20,9 @@ and failed with a
`org.apache.pekko.stream.NeverMaterializedException` if the st
Can be combined with @ref:[prefixAndTail](../Source-or-Flow/prefixAndTail.md)
to base the sink on the first element.
-See also @ref:[lazySink](lazySink.md).
+If you need empty streams to complete normally, use
@ref:[eagerFutureSink](eagerFutureSink.md).
+
+See also @ref:[lazySink](lazySink.md), @ref:[futureSink](futureSink.md).
## Reactive Streams semantics
diff --git a/docs/src/main/paradox/stream/operators/index.md
b/docs/src/main/paradox/stream/operators/index.md
index 68bc137888..f39c251bb8 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -59,6 +59,8 @@ These built-in sinks are available from
@scala[`org.apache.pekko.stream.scaladsl
|Sink|<a name="combine"></a>@ref[combine](Sink/combine.md)|Combine several
sinks into one using a user specified strategy|
|Sink|<a
name="completionstagesink"></a>@ref[completionStageSink](Sink/completionStageSink.md)|Streams
the elements to the given future sink once it successfully completes. |
|Sink|<a name="count"></a>@ref[count](Sink/count.md)|Counts all incoming
elements until upstream terminates.|
+|Sink|<a
name="eagercompletionstagesink"></a>@ref[eagerCompletionStageSink](Sink/eagerCompletionStageSink.md)|Materializes
the inner sink when the future completes, even if no elements have arrived
yet.|
+|Sink|<a
name="eagerfuturesink"></a>@ref[eagerFutureSink](Sink/eagerFutureSink.md)|Materializes
the inner sink when the future completes, even if no elements have arrived
yet.|
|Sink|<a name="exists"></a>@ref[exists](Sink/exists.md)|A `Sink` that will
test the given predicate `p` for every received element and completes with the
result.|
|Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted elements
with a function, where each invocation will get the new element and the result
from the previous fold invocation.|
|Sink|<a name="foldwhile"></a>@ref[foldWhile](Sink/foldWhile.md)|Fold over
emitted elements with a function, where each invocation will get the new
element and the result from the previous fold invocation.|
@@ -455,6 +457,8 @@ For more background see the @ref[Error Handling in
Streams](../stream-error.md)
* [dropRepeated](Source-or-Flow/dropRepeated.md)
* [dropWhile](Source-or-Flow/dropWhile.md)
* [dropWithin](Source-or-Flow/dropWithin.md)
+* [eagerCompletionStageSink](Sink/eagerCompletionStageSink.md)
+* [eagerFutureSink](Sink/eagerFutureSink.md)
* [empty](Source/empty.md)
* [exists](Sink/exists.md)
* [expand](Source-or-Flow/expand.md)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
index 3481be78ea..8f9fffa155 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
@@ -41,6 +41,7 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with
Matchers {
"lazyFutureFlow", // lazyCompletionStageFlow
"futureFlow", // completionStageFlow
"futureSink", // completionStageSink
+ "eagerFutureSink", // eagerCompletionStageSink
"lazyFutureSink", // lazyCompletionStageSink
"createGraph" // renamed/overload of create for getting type inference
working in Scala 3
)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/EagerFutureSinkSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/EagerFutureSinkSpec.scala
new file mode 100644
index 0000000000..10d0374a6c
--- /dev/null
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/EagerFutureSinkSpec.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import scala.concurrent.{ Future, Promise }
+
+import org.apache.pekko
+import pekko.stream.{ AbruptStageTerminationException, Materializer }
+import pekko.stream.testkit.StreamSpec
+import pekko.stream.testkit.Utils._
+
+class EagerFutureSinkSpec extends StreamSpec("""
+ pekko.stream.materializer.initial-input-buffer-size = 1
+ pekko.stream.materializer.max-input-buffer-size = 1
+ """) {
+
+ val ex = TE("")
+
+ "Sink.eagerFutureSink" must {
+
+ "work with an already-completed future" in {
+ val result = Source(List(1, 2, 3))
+
.toMat(Sink.eagerFutureSink(Future.successful(Sink.seq[Int])))(Keep.right)
+ .run()
+ .flatten
+
+ result.futureValue shouldBe Seq(1, 2, 3)
+ }
+
+ "work when the future completes after elements arrive" in {
+ val sinkPromise = Promise[Sink[Int, Future[Seq[Int]]]]()
+ val result = Source(List(1, 2, 3))
+ .toMat(Sink.eagerFutureSink(sinkPromise.future))(Keep.right)
+ .run()
+ .flatten
+
+ sinkPromise.success(Sink.seq[Int])
+ result.futureValue shouldBe Seq(1, 2, 3)
+ }
+
+ "handle an empty stream with an already-completed future" in {
+ val result = Source
+ .empty[Int]
+
.toMat(Sink.eagerFutureSink(Future.successful(Sink.seq[Int])))(Keep.right)
+ .run()
+ .flatten
+
+ result.futureValue shouldBe Seq.empty
+ }
+
+ "handle an empty stream with a pending future" in {
+ val sinkPromise = Promise[Sink[Int, Future[Seq[Int]]]]()
+ val result = Source
+ .empty[Int]
+ .toMat(Sink.eagerFutureSink(sinkPromise.future))(Keep.right)
+ .run()
+ .flatten
+
+ sinkPromise.success(Sink.seq[Int])
+ result.futureValue shouldBe Seq.empty
+ }
+
+ "propagate failure when the future fails" in {
+ val result = Source(List(1, 2, 3))
+ .toMat(Sink.eagerFutureSink(Future.failed[Sink[Int,
Future[Seq[Int]]]](ex)))(Keep.right)
+ .run()
+ .flatten
+
+ result.failed.futureValue shouldBe ex
+ }
+
+ "propagate upstream failure" in {
+ val result = Source
+ .failed[Int](ex)
+
.toMat(Sink.eagerFutureSink(Future.successful(Sink.seq[Int])))(Keep.right)
+ .run()
+ .flatten
+
+ result.failed.futureValue shouldBe ex
+ }
+
+ "propagate upstream failure when the future is still pending" in {
+ val sinkPromise = Promise[Sink[Int, Future[Seq[Int]]]]()
+ val result = Source
+ .failed[Int](ex)
+ .toMat(Sink.eagerFutureSink(sinkPromise.future))(Keep.right)
+ .run()
+ .flatten
+
+ sinkPromise.success(Sink.seq[Int])
+ result.failed.futureValue shouldBe ex
+ }
+
+ "propagate upstream failure when element was buffered and future resolves
later" in {
+ val sinkPromise = Promise[Sink[Int, Future[Seq[Int]]]]()
+ val result = Source(List(1))
+ .concat(Source.failed[Int](ex))
+ .toMat(Sink.eagerFutureSink(sinkPromise.future))(Keep.right)
+ .run()
+ .flatten
+
+ sinkPromise.success(Sink.seq[Int])
+ result.failed.futureValue shouldBe ex
+ }
+
+ "work with Sink.fold on a non-empty stream" in {
+ val result = Source(List(1, 2, 3))
+ .toMat(Sink.eagerFutureSink(Future.successful(Sink.fold[Int, Int](0)(_
+ _))))(Keep.right)
+ .run()
+ .flatten
+
+ result.futureValue shouldBe 6
+ }
+
+ "work with Sink.fold on an empty stream" in {
+ val result = Source
+ .empty[Int]
+ .toMat(Sink.eagerFutureSink(Future.successful(Sink.fold[Int, Int](0)(_
+ _))))(Keep.right)
+ .run()
+ .flatten
+
+ result.futureValue shouldBe 0
+ }
+
+ "not throw NeverMaterializedException on empty stream (unlike futureSink)"
in {
+ val result = Source
+ .empty[Int]
+
.toMat(Sink.eagerFutureSink(Future.successful(Sink.seq[Int])))(Keep.right)
+ .run()
+ .flatten
+
+ result.futureValue shouldBe Seq.empty
+ }
+
+ "materialize inner sink immediately when the future is already completed
(even with no elements yet)" in {
+ val innerMatPromise = Promise[Unit]()
+ val sink = Sink.foreach[Int](_ => ()).mapMaterializedValue(_ =>
innerMatPromise.success(()))
+ val sinkFuture = Future.successful(sink)
+
+ Source.maybe[Int]
+ .toMat(Sink.eagerFutureSink(sinkFuture))(Keep.right)
+ .run()
+
+ innerMatPromise.future.futureValue shouldBe (())
+ }
+
+ "cancel upstream when inner sink cancels" in {
+ val result = Source(List(1, 2, 3, 4, 5))
+
.toMat(Sink.eagerFutureSink(Future.successful(Sink.head[Int])))(Keep.right)
+ .run()
+ .flatten
+
+ result.futureValue shouldBe 1
+ }
+
+ "propagate failure when the future fails late" in {
+ val sinkPromise = Promise[Sink[Int, Future[Seq[Int]]]]()
+ val result = Source(List(1, 2, 3))
+ .toMat(Sink.eagerFutureSink(sinkPromise.future))(Keep.right)
+ .run()
+
+ sinkPromise.failure(ex)
+ result.failed.futureValue shouldBe ex
+ }
+
+ "fail the materialized value on abrupt termination before future
completion" in {
+ val mat = Materializer(system)
+ val sinkPromise = Promise[Sink[Int, Future[Seq[Int]]]]()
+ val result = Source.maybe[Int]
+ .toMat(Sink.eagerFutureSink(sinkPromise.future))(Keep.right)
+ .run()(mat)
+
+ mat.shutdown()
+
+ result.failed.futureValue shouldBe an[AbruptStageTerminationException]
+ }
+ }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala
index dc2ac01da7..08eb2942f6 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala
@@ -40,6 +40,7 @@ import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.StreamLayout.AtomicModule
import pekko.stream.scaladsl.{ Keep, Sink, SinkQueueWithCancel, Source }
import pekko.stream.stage._
+import pekko.util.OptionVal
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
@@ -661,3 +662,130 @@ import org.reactivestreams.Subscriber
(stageLogic, promise.future)
}
}
+
+/**
+ * INTERNAL API
+ *
+ * Dedicated stage for [[pekko.stream.scaladsl.Sink.eagerFutureSink]] that
materializes the inner sink
+ * when the future completes rather than waiting for the first element. Unlike
[[LazySink]], this
+ * correctly handles empty streams by materializing the inner sink and
completing it normally.
+ */
+@InternalApi final private[stream] class EagerFutureSink[T, M](future:
Future[Sink[T, M]])
+ extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
+ val in = Inlet[T]("eagerFutureSink.in")
+ override def initialAttributes = DefaultAttributes.eagerFutureSink
+ override val shape: SinkShape[T] = SinkShape.of(in)
+
+ override def toString: String = "EagerFutureSink"
+
+ override def createLogicAndMaterializedValue(inheritedAttributes:
Attributes): (GraphStageLogic, Future[M]) = {
+ val promise = Promise[M]()
+ val stageLogic = new GraphStageLogic(shape) with InHandler {
+ private var bufferedElement: OptionVal[T] = OptionVal.none
+ private var upstreamFailed: OptionVal[Throwable] = OptionVal.none
+ private var upstreamClosed = false
+
+ override def preStart(): Unit = {
+ pull(in)
+ val cb = getAsyncCallback[Try[Sink[T, M]]] {
+ case Success(sink) => onSinkReady(sink)
+ case Failure(e) =>
+ promise.tryFailure(e)
+ failStage(e)
+ }
+ try {
+ future.onComplete(cb.invoke)(ExecutionContext.parasitic)
+ } catch {
+ case NonFatal(e) =>
+ promise.tryFailure(e)
+ failStage(e)
+ }
+ setKeepGoing(true)
+ }
+
+ override def onPush(): Unit = {
+ bufferedElement = OptionVal.Some(grab(in))
+ }
+
+ override def onUpstreamFinish(): Unit = {
+ upstreamClosed = true
+ }
+
+ override def onUpstreamFailure(ex: Throwable): Unit = {
+ upstreamFailed = OptionVal.Some(ex)
+ upstreamClosed = true
+ }
+
+ private def onSinkReady(sink: Sink[T, M]): Unit = {
+ if (promise.isCompleted) return
+ try {
+ val subOutlet = new SubSourceOutlet[T]("EagerFutureSink")
+ val matVal = interpreter.subFusingMaterializer
+
.materialize(Source.fromGraph(subOutlet.source).toMat(sink)(Keep.right),
inheritedAttributes)
+ promise.trySuccess(matVal)
+
+ setHandler(
+ in,
+ new InHandler {
+ override def onPush(): Unit = subOutlet.push(grab(in))
+ override def onUpstreamFinish(): Unit = {
+ subOutlet.complete()
+ completeStage()
+ }
+ override def onUpstreamFailure(ex: Throwable): Unit = {
+ subOutlet.fail(ex)
+ failStage(ex)
+ }
+ })
+
+ subOutlet.setHandler(new OutHandler {
+ override def onPull(): Unit = {
+ bufferedElement match {
+ case OptionVal.Some(elem) =>
+ bufferedElement = OptionVal.none
+ subOutlet.push(elem)
+ if (upstreamClosed) {
+ subOutlet.complete()
+ completeStage()
+ }
+ case _ =>
+ if (upstreamClosed) {
+ subOutlet.complete()
+ completeStage()
+ } else if (!isClosed(in)) {
+ pull(in)
+ }
+ }
+ }
+ override def onDownstreamFinish(cause: Throwable): Unit = {
+ if (!isClosed(in)) cancel(in, cause)
+ completeStage()
+ }
+ })
+
+ upstreamFailed match {
+ case OptionVal.Some(ex) =>
+ subOutlet.fail(ex)
+ failStage(ex)
+ case _ =>
+ if (upstreamClosed && bufferedElement.isEmpty) {
+ subOutlet.complete()
+ setKeepGoing(false)
+ }
+ }
+ } catch {
+ case NonFatal(e) =>
+ promise.tryFailure(e)
+ failStage(e)
+ }
+ }
+
+ override def postStop(): Unit = {
+ if (!promise.isCompleted) promise.failure(new
AbruptStageTerminationException(this))
+ }
+
+ setHandler(in, this)
+ }
+ (stageLogic, promise.future)
+ }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index 2d4bd30c98..6de53600bd 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -167,6 +167,7 @@ import pekko.stream.Attributes._
val actorRefWithBackpressureSink = name("actorRefWithBackpressureSink")
val queueSink = name("queueSink")
val lazySink = name("lazySink")
+ val eagerFutureSink = name("eagerFutureSink")
val lazyFlow = name("lazyFlow")
val futureFlow = name("futureFlow")
val lazySource = name("lazySource")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index bcf53dbbf0..2b23421f8c 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -525,6 +525,26 @@ object Sink {
def completionStageSink[T, M](future: CompletionStage[Sink[T, M]]): Sink[T,
CompletionStage[M]] =
lazyCompletionStageSink[T, M](() => future)
+ /**
+ * Turn a `CompletionStage[Sink]` into a Sink that will consume the values
of the source when the future completes
+ * successfully. If the `CompletionStage` is completed with a failure the
stream is failed.
+ *
+ * Unlike [[completionStageSink]] and [[lazyCompletionStageSink]], this
operator materializes the inner sink as
+ * soon as the future completes, even if no elements have arrived yet. This
means empty streams complete normally
+ * rather than failing with [[NeverMaterializedException]]. At most one
element that arrives before the future
+ * completes is buffered.
+ *
+ * The materialized future value is completed with the materialized value of
the inner sink once it has been
+ * materialized, or failed if the `CompletionStage` itself fails or if
materialization of the inner sink fails.
+ * Upstream failures or downstream cancellations that occur before the inner
sink is materialized are propagated
+ * through the inner sink rather than failing the materialized value
directly.
+ *
+ * @since 1.5.0
+ */
+ def eagerCompletionStageSink[T, M](future: CompletionStage[Sink[T, M]]):
Sink[T, CompletionStage[M]] =
+ new
Sink(scaladsl.Sink.eagerFutureSink(future.asScala.map(_.asScala)(ExecutionContext.parasitic)))
+ .mapMaterializedValue(_.asJava)
+
/**
* Defers invoking the `create` function to create a sink until there is a
first element passed from upstream.
*
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
index 03f97f94f8..a81c3ba97b 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
@@ -733,6 +733,24 @@ object Sink {
def futureSink[T, M](future: Future[Sink[T, M]]): Sink[T, Future[M]] =
lazyFutureSink[T, M](() => future)
+ /**
+ * Turn a `Future[Sink]` into a Sink that will consume the values of the
source when the future completes
+ * successfully. If the `Future` is completed with a failure the stream is
failed.
+ *
+ * Unlike [[futureSink]] and [[lazyFutureSink]], this operator materializes
the inner sink as soon as the future
+ * completes, even if no elements have arrived yet. This means empty streams
complete normally rather than failing
+ * with [[NeverMaterializedException]]. At most one element that arrives
before the future completes is buffered.
+ *
+ * The materialized future value is completed with the materialized value of
the inner sink once it has been
+ * materialized, or failed if the future itself fails or if materialization
of the inner sink fails. Upstream
+ * failures or downstream cancellations that occur before the inner sink is
materialized are propagated through
+ * the inner sink rather than failing the materialized value directly.
+ *
+ * @since 1.5.0
+ */
+ def eagerFutureSink[T, M](future: Future[Sink[T, M]]): Sink[T, Future[M]] =
+ Sink.fromGraph(new EagerFutureSink(future))
+
/**
* Defers invoking the `create` function to create a sink until there is a
first element passed from upstream.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]