This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 1e25aa89a8 perf(stream): reduce source construction overhead (#2914)
1e25aa89a8 is described below
commit 1e25aa89a81fdfec5caaff6ba038a4b82e8dfb1c
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun May 10 11:59:36 2026 +0800
perf(stream): reduce source construction overhead (#2914)
* perf(stream): reduce source construction overhead
* fix(stream): keep future source operators indexed
* fix(stream): address source fast path review feedback
Motivation:
PR #2914 review found two correctness gaps in the new source stages and one
optimization opportunity in value-presented source detection.
Modification:
Add a value-presented source marker trait, enforce null-element compliance
for repeat sources, and handle iterator-factory supervision failures without
null iterator access.
Result:
Value-presented source detection is simpler, Source.repeat rejects null at
construction time, and iterator-factory resume/restart paths are deterministic.
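The marker-trait detection and the eager null check described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the Pekko implementation: `Stage`, `ValuePresented`, `SingleStage`, `RepeatStage`, `QueueStage` and `Detect` are hypothetical stand-ins, and only the exception message is taken from the test in this commit.

```scala
// Hypothetical stand-ins for illustration; these are not the Pekko classes.
trait Stage

// Marker trait: no members, it only tags stages whose output is known up front.
trait ValuePresented

final class SingleStage[T](val elem: T) extends Stage with ValuePresented

final class RepeatStage[T](val elem: T) extends Stage with ValuePresented {
  // Reject null when the stage is constructed, not when the stream first pulls.
  if (elem.asInstanceOf[AnyRef] eq null)
    throw new NullPointerException("Element must not be null, rule 2.13")
}

// A stage that is not value-presented (its elements arrive later).
final class QueueStage extends Stage

object Detect {
  // One type test replaces a growing list of concrete class alternatives.
  def isValuePresented(stage: Stage): Boolean = stage match {
    case _: ValuePresented => true
    case _                 => false
  }
}
```

With a marker trait, adding a new value-presented stage only requires mixing in the trait; the detection code itself does not change.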
Tests:
- scalafmt --mode diff-ref=a57a0db001ec3133b51c8c884dd52bfe35673279 / passed
- scalafmt --list --mode diff-ref=a57a0db001ec3133b51c8c884dd52bfe35673279 / passed
- git diff --check / passed
- sbt "stream / Test / compile" "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.SourceSpec org.apache.pekko.stream.impl.TraversalBuilderSpec" / passed
- sbt "stream / mimaReportBinaryIssues" / passed
References:
Refs #2914
Co-authored-by: Copilot <[email protected]>
* perf(stream): tune source fast paths for JIT
Motivation:
Source fast paths should avoid unnecessary per-materialization state and
immediate-path virtual calls in hot stream operators.
Modification:
Hoist immutable range-derived values from RangeSource logic instances to
the stage, and push repeat source elements directly in FlattenConcat's
immediate fast path.
Result:
Range materialization keeps per-run state smaller and repeat fast-path
emission avoids an unnecessary method dispatch without changing stream behavior.
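The hoisting idea can be sketched like this, assuming a simplified stage/logic split. `RangeStage`, `Logic`, `createLogic` and `RangeDemo` are hypothetical names chosen for illustration and do not mirror the real `RangeSource` API: values derived from the immutable range are computed once on the stage, so each materialization only carries a counter.

```scala
// Hypothetical sketch; not the Pekko RangeSource.
final class RangeStage(range: Range) {
  // Derived once per stage and shared by every run.
  private val start = range.start
  private val step = range.step
  private val count = range.length

  // Per-run state is a single mutable counter.
  final class Logic {
    private var emitted = 0
    def hasNext: Boolean = emitted < count
    def next(): Int = {
      val value = start + emitted * step
      emitted += 1
      value
    }
  }

  def createLogic(): Logic = new Logic
}

object RangeDemo {
  // Runs one "materialization" to completion and collects its elements.
  def drain(stage: RangeStage): List[Int] = {
    val logic = stage.createLogic()
    val out = List.newBuilder[Int]
    while (logic.hasNext) out += logic.next()
    out.result()
  }
}
```

Calling `RangeDemo.drain` twice on the same stage creates two independent `Logic` instances while reusing the hoisted values.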
Tests:
- scalafmt --mode diff-ref=a57a0db001ec3133b51c8c884dd52bfe35673279 / passed
- scalafmt --list --mode diff-ref=a57a0db001ec3133b51c8c884dd52bfe35673279 / passed
- git diff --check / passed
- sbt "stream / Test / compile" "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.SourceSpec org.apache.pekko.stream.impl.TraversalBuilderSpec" / passed
- sbt "stream / mimaReportBinaryIssues" / passed
- JDK 21.0.5 sbt "bench-jmh/Jmh/run -wi 1 -i 1 -f1 -t1 -prof gc .*GraphStageConstructionBenchmark.*" / passed
- JDK 21.0.5 sbt "bench-jmh/Jmh/run -wi 1 -i 1 -f1 -t1 .*RangeSourceBenchmark.*" / passed
References:
Refs #2914
Co-authored-by: Copilot <[email protected]>
* perf(stream): trim traversal builder construction overhead
Motivation:
Graph-stage source, sink, and flow construction all pass through
LinearTraversalBuilder.fromModule, so small temporary allocations in that path
affect constructor-heavy workloads.
Modification:
Cache shape ports in local values and avoid headOption allocation when
creating OptionVal wrappers for single-port linear modules.
Result:
Constructor fast paths do less repeated port lookup work while preserving
the same traversal wiring semantics.
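A rough sketch of the port-lookup change, under loud assumptions: `Port`, `Shape` and `PortLookup` are hypothetical types, and a plain nullable reference stands in for Pekko's internal `OptionVal` wrapper, which is not reproduced here.

```scala
// Hypothetical types for illustration; not the Pekko Shape or OptionVal.
final case class Port(name: String)
final case class Shape(inlets: List[Port], outlets: List[Port])

object PortLookup {
  // Baseline: headOption allocates a Some for every non-empty lookup.
  def firstOutletOption(shape: Shape): Option[Port] =
    shape.outlets.headOption

  // Variant: read the collection once into a local and return null for
  // "absent", so no wrapper object is created on the hot path.
  def firstOutletOrNull(shape: Shape): Port = {
    val outlets = shape.outlets
    if (outlets.isEmpty) null else outlets.head
  }
}
```

Both variants return the same port; the second only avoids the intermediate `Option` and the repeated `shape.outlets` access.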
Tests:
- scalafmt --mode diff-ref=a57a0db001ec3133b51c8c884dd52bfe35673279 / passed
- scalafmt --list --mode diff-ref=a57a0db001ec3133b51c8c884dd52bfe35673279 / passed
- git diff --check / passed
- sbt "stream / Test / compile" "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.SourceSpec org.apache.pekko.stream.impl.TraversalBuilderSpec" / passed
- sbt "stream / mimaReportBinaryIssues" / passed
- JDK 21.0.5 sbt "bench-jmh/Jmh/run -wi 1 -i 1 -f1 -t1 -prof gc .*GraphStageConstructionBenchmark.*" / passed
- JDK 21.0.5 sbt "bench-jmh/Jmh/run -wi 1 -i 1 -f1 -t1 .*RangeSourceBenchmark.*" / passed
References:
Refs #2914
Co-authored-by: Copilot <[email protected]>
---------
Co-authored-by: Copilot <[email protected]>
---
.../stream/GraphStageConstructionBenchmark.scala | 180 +++++++++++++++++++++
.../apache/pekko/stream/RangeSourceBenchmark.scala | 103 ++++++++++++
.../apache/pekko/stream/javadsl/SourceTest.java | 15 ++
.../pekko/stream/impl/TraversalBuilderSpec.scala | 39 ++++-
.../apache/pekko/stream/scaladsl/SourceSpec.scala | 103 ++++++++++++
.../apache/pekko/stream/impl/ActorRefSource.scala | 2 +
.../apache/pekko/stream/impl/FailedSource.scala | 5 +-
.../pekko/stream/impl/JavaStreamSource.scala | 4 +-
.../org/apache/pekko/stream/impl/QueueSource.scala | 2 +
.../pekko/stream/impl/TraversalBuilder.scala | 97 +++++++----
.../pekko/stream/impl/fusing/FlattenConcat.scala | 55 ++++++-
.../pekko/stream/impl/fusing/GraphStages.scala | 24 ++-
.../pekko/stream/impl/fusing/IterableSource.scala | 5 +-
.../{IterableSource.scala => IteratorSource.scala} | 58 ++++---
.../org/apache/pekko/stream/impl/fusing/Ops.scala | 8 +-
.../pekko/stream/impl/fusing/RangeSource.scala | 66 ++++++++
.../org/apache/pekko/stream/javadsl/Source.scala | 6 +-
.../org/apache/pekko/stream/scaladsl/Flow.scala | 8 +-
.../org/apache/pekko/stream/scaladsl/Sink.scala | 8 +-
.../org/apache/pekko/stream/scaladsl/Source.scala | 147 +++++++++--------
20 files changed, 789 insertions(+), 146 deletions(-)
diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/GraphStageConstructionBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/GraphStageConstructionBenchmark.scala
new file mode 100644
index 0000000000..fb2aa9a963
--- /dev/null
+++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/GraphStageConstructionBenchmark.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.immutable
+import scala.concurrent.Promise
+
+import org.apache.pekko.NotUsed
+import org.apache.pekko.stream.impl.LinearTraversalBuilder
+import org.apache.pekko.stream.impl.Stages.DefaultAttributes
+import org.apache.pekko.stream.impl.fusing.GraphStages
+import org.apache.pekko.stream.impl.fusing.IterableSource
+import org.apache.pekko.stream.scaladsl.Flow
+import org.apache.pekko.stream.scaladsl.Keep
+import org.apache.pekko.stream.scaladsl.Sink
+import org.apache.pekko.stream.scaladsl.Source
+import org.apache.pekko.stream.stage.GraphStageWithMaterializedValue
+import org.openjdk.jmh.annotations.Benchmark
+import org.openjdk.jmh.annotations.BenchmarkMode
+import org.openjdk.jmh.annotations.Mode
+import org.openjdk.jmh.annotations.OutputTimeUnit
+import org.openjdk.jmh.annotations.Scope
+import org.openjdk.jmh.annotations.State
+import org.openjdk.jmh.infra.Blackhole
+
+@State(Scope.Benchmark)
+@BenchmarkMode(Array(Mode.Throughput))
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+class GraphStageConstructionBenchmark {
+ private val element = "element"
+ private val elements: immutable.Iterable[String] = Vector(element, element)
+ private val range = 1 to 1000
+ private val pendingFuture = Promise[String]().future
+ private val flowStage = GraphStages.identity[Any]
+ private val sinkStage = GraphStages.IgnoreSink
+
+ private def oldSourceFromGraphStage[T, M](
+ stage: GraphStageWithMaterializedValue[SourceShape[T], M]): Source[T, M] = {
+ val attributes = stage.traversalBuilder.attributes
+ val noAttributeStage = stage.withAttributes(Attributes.none)
+ new Source(
+ LinearTraversalBuilder.fromBuilder(noAttributeStage.traversalBuilder, noAttributeStage.shape, Keep.right),
+ noAttributeStage.shape).withAttributes(attributes)
+ }
+
+ private def oldSourceFromIterable[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] =
+ (iterable.knownSize: @scala.annotation.switch) match {
+ case 0 => Source.empty
+ case 1 => oldSourceFromGraphStage(new GraphStages.SingleSource(iterable.head))
+ case _ =>
+ oldSourceFromGraphStage(new IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
+ }
+
+ @Benchmark
+ def sourceSingle(blackhole: Blackhole): Unit =
+ blackhole.consume(Source.single(element))
+
+ @Benchmark
+ def javadslSourceSingle(blackhole: Blackhole): Unit =
+ blackhole.consume(javadsl.Source.single(element))
+
+ @Benchmark
+ def sourceRepeat(blackhole: Blackhole): Unit =
+ blackhole.consume(Source.repeat(element))
+
+ @Benchmark
+ def oldSourceRepeatPath(blackhole: Blackhole): Unit = {
+ val iterable = new immutable.Iterable[String] {
+ override def iterator: Iterator[String] = Iterator.continually(element)
+ override def toString: String = "() => Iterator"
+ }
+ blackhole.consume(
+ oldSourceFromGraphStage(new IterableSource[String](iterable)).withAttributes(DefaultAttributes.repeat))
+ }
+
+ @Benchmark
+ def sourceFromIterator(blackhole: Blackhole): Unit =
+ blackhole.consume(Source.fromIterator(() => Iterator.single(element)))
+
+ @Benchmark
+ def oldSourceFromIteratorPath(blackhole: Blackhole): Unit = {
+ val iterable = new immutable.Iterable[String] {
+ override def iterator: Iterator[String] = Iterator.single(element)
+ override def toString: String = "() => Iterator"
+ }
+ blackhole.consume(
+ oldSourceFromGraphStage(new IterableSource[String](iterable)).withAttributes(DefaultAttributes.iterableSource))
+ }
+
+ @Benchmark
+ def sourceIterable(blackhole: Blackhole): Unit =
+ blackhole.consume(Source(elements))
+
+ @Benchmark
+ def oldSourceIterablePath(blackhole: Blackhole): Unit =
+ blackhole.consume(oldSourceFromIterable(elements))
+
+ @Benchmark
+ def sourceRange(blackhole: Blackhole): Unit =
+ blackhole.consume(Source(range))
+
+ @Benchmark
+ def oldSourceRangePath(blackhole: Blackhole): Unit =
+ blackhole.consume(
+ oldSourceFromGraphStage(new IterableSource[Int](range)).withAttributes(DefaultAttributes.iterableSource))
+
+ @Benchmark
+ def javadslSourceRange(blackhole: Blackhole): Unit =
+ blackhole.consume(javadsl.Source.range(1, 1000))
+
+ @Benchmark
+ def oldJavadslSourceRangePath(blackhole: Blackhole): Unit =
+ blackhole.consume(
+ new javadsl.Source(
+ oldSourceFromGraphStage(new IterableSource[Integer](range.asInstanceOf[immutable.Iterable[Integer]]))
+ .withAttributes(DefaultAttributes.iterableSource)))
+
+ @Benchmark
+ def sourceFuturePending(blackhole: Blackhole): Unit =
+ blackhole.consume(Source.future(pendingFuture))
+
+ @Benchmark
+ def oldSourceFuturePendingPath(blackhole: Blackhole): Unit =
+ blackhole.consume(oldSourceFromGraphStage(new GraphStages.FutureSource[String](pendingFuture)))
+
+ @Benchmark
+ def sourceFromGraphStage(blackhole: Blackhole): Unit =
+ blackhole.consume(Source.fromGraph(new GraphStages.SingleSource(element)))
+
+ @Benchmark
+ def oldSourceFromGraphStagePath(blackhole: Blackhole): Unit = {
+ val stage = new GraphStages.SingleSource(element)
+ blackhole.consume(oldSourceFromGraphStage(stage))
+ }
+
+ @Benchmark
+ def sinkFromGraphStage(blackhole: Blackhole): Unit =
+ blackhole.consume(Sink.fromGraph(sinkStage))
+
+ @Benchmark
+ def oldSinkFromGraphStagePath(blackhole: Blackhole): Unit = {
+ val attributes = sinkStage.traversalBuilder.attributes
+ val noAttributeStage = sinkStage.withAttributes(Attributes.none)
+ blackhole.consume(
+ new Sink(
+ LinearTraversalBuilder.fromBuilder(noAttributeStage.traversalBuilder, noAttributeStage.shape, Keep.right),
+ noAttributeStage.shape).withAttributes(attributes))
+ }
+
+ @Benchmark
+ def flowFromGraphStage(blackhole: Blackhole): Unit =
+ blackhole.consume(Flow.fromGraph(flowStage))
+
+ @Benchmark
+ def oldFlowFromGraphStagePath(blackhole: Blackhole): Unit = {
+ val attributes = flowStage.traversalBuilder.attributes
+ val noAttributeStage = flowStage.withAttributes(Attributes.none)
+ blackhole.consume(
+ new Flow(
+ LinearTraversalBuilder.fromBuilder(noAttributeStage.traversalBuilder, noAttributeStage.shape, Keep.right),
+ noAttributeStage.shape).withAttributes(attributes))
+ }
+}
diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/RangeSourceBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/RangeSourceBenchmark.scala
new file mode 100644
index 0000000000..59925f9612
--- /dev/null
+++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/RangeSourceBenchmark.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import java.util.concurrent.{ CountDownLatch, TimeUnit }
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.openjdk.jmh.annotations._
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.impl.fusing.IterableSource
+import pekko.stream.scaladsl.{ Keep, RunnableGraph, Sink, Source }
+import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler }
+
+object RangeSourceBenchmark {
+ @volatile var sinkSum: Int = 0
+}
+
+final class IntCompletionLatch extends GraphStageWithMaterializedValue[SinkShape[Int], CountDownLatch] {
+ val in: Inlet[Int] = Inlet[Int]("IntCompletionLatch.in")
+ override val shape: SinkShape[Int] = SinkShape(in)
+
+ override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, CountDownLatch) = {
+ val latch = new CountDownLatch(1)
+ val logic = new GraphStageLogic(shape) with InHandler {
+ private[this] var sum = 0
+
+ override def preStart(): Unit = pull(in)
+ override def onPush(): Unit = {
+ sum += grab(in)
+ pull(in)
+ }
+
+ override def onUpstreamFinish(): Unit = {
+ RangeSourceBenchmark.sinkSum = sum
+ latch.countDown()
+ completeStage()
+ }
+
+ setHandler(in, this)
+ }
+ (logic, latch)
+ }
+}
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@BenchmarkMode(Array(Mode.Throughput))
+class RangeSourceBenchmark {
+ implicit val system: ActorSystem = ActorSystem("RangeSourceBenchmark")
+
+ @Param(Array("1", "1000"))
+ var elements: Int = 0
+
+ var rangeToLatch: RunnableGraph[CountDownLatch] = _
+ var oldRangeToLatch: RunnableGraph[CountDownLatch] = _
+
+ @Setup
+ def setup(): Unit = {
+ val range = 1 to elements
+ rangeToLatch = Source(range).toMat(Sink.fromGraph(new IntCompletionLatch))(Keep.right)
+ oldRangeToLatch = Source
+ .fromGraph(new IterableSource[Int](range))
+ .withAttributes(DefaultAttributes.iterableSource)
+ .toMat(Sink.fromGraph(new IntCompletionLatch))(Keep.right)
+ }
+
+ @TearDown
+ def shutdown(): Unit = {
+ Await.result(system.terminate(), 5.seconds)
+ }
+
+ @Benchmark
+ def sourceRangeToLatch(): Unit =
+ await(rangeToLatch.run())
+
+ @Benchmark
+ def oldSourceRangeToLatch(): Unit =
+ await(oldRangeToLatch.run())
+
+ private def await(latch: CountDownLatch): Unit =
+ if (!latch.await(5, TimeUnit.SECONDS))
+ throw new RuntimeException("Latch timed out")
+}
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index 3aec7545b0..db3341baea 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -747,6 +747,21 @@ public class SourceTest extends StreamTestJupiter {
}
}
+ @Test
+ public void mustWorkFromRangeWithNegativeStep() throws Exception {
+ CompletionStage<List<Integer>> f =
+ Source.range(5, 1, -2).grouped(20).runWith(Sink.head(), system);
+ final List<Integer> result = f.toCompletableFuture().get(3, TimeUnit.SECONDS);
+ assertEquals(Arrays.asList(5, 3, 1), result);
+ }
+
+ @Test
+ public void mustWorkFromEmptyRange() throws Exception {
+ CompletionStage<List<Integer>> f = Source.range(1, 5, -1).runWith(Sink.seq(), system);
+ final List<Integer> result = f.toCompletableFuture().get(3, TimeUnit.SECONDS);
+ assertEquals(0, result.size());
+ }
+
@Test
public void mustRepeat() throws Exception {
final CompletionStage<List<Integer>> f =
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
index 2af4b9b0a7..153339aeef 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
@@ -19,8 +19,9 @@ import org.apache.pekko
import pekko.NotUsed
import pekko.stream._
import pekko.stream.impl.TraversalTestUtils._
-import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SingleSource }
-import pekko.stream.impl.fusing.IterableSource
+import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, SingleSource }
+import pekko.stream.impl.fusing.{ IterableSource, IteratorSource, RangeSource }
import pekko.stream.scaladsl.{ Keep, Source }
import pekko.testkit.PekkoSpec
import pekko.util.OptionVal
@@ -547,6 +548,40 @@ class TraversalBuilderSpec extends PekkoSpec {
OptionVal.None)
}
+ "find Source.fromIterator via TraversalBuilder with getValuePresentedSource" in {
+ val createIterator = () => Iterator("a", "b", "c")
+ TraversalBuilder.getValuePresentedSource(Source.fromIterator(createIterator)).get.asInstanceOf[IteratorSource[
+ String]].createIterator should ===(
+ createIterator)
+ val iteratorSource = new IteratorSource(createIterator, DefaultAttributes.iterableSource)
+ TraversalBuilder.getValuePresentedSource(iteratorSource) should be(OptionVal.Some(iteratorSource))
+
+ TraversalBuilder.getValuePresentedSource(Source.fromIterator(createIterator).async) should be(OptionVal.None)
+ TraversalBuilder.getValuePresentedSource(
+ Source.fromIterator(createIterator).mapMaterializedValue(_ => "Mat")) should be(OptionVal.None)
+ }
+
+ "find Source.range via TraversalBuilder with getValuePresentedSource" in {
+ val range = 1 to 4
+ TraversalBuilder.getValuePresentedSource(Source(range)).get.asInstanceOf[RangeSource[Int]].range should ===(range)
+ val rangeSource = new RangeSource[Int](range, DefaultAttributes.iterableSource)
+ TraversalBuilder.getValuePresentedSource(rangeSource) should be(OptionVal.Some(rangeSource))
+
+ TraversalBuilder.getValuePresentedSource(Source(range).async) should be(OptionVal.None)
+ TraversalBuilder.getValuePresentedSource(Source(range).mapMaterializedValue(_ => "Mat")) should be(OptionVal.None)
+ }
+
+ "find Source.repeat via TraversalBuilder with getValuePresentedSource" in {
+ TraversalBuilder.getValuePresentedSource(Source.repeat("a")).get.asInstanceOf[RepeatSource[String]].elem should ===(
+ "a")
+ val repeatSource = new RepeatSource("a")
+ TraversalBuilder.getValuePresentedSource(repeatSource) should be(OptionVal.Some(repeatSource))
+
+ TraversalBuilder.getValuePresentedSource(Source.repeat("c").async) should be(OptionVal.None)
+ TraversalBuilder.getValuePresentedSource(Source.repeat("d").mapMaterializedValue(_ => "Mat")) should be(
+ OptionVal.None)
+ }
+
"find Source.javaStreamSource via TraversalBuilder with
getValuePresentedSource" in {
val javaStream = java.util.stream.Stream.empty[String]()
TraversalBuilder.getValuePresentedSource(Source.fromJavaStream(() => javaStream)).get
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 22d3b55077..34b0633b66 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -358,6 +358,11 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
f.futureValue.toSet should ===(Set(42))
}
+ "reject null element at construction time" in {
+ val error = the[NullPointerException] thrownBy Source.repeat(null)
+ error.getMessage should ===("Element must not be null, rule 2.13")
+ }
+
"repeat example" in {
// #repeat
val source: Source[Int, NotUsed] = Source.repeat(42)
@@ -369,6 +374,52 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
// #repeat
f.futureValue shouldBe Done
}
+
+ "work when flattened through value-presented source fast path" in {
+ Source
+ .single("repeat")
+ .flatMapConcat(_ => Source.repeat(42))
+ .take(3)
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq(42, 42, 42))
+ }
+
+ "work when recovered through value-presented source fast path" in {
+ Source
+ .failed[Int](TE("boom"))
+ .recoverWithRetries(1, { case _ => Source.repeat(42) })
+ .take(3)
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq(42, 42, 42))
+ }
+ }
+
+ "Range Source" must {
+ "emit inclusive and exclusive ranges" in {
+ Source(1 to 4).runWith(Sink.seq).futureValue should ===(immutable.Seq(1, 2, 3, 4))
+ Source(1 until 4).runWith(Sink.seq).futureValue should ===(immutable.Seq(1, 2, 3))
+ }
+
+ "emit stepped and empty ranges" in {
+ Source(5 to 1 by -2).runWith(Sink.seq).futureValue should ===(immutable.Seq(5, 3, 1))
+ Source(1 to 5 by -1).runWith(Sink.seq).futureValue should ===(immutable.Seq.empty)
+ }
+
+ "work when flattened through value-presented source fast path" in {
+ Source
+ .single("range")
+ .flatMapConcat(_ => Source(1 to 3))
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq(1, 2, 3))
+ }
+
+ "work when recovered through value-presented source fast path" in {
+ Source
+ .failed[Int](TE("boom"))
+ .recoverWithRetries(1, { case _ => Source(1 to 3) })
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq(1, 2, 3))
+ }
}
"Unfold Source" must {
@@ -437,6 +488,22 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
immutable.Seq(false, true, false, true, false, true, false, true, false, true))
}
+ "work when flattened through value-presented source fast path" in {
+ Source
+ .single("iterator")
+ .flatMapConcat(_ => Source.fromIterator(() => Iterator(1, 2, 3)))
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq(1, 2, 3))
+ }
+
+ "work when recovered through value-presented source fast path" in {
+ Source
+ .failed[Int](TE("boom"))
+ .recoverWithRetries(1, { case _ => Source.fromIterator(() => Iterator(1, 2, 3)) })
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq(1, 2, 3))
+ }
+
"fail stream when iterator throws" in {
Source
.fromIterator(() => (1 to 1000).toIterator.map(k => if (k < 10) k else throw TE("a")))
@@ -516,6 +583,42 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
.failed
.futureValue shouldBe a[TE]
}
+
+ "complete when iterator factory throws and decider resumes" in {
+ Source
+ .fromIterator[Int](() => throw TE("factory"))
+ .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq.empty)
+ }
+
+ "restart iterator factory when iterator factory throws and decider restarts" in {
+ var attempts = 0
+ Source
+ .fromIterator { () =>
+ attempts += 1
+ if (attempts == 1) throw TE("factory")
+ else Iterator(1, 2, 3)
+ }
+ .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq(1, 2, 3))
+ attempts should ===(2)
+ }
+
+ "fail when restarted iterator factory throws again" in {
+ var attempts = 0
+ Source
+ .fromIterator[Int] { () =>
+ attempts += 1
+ throw TE(s"factory-$attempts")
+ }
+ .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
+ .runWith(Sink.seq)
+ .failed
+ .futureValue shouldBe a[TE]
+ attempts should ===(2)
+ }
}
"ZipN Source" must {
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefSource.scala
index 18ebc52537..1abea0b5ce 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefSource.scala
@@ -18,6 +18,7 @@ import pekko.actor.ActorRef
import pekko.annotation.InternalApi
import pekko.stream._
import pekko.stream.OverflowStrategies._
+import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.stage._
import pekko.util.OptionVal
@@ -39,6 +40,7 @@ private object ActorRefSource {
val out: Outlet[T] = Outlet[T]("actorRefSource.out")
override val shape: SourceShape[T] = SourceShape.of(out)
+ override def initialAttributes: Attributes = DefaultAttributes.actorRefSource
def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ActorRef) =
throw new IllegalStateException("Not supported")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/FailedSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/FailedSource.scala
index b107857f86..78d68f690d 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/FailedSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/FailedSource.scala
@@ -17,12 +17,15 @@ import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.stream.{ Attributes, Outlet, SourceShape }
import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.impl.fusing.GraphStages.ValuePresentedSource
import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
/**
* INTERNAL API
*/
-@InternalApi private[pekko] final class FailedSource[T](val failure: Throwable) extends GraphStage[SourceShape[T]] {
+@InternalApi private[pekko] final class FailedSource[T](val failure: Throwable)
+ extends GraphStage[SourceShape[T]]
+ with ValuePresentedSource {
val out = Outlet[T]("FailedSource.out")
override val shape = SourceShape(out)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala
index 440a17e406..1fea035977 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala
@@ -19,12 +19,14 @@ import java.util.function.Consumer
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.stream._
+import pekko.stream.impl.fusing.GraphStages.ValuePresentedSource
import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
/** INTERNAL API */
@InternalApi private[stream] final class JavaStreamSource[T, S <: java.util.stream.BaseStream[T, S]](
val open: () => java.util.stream.BaseStream[T, S])
- extends GraphStage[SourceShape[T]] {
+ extends GraphStage[SourceShape[T]]
+ with ValuePresentedSource {
val out: Outlet[T] = Outlet("JavaStreamSource")
override val shape: SourceShape[T] = SourceShape(out)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala
index 61e217202a..36d8014ac3 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala
@@ -20,6 +20,7 @@ import pekko.Done
import pekko.annotation.InternalApi
import pekko.stream._
import pekko.stream.OverflowStrategies._
+import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.scaladsl.SourceQueueWithComplete
import pekko.stream.stage._
@@ -49,6 +50,7 @@ import pekko.stream.stage._
val out = Outlet[T]("queueSource.out")
override val shape: SourceShape[T] = SourceShape.of(out)
+ override def initialAttributes: Attributes = DefaultAttributes.queueSource
@scala.annotation.nowarn("msg=inferred structural type")
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
index c6206a106a..4ed9cbe643 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
@@ -22,9 +22,10 @@ import pekko.annotation.{ DoNotInherit, InternalApi }
import pekko.stream._
import pekko.stream.impl.StreamLayout.AtomicModule
import pekko.stream.impl.TraversalBuilder.{ AnyFunction1, AnyFunction2 }
-import pekko.stream.impl.fusing.{ GraphStageModule, IterableSource }
-import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SingleSource }
+import pekko.stream.impl.fusing.GraphStageModule
+import pekko.stream.impl.fusing.GraphStages.{ SingleSource, ValuePresentedSource }
import pekko.stream.scaladsl.Keep
+import pekko.stream.stage.GraphStageWithMaterializedValue
import pekko.util.OptionVal
/**
@@ -354,24 +355,29 @@ import pekko.util.OptionVal
* performance optimization in FlattenMerge and possibly other places.
*/
def getSingleSource[A >: Null](graph: Graph[SourceShape[A], _]): OptionVal[SingleSource[A]] = {
+ @inline def fromModule(module: AtomicModule[_, _]): OptionVal[SingleSource[A]] =
+ module match {
+ case m: GraphStageModule[_, _] =>
+ m.stage match {
+ case single: SingleSource[A] @unchecked => OptionVal.Some(single)
+ case _ => OptionVal.None
+ }
+ case _ => OptionVal.None
+ }
+
graph match {
case single: SingleSource[A] @unchecked => OptionVal.Some(single)
case _ =>
graph.traversalBuilder match {
- case l: LinearTraversalBuilder =>
+ case l: LinearTraversalBuilder if !l.attributes.isAsync =>
l.pendingBuilder match {
case OptionVal.Some(a: AtomicTraversalBuilder) =>
- a.module match {
- case m: GraphStageModule[_, _] =>
- m.stage match {
- case single: SingleSource[A] @unchecked =>
- // It would be != EmptyTraversal if mapMaterializedValue was used and then we can't optimize.
- if ((l.traversalSoFar eq EmptyTraversal) && !l.attributes.isAsync)
- OptionVal.Some(single)
- else OptionVal.None
- case _ => OptionVal.None
- }
- case _ => OptionVal.None
+ // It would be != EmptyTraversal if mapMaterializedValue was used and then we can't optimize.
+ if (l.traversalSoFar eq EmptyTraversal) fromModule(a.module) else OptionVal.None
+ case OptionVal.None =>
+ l.traversalSoFar match {
+ case MaterializeAtomic(module, _) => fromModule(module)
+ case _ => OptionVal.None
}
case _ => OptionVal.None
}
@@ -388,30 +394,34 @@ import pekko.util.OptionVal
@InternalApi def getValuePresentedSource[A >: Null](
graph: Graph[SourceShape[A], _]): OptionVal[Graph[SourceShape[A], _]] = {
def isValuePresentedSource(graph: Graph[SourceShape[_ <: A], _]): Boolean = graph match {
- case _: SingleSource[_] | _: FutureSource[_] | _: IterableSource[_] | _: JavaStreamSource[_, _] |
- _: FailedSource[_] =>
- true
+ case _: ValuePresentedSource => true
case maybeEmpty if isEmptySource(maybeEmpty) => true
case _ => false
}
+ @inline def fromModule(module: AtomicModule[_, _]): OptionVal[Graph[SourceShape[A], _]] =
+ module match {
+ case m: GraphStageModule[_, _] =>
+ m.stage match {
+ case _ if
isValuePresentedSource(m.stage.asInstanceOf[Graph[SourceShape[A], _]]) =>
+ OptionVal.Some(m.stage.asInstanceOf[Graph[SourceShape[A], _]])
+ case _ => OptionVal.None
+ }
+ case _ => OptionVal.None
+ }
+
graph match {
case _ if isValuePresentedSource(graph) => OptionVal.Some(graph)
case _ =>
graph.traversalBuilder match {
- case l: LinearTraversalBuilder =>
+ case l: LinearTraversalBuilder if !l.attributes.isAsync =>
l.pendingBuilder match {
case OptionVal.Some(a: AtomicTraversalBuilder) =>
- a.module match {
- case m: GraphStageModule[_, _] =>
- m.stage match {
- case _ if isValuePresentedSource(m.stage.asInstanceOf[Graph[SourceShape[A], _]]) =>
- // It would be != EmptyTraversal if mapMaterializedValue was used and then we can't optimize.
- if ((l.traversalSoFar eq EmptyTraversal) && !l.attributes.isAsync)
- OptionVal.Some(m.stage.asInstanceOf[Graph[SourceShape[A], _]])
- else OptionVal.None
- case _ => OptionVal.None
- }
- case _ => OptionVal.None
+ // It would be != EmptyTraversal if mapMaterializedValue was used and then we can't optimize.
+ if (l.traversalSoFar eq EmptyTraversal) fromModule(a.module) else OptionVal.None
+ case OptionVal.None =>
+ l.traversalSoFar match {
+ case MaterializeAtomic(module, _) => fromModule(module)
+ case _ => OptionVal.None
}
case _ => OptionVal.None
}
@@ -702,14 +712,17 @@ import pekko.util.OptionVal
* than its generic counterpart. It can be freely mixed with the generic builder in both ways.
*/
def fromModule(module: AtomicModule[Shape, Any], attributes: Attributes): LinearTraversalBuilder = {
- if (module.shape.inlets.size > 1)
+ val shape = module.shape
+ val inlets = shape.inlets
+ val outlets = shape.outlets
+ if (inlets.size > 1)
throw new IllegalStateException("Modules with more than one input port cannot be linear.")
- if (module.shape.outlets.size > 1)
+ if (outlets.size > 1)
throw new IllegalStateException("Modules with more than one output port cannot be linear.")
- TraversalBuilder.initShape(module.shape)
+ TraversalBuilder.initShape(shape)
- val inPortOpt = OptionVal(module.shape.inlets.headOption.orNull)
- val outPortOpt = OptionVal(module.shape.outlets.headOption.orNull)
+ val inPortOpt = OptionVal(if (inlets.isEmpty) null else inlets.head)
+ val outPortOpt = OptionVal(if (outlets.isEmpty) null else outlets.head)
val wiring = if (outPortOpt.isDefined) wireBackward else noWire
@@ -779,6 +792,22 @@ import pekko.util.OptionVal
}
}
+
+ @inline @InternalApi private[pekko] def fromGraphStage(
+ graphStage: GraphStageWithMaterializedValue[_ <: Shape, _]): LinearTraversalBuilder = {
+ val builder = graphStage.traversalBuilder
+ val attributes = builder.attributes
+ val linear = builder match {
+ case atomic: AtomicTraversalBuilder =>
+ LinearTraversalBuilder.fromModule(atomic.module, Attributes.none)
+ case _ =>
+ val builderWithoutAttributes =
+ if (attributes eq Attributes.none) builder else builder.setAttributes(Attributes.none)
+ fromBuilder(builderWithoutAttributes, graphStage.shape, Keep.right)
+ }
+
+ if (attributes eq Attributes.none) linear else linear.setAttributes(attributes)
+ }
}
/**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala
index 5cf2bdc827..7592733d35 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala
@@ -17,6 +17,7 @@
package org.apache.pekko.stream.impl.fusing
+import scala.collection.immutable
import scala.concurrent.Future
import scala.util.{ Failure, Try }
@@ -25,7 +26,7 @@ import pekko.annotation.InternalApi
import pekko.stream.{ Attributes, FlowShape, Graph, Inlet, Outlet, SourceShape, SubscriptionWithCancelException }
import pekko.stream.impl.{ Buffer => BufferImpl, FailedSource, JavaStreamSource, TraversalBuilder }
import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SingleSource }
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, SingleSource }
import pekko.stream.scaladsl.Source
import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import pekko.util.OptionVal
@@ -54,6 +55,35 @@ private[pekko] object FlattenConcat {
override def isClosed: Boolean = !hasNext
}
+ private final class InflightRangeSource[T](range: immutable.Range) extends InflightSource[T] {
+ private val isEmptyRange = range.isEmpty
+ private val rangeLast = if (isEmptyRange) 0 else range.last
+ private val rangeStep = range.step
+ private var nextElement = range.start
+ private var closed = isEmptyRange
+
+ override def hasNext: Boolean = !closed
+ override def next(): T =
+ if (closed) throw new NoSuchElementException("next called after completion")
+ else {
+ val current = nextElement
+ if (current == rangeLast) closed = true
+ else nextElement = current + rangeStep
+ current.asInstanceOf[T]
+ }
+ override def tryPull(): Unit = ()
+ override def cancel(cause: Throwable): Unit = ()
+ override def isClosed: Boolean = closed
+ }
+
+ private final class InflightRepeatSource[T](elem: T) extends InflightSource[T] {
+ override def hasNext: Boolean = true
+ override def next(): T = elem
+ override def tryPull(): Unit = ()
+ override def cancel(cause: Throwable): Unit = ()
+ override def isClosed: Boolean = false
+ }
+
private final class InflightCompletedFutureSource[T](result: Try[T]) extends InflightSource[T] {
private var _hasNext = result.isSuccess
override def hasNext: Boolean = _hasNext
@@ -219,6 +249,26 @@ private[pekko] final class FlattenConcat[T, M](parallelism: Int)
}
}
+ private def addRangeSource(range: immutable.Range): Unit = {
+ val inflightSource = new InflightRangeSource[T](range)
+ if (isAvailable(out) && queue.isEmpty) {
+ if (inflightSource.hasNext) {
+ push(out, inflightSource.next())
+ if (inflightSource.hasNext)
+ queue.enqueue(inflightSource)
+ }
+ } else if (inflightSource.hasNext) {
+ queue.enqueue(inflightSource)
+ }
+ }
+
+ private def addRepeatSource(elem: T): Unit = {
+ val inflightSource = new InflightRepeatSource[T](elem)
+ if (isAvailable(out) && queue.isEmpty)
+ push(out, elem)
+ queue.enqueue(inflightSource)
+ }
+
private def addCompletedFutureElem(elem: Try[T]): Unit = {
if (isAvailable(out) && queue.isEmpty) {
elem match {
@@ -287,6 +337,9 @@ private[pekko] final class FlattenConcat[T, M](parallelism: Int)
case None => addPendingFutureElem(future)
}
case iterable: IterableSource[T] @unchecked => addSourceElements(iterable.elements.iterator)
+ case iterator: IteratorSource[T] @unchecked => addSourceElements(iterator.createIterator())
+ case range: RangeSource[T] @unchecked => addRangeSource(range.range)
+ case repeat: RepeatSource[T] @unchecked => addRepeatSource(repeat.elem)
case javaStream: JavaStreamSource[T, _] @unchecked =>
import scala.jdk.CollectionConverters._
addSourceElements(javaStream.open().iterator.asScala)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
index b95b29f980..b901584ffe 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
@@ -58,6 +58,11 @@ import pekko.stream.stage._
*/
@InternalApi private[pekko] object GraphStages {
+ /**
+ * INTERNAL API
+ */
+ @InternalApi private[pekko] trait ValuePresentedSource
+
/**
* INTERNAL API
*/
@@ -277,7 +282,7 @@ import pekko.stream.stage._
override def toString: String = s"TickSource($initialDelay, $interval, $tick)"
}
- final class SingleSource[T](val elem: T) extends GraphStage[SourceShape[T]] {
+ final class SingleSource[T](val elem: T) extends GraphStage[SourceShape[T]] with ValuePresentedSource {
override def initialAttributes: Attributes = DefaultAttributes.singleSource
ReactiveStreamsCompliance.requireNonNullElement(elem)
val out = Outlet[T]("single.out")
@@ -294,6 +299,21 @@ import pekko.stream.stage._
override def toString: String = "SingleSource"
}
+ final class RepeatSource[T](val elem: T) extends GraphStage[SourceShape[T]] with ValuePresentedSource {
+ override def initialAttributes: Attributes = DefaultAttributes.repeat
+ ReactiveStreamsCompliance.requireNonNullElement(elem)
+ val out = Outlet[T]("repeat.out")
+ override val shape = SourceShape(out)
+ override def createLogic(attr: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with OutHandler {
+ override def onPull(): Unit = push(out, elem)
+
+ setHandler(out, this)
+ }
+
+ override def toString: String = "RepeatSource"
+ }
+
final class FutureFlattenSource[T, M](futureSource: Future[Graph[SourceShape[T], M]])
extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] {
ReactiveStreamsCompliance.requireNonNullElement(futureSource)
@@ -383,7 +403,7 @@ import pekko.stream.stage._
override def toString: String = "FutureFlattenSource"
}
- final class FutureSource[T](val future: Future[T]) extends GraphStage[SourceShape[T]] {
+ final class FutureSource[T](val future: Future[T]) extends GraphStage[SourceShape[T]] with ValuePresentedSource {
ReactiveStreamsCompliance.requireNonNullElement(future)
val shape = SourceShape(Outlet[T]("FutureSource.out"))
val out = shape.out
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
index cfcd1c035f..31edf9eac7 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
@@ -26,10 +26,13 @@ import pekko.stream.{ Attributes, Outlet, SourceShape, Supervision }
import pekko.stream.ActorAttributes.SupervisionStrategy
import pekko.stream.impl.ReactiveStreamsCompliance
import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.impl.fusing.GraphStages.ValuePresentedSource
import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
@InternalApi
-private[pekko] final class IterableSource[T](val elements: immutable.Iterable[T]) extends GraphStage[SourceShape[T]] {
+private[pekko] final class IterableSource[T](val elements: immutable.Iterable[T])
+ extends GraphStage[SourceShape[T]]
+ with ValuePresentedSource {
ReactiveStreamsCompliance.requireNonNullElement(elements)
override protected def initialAttributes: Attributes = DefaultAttributes.iterableSource
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IteratorSource.scala
similarity index 56%
copy from stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
copy to stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IteratorSource.scala
index cfcd1c035f..7c6baa5114 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IteratorSource.scala
@@ -17,24 +17,25 @@
package org.apache.pekko.stream.impl.fusing
-import scala.collection.immutable
import scala.util.control.NonFatal
import org.apache.pekko
import pekko.annotation.InternalApi
-import pekko.stream.{ Attributes, Outlet, SourceShape, Supervision }
import pekko.stream.ActorAttributes.SupervisionStrategy
-import pekko.stream.impl.ReactiveStreamsCompliance
-import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.{ Attributes, Outlet, SourceShape, Supervision }
+import pekko.stream.impl.fusing.GraphStages.ValuePresentedSource
import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
@InternalApi
-private[pekko] final class IterableSource[T](val elements: immutable.Iterable[T]) extends GraphStage[SourceShape[T]] {
- ReactiveStreamsCompliance.requireNonNullElement(elements)
+private[pekko] final class IteratorSource[T](
+ val createIterator: () => Iterator[T],
+ defaultAttributes: Attributes)
+ extends GraphStage[SourceShape[T]]
+ with ValuePresentedSource {
- override protected def initialAttributes: Attributes = DefaultAttributes.iterableSource
+ override protected def initialAttributes: Attributes = defaultAttributes
- private val out = Outlet[T]("IterableSource.out")
+ private val out = Outlet[T]("IteratorSource.out")
override val shape: SourceShape[T] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
@@ -45,27 +46,44 @@ private[pekko] final class IterableSource[T](val elements: immutable.Iterable[T]
override def onPull(): Unit =
try {
if (currentIterator eq null) {
- currentIterator = elements.iterator
+ currentIterator = createIterator()
}
- tryPushNextOrComplete()
+ pushNextOrComplete()
} catch {
case NonFatal(ex) =>
- decider(ex) match {
- case Supervision.Stop => failStage(ex)
- case Supervision.Resume => tryPushNextOrComplete()
- case Supervision.Restart =>
- currentIterator = elements.iterator
- tryPushNextOrComplete()
+ if (currentIterator eq null) handleIteratorCreationFailure(ex)
+ else handleIteratorFailure(ex)
+ }
+
+ private def handleIteratorCreationFailure(ex: Throwable): Unit =
+ decider(ex) match {
+ case Supervision.Stop => failStage(ex)
+ case Supervision.Resume =>
+ completeStage()
+ case Supervision.Restart =>
+ try {
+ currentIterator = createIterator()
+ pushNextOrComplete()
+ } catch {
+ case NonFatal(restartEx) => failStage(restartEx)
}
}
- private def tryPushNextOrComplete(): Unit =
+ private def handleIteratorFailure(ex: Throwable): Unit =
+ decider(ex) match {
+ case Supervision.Stop => failStage(ex)
+ case Supervision.Resume => pushNextOrComplete()
+ case Supervision.Restart =>
+ currentIterator = createIterator()
+ pushNextOrComplete()
+ }
+
+ private def pushNextOrComplete(): Unit =
if (currentIterator.hasNext) {
if (isAvailable(out)) {
push(out, currentIterator.next())
- if (!currentIterator.hasNext) {
+ if (!currentIterator.hasNext)
completeStage()
- }
}
} else {
completeStage()
@@ -74,5 +92,5 @@ private[pekko] final class IterableSource[T](val elements: immutable.Iterable[T]
setHandler(out, this)
}
- override def toString: String = "IterableSource"
+ override def toString: String = "IteratorSource"
}
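Reviewer's sketch (not part of the commit): the creation-failure path in IteratorSource above can be modelled without Pekko to make the three supervision outcomes explicit. Everything below is hypothetical: Directive stands in for Supervision.Directive, Outcome for what the stage does next, and onCreationFailure for handleIteratorCreationFailure. The model only covers the factory call; in the diff the retry also wraps pushNextOrComplete() in the same try.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-ins, not Pekko API.
sealed trait Directive
case object Stop extends Directive
case object Resume extends Directive
case object Restart extends Directive

sealed trait Outcome[+A]
final case class Failed(cause: Throwable) extends Outcome[Nothing]
case object Completed extends Outcome[Nothing]
final case class Iterating[A](iterator: Iterator[A]) extends Outcome[A]

object CreationFailureModel {
  // Called when the iterator factory itself threw `ex`, so no iterator exists yet.
  def onCreationFailure[A](
      ex: Throwable,
      decide: Throwable => Directive,
      create: () => Iterator[A]): Outcome[A] =
    decide(ex) match {
      case Stop => Failed(ex)
      // There is no iterator to resume from, so the stage completes.
      case Resume => Completed
      case Restart =>
        // One retry of the factory; a second failure fails the stage
        // without consulting the decider again.
        try Iterating(create())
        catch { case NonFatal(restartEx) => Failed(restartEx) }
    }
}
```

The point of the split in the diff is that the Resume and Restart branches never touch a null currentIterator, which the previous single catch block could do when the first createIterator() call threw.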
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index 6f7b1db1ae..a0747c2b99 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -45,7 +45,7 @@ import pekko.stream.impl.{
TraversalBuilder
}
import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SimpleLinearGraphStage, SingleSource }
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, SimpleLinearGraphStage, SingleSource }
import pekko.stream.scaladsl.{
DelayStrategy,
Source,
@@ -2195,6 +2195,12 @@ private[pekko] object TakeWithin {
}
case iterableSource: IterableSource[T @unchecked] =>
emitMultiple(out, iterableSource.elements, () => completeStage())
+ case iteratorSource: IteratorSource[T @unchecked] =>
+ emitMultiple(out, iteratorSource.createIterator(), () => completeStage())
+ case rangeSource: RangeSource[T @unchecked] =>
+ emitMultiple(out, rangeSource.range.iterator.asInstanceOf[Iterator[T]], () => completeStage())
+ case repeatSource: RepeatSource[T @unchecked] =>
+ emitMultiple(out, Iterator.continually(repeatSource.elem), () => completeStage())
case javaStreamSource: JavaStreamSource[T @unchecked, _] =>
emitMultiple(out, javaStreamSource.open().spliterator(), () => completeStage())
case _ =>
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/RangeSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/RangeSource.scala
new file mode 100644
index 0000000000..4e9cad700e
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/RangeSource.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import scala.collection.immutable
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ Attributes, Outlet, SourceShape }
+import pekko.stream.impl.ReactiveStreamsCompliance
+import pekko.stream.impl.fusing.GraphStages.ValuePresentedSource
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
+
+@InternalApi
+private[pekko] final class RangeSource[T](val range: immutable.Range, defaultAttributes: Attributes)
+ extends GraphStage[SourceShape[T]]
+ with ValuePresentedSource {
+ ReactiveStreamsCompliance.requireNonNullElement(range)
+
+ override protected def initialAttributes: Attributes = defaultAttributes
+
+ private val out = Outlet[T]("RangeSource.out")
+ override val shape: SourceShape[T] = SourceShape(out)
+ private[this] val isEmptyRange = range.isEmpty
+ private[this] val rangeStart = range.start
+ private[this] val rangeLast = if (isEmptyRange) 0 else range.last
+ private[this] val rangeStep = range.step
+
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with OutHandler {
+ private[this] var nextElement = rangeStart
+
+ override def preStart(): Unit =
+ if (isEmptyRange) completeStage()
+
+ override def onPull(): Unit = {
+ val current = nextElement
+ val isLast = current == rangeLast
+ if (!isLast)
+ nextElement = current + rangeStep
+
+ push(out, current.asInstanceOf[T])
+ if (isLast)
+ completeStage()
+ }
+
+ setHandler(out, this)
+ }
+
+ override def toString: String = "RangeSource"
+}
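Reviewer's sketch (not part of the commit): the start/last/step bookkeeping shared by RangeSource and InflightRangeSource can be exercised in isolation. RangeWalker and RangeWalkerDemo are hypothetical names; the fields mirror the ones in the diff. In the real RangeSource the immutable values live on the stage and only nextElement lives in the per-materialization logic.

```scala
// Hypothetical standalone helper, not Pekko API.
final class RangeWalker(range: Range) {
  private val isEmptyRange = range.isEmpty
  private val rangeLast = if (isEmptyRange) 0 else range.last
  private val rangeStep = range.step
  private var nextElement = range.start
  private var closed = isEmptyRange

  def hasNext: Boolean = !closed

  def next(): Int =
    if (closed) throw new NoSuchElementException("next called after completion")
    else {
      val current = nextElement
      // Compare with the last element before advancing, so the walker never
      // computes current + step beyond the end of the range.
      if (current == rangeLast) closed = true
      else nextElement = current + rangeStep
      current
    }
}

object RangeWalkerDemo {
  // Drains a walker into a Vector so it can be compared with Range's own elements.
  def collect(range: Range): Vector[Int] = {
    val walker = new RangeWalker(range)
    val builder = Vector.newBuilder[Int]
    while (walker.hasNext) builder += walker.next()
    builder.result()
  }
}
```

One consequence of checking against range.last first is that a range ending at Int.MaxValue terminates without an overflowing addition.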
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 10a634f582..522be44914 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -38,7 +38,7 @@ import pekko.japi.function.Creator
import pekko.stream._
import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava }
import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
+import pekko.stream.impl.fusing.{ RangeSource, StatefulMapConcat, ZipWithIndexJava }
import pekko.util._
import org.jspecify.annotations.Nullable
@@ -234,7 +234,9 @@ object Source {
* @see [[scala.collection.immutable.Range.inclusive(Int, Int, Int)]]
*/
def range(start: Int, end: Int, step: Int): javadsl.Source[Integer, NotUsed] =
- new Source(scaladsl.Source(Range.inclusive(start, end, step).asInstanceOf[immutable.Iterable[Integer]]))
+ new Source(
+ scaladsl.Source.fromGraph(
+ new RangeSource[Integer](Range.inclusive(start, end, step), DefaultAttributes.iterableSource)))
/**
* Elements are emitted periodically with the specified interval.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 4cb3070b21..1123b52a1b 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -505,13 +505,7 @@ object Flow {
case f: Flow[I, O, M] => f
case f: javadsl.Flow[I, O, M] @unchecked => f.asScala
case g: GraphStageWithMaterializedValue[FlowShape[I, O], M] =>
- // move these from the operator itself to make the returned source
- // behave as it is the operator with regards to attributes
- val attrs = g.traversalBuilder.attributes
- val noAttrStage = g.withAttributes(Attributes.none)
- new Flow(
- LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, noAttrStage.shape, Keep.right),
- noAttrStage.shape).withAttributes(attrs)
+ new Flow(LinearTraversalBuilder.fromGraphStage(g), g.shape)
case _ => new Flow(LinearTraversalBuilder.fromBuilder(g.traversalBuilder, g.shape, Keep.right), g.shape)
}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
index 1a49530dfe..8407ba5a0e 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
@@ -150,13 +150,7 @@ object Sink {
case s: Sink[T, M] => s
case s: javadsl.Sink[T, M] @unchecked => s.asScala
case g: GraphStageWithMaterializedValue[SinkShape[T], M] =>
- // move these from the stage itself to make the returned source
- // behave as it is the stage with regards to attributes
- val attrs = g.traversalBuilder.attributes
- val noAttrStage = g.withAttributes(Attributes.none)
- new Sink(
- LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, noAttrStage.shape, Keep.right),
- noAttrStage.shape).withAttributes(attrs)
+ new Sink(LinearTraversalBuilder.fromGraphStage(g), g.shape)
case other =>
new Sink(LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right), other.shape)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 739382bf7e..6f60be9f56 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -30,7 +30,15 @@ import pekko.annotation.InternalApi
import pekko.stream._
import pekko.stream.impl._
import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.{ ArraySource, GraphStages, IterableSource, LazyFutureSource, LazySingleSource }
+import pekko.stream.impl.fusing.{
+ ArraySource,
+ GraphStages,
+ IterableSource,
+ IteratorSource,
+ LazyFutureSource,
+ LazySingleSource,
+ RangeSource
+}
import pekko.stream.impl.fusing.GraphStages._
import pekko.stream.stage.GraphStageWithMaterializedValue
import pekko.util.ConstantFun
@@ -286,11 +294,8 @@ object Source {
* Elements are pulled out of the iterator in accordance with the demand coming
* from the downstream transformation steps.
*/
- def fromIterator[T](f: () => Iterator[T]): Source[T, NotUsed] =
- apply(new immutable.Iterable[T] {
- override def iterator: Iterator[T] = f()
- override def toString: String = "() => Iterator"
- })
+ @inline def fromIterator[T](f: () => Iterator[T]): Source[T, NotUsed] =
+ fromGraphStage(new IteratorSource[T](f, DefaultAttributes.iterableSource))
/**
* Creates a source that wraps a Java 8 ``Stream``. ``Source`` uses a stream iterator to get all its
@@ -309,11 +314,11 @@ object Source {
* Starts a new 'cycled' `Source` from the given elements. The producer stream of elements
* will continue infinitely by repeating the sequence of elements provided by function parameter.
*/
- def cycle[T](f: () => Iterator[T]): Source[T, NotUsed] = {
+ @inline def cycle[T](f: () => Iterator[T]): Source[T, NotUsed] = {
val iterator = Iterator.continually {
val i = f(); if (i.isEmpty) throw new IllegalArgumentException("empty iterator") else i
}.flatten
- fromIterator(() => iterator).withAttributes(DefaultAttributes.cycledSource)
+ fromGraphStage(new IteratorSource[T](() => iterator, DefaultAttributes.cycledSource))
}
/**
@@ -372,26 +377,25 @@ object Source {
def fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M] = g match {
case s: Source[T, M] => s
case s: javadsl.Source[T, M] @unchecked => s.asScala
- case g: GraphStageWithMaterializedValue[SourceShape[T], M] =>
- // move these from the stage itself to make the returned source
- // behave as it is the stage with regards to attributes
- val attrs = g.traversalBuilder.attributes
- val noAttrStage = g.withAttributes(Attributes.none)
- new Source(
- LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, noAttrStage.shape, Keep.right),
- noAttrStage.shape).withAttributes(attrs)
- case other =>
+ case g: GraphStageWithMaterializedValue[SourceShape[T], M] => fromGraphStage(g)
+ case other =>
// composite source shaped graph
new Source(LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right), other.shape)
}
+ @inline private def fromGraphStage[T, M](g: GraphStageWithMaterializedValue[SourceShape[T], M]): Source[T, M] =
+ new Source(LinearTraversalBuilder.fromGraphStage(g), g.shape)
+
+ @inline private def fromRange[T](range: immutable.Range): Source[T, NotUsed] =
+ fromGraphStage(new RangeSource[T](range, DefaultAttributes.iterableSource))
+
/**
* Defers the creation of a [[Source]] until materialization. The `factory` function
* exposes [[Materializer]] which is going to be used during materialization and
* [[Attributes]] of the [[Source]] returned by this method.
*/
def fromMaterializer[T, M](factory: (Materializer, Attributes) => Source[T, M]): Source[T, Future[M]] =
- Source.fromGraph(new SetupSourceStage(factory))
+ fromGraphStage(new SetupSourceStage(factory))
/**
* Helper to create [[Source]] from `Iterable`.
@@ -403,13 +407,16 @@ object Source {
* beginning) regardless of when they subscribed.
* @see [[apply(immutable.Seq)]]
*/
- def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = {
+ @inline def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = {
// unknown size is -1
(iterable.knownSize: @switch) match {
case 0 => empty
case 1 => single(iterable.head)
case _ =>
- fromGraph(new IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
+ iterable match {
+ case range: immutable.Range => fromRange[T](range)
+ case _ => fromGraphStage(new IterableSource[T](iterable))
+ }
}
}
@@ -424,12 +431,16 @@ object Source {
* @see [[apply(immutable.Iterable)]]
* @since 2.0.0
*/
- def apply[T](seq: immutable.Seq[T]): Source[T, NotUsed] = {
- seq match {
- case immutable.Seq() => empty[T]
- case immutable.Seq(elem: T @unchecked) => single(elem)
- case _ =>
- fromGraph(new IterableSource[T](seq)).withAttributes(DefaultAttributes.iterableSource)
+ @inline def apply[T](seq: immutable.Seq[T]): Source[T, NotUsed] = {
+ // unknown size is -1
+ (seq.knownSize: @switch) match {
+ case 0 => empty[T]
+ case 1 => single(seq.head)
+ case _ =>
+ seq match {
+ case range: immutable.Range => fromRange[T](range)
+ case _ => fromGraphStage(new IterableSource[T](seq))
+ }
}
}
@@ -439,13 +450,13 @@ object Source {
*
* @since 1.3.0
*/
- def apply[T](array: Array[T]): Source[T, NotUsed] = {
+ @inline def apply[T](array: Array[T]): Source[T, NotUsed] = {
if (array.length == 0)
empty
else if (array.length == 1)
single(array(0))
else
- Source.fromGraph(new ArraySource[T](array))
+ fromGraphStage(new ArraySource[T](array))
}
/**
@@ -456,14 +467,14 @@ object Source {
* receive new tick elements as soon as it has requested more elements.
*/
def tick[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] =
- fromGraph(new TickSource[T](initialDelay, interval, tick))
+ fromGraphStage(new TickSource[T](initialDelay, interval, tick))
/**
* Create a `Source` with one element.
* Every connected `Sink` of this stream will see an individual stream consisting of one element.
*/
- def single[T](element: T): Source[T, NotUsed] =
- fromGraph(new GraphStages.SingleSource(element))
+ @inline def single[T](element: T): Source[T, NotUsed] =
+ fromGraphStage(new GraphStages.SingleSource(element))
/**
* Create a `Source` from the given elements.
@@ -496,8 +507,8 @@ object Source {
/**
* Create a `Source` that will continually emit the given element.
*/
- def repeat[T](element: T): Source[T, NotUsed] = {
- fromIterator(() => Iterator.continually(element)).withAttributes(DefaultAttributes.repeat)
+ @inline def repeat[T](element: T): Source[T, NotUsed] = {
+ fromGraphStage(new GraphStages.RepeatSource(element))
}
/**
@@ -514,7 +525,7 @@ object Source {
* }}}
*/
def unfold[S, E](s: S)(f: S => Option[(S, E)]): Source[E, NotUsed] =
- Source.fromGraph(new Unfold(s, f))
+ fromGraphStage(new Unfold(s, f))
/**
* Same as [[unfold]], but uses an async function to generate the next state-element tuple.
@@ -532,7 +543,7 @@ object Source {
* }}}
*/
def unfoldAsync[S, E](s: S)(f: S => Future[Option[(S, E)]]): Source[E, NotUsed] =
- Source.fromGraph(new UnfoldAsync(s, f))
+ fromGraphStage(new UnfoldAsync(s, f))
/**
* Creates a sequential `Source` by iterating with the given predicate and function,
@@ -543,27 +554,29 @@ object Source {
* @since 1.1.0
*/
def iterate[T](seed: T)(p: T => Boolean, f: T => T): Source[T, NotUsed] =
- fromIterator(() =>
- new AbstractIterator[T] {
- private var first = true
- private var acc = seed
- override def hasNext: Boolean = p(acc)
- override def next(): T = {
- if (first) {
- first = false
- } else {
- acc = f(acc)
+ fromGraphStage(new IteratorSource[T](
+ () =>
+ new AbstractIterator[T] {
+ private var first = true
+ private var acc = seed
+ override def hasNext: Boolean = p(acc)
+ override def next(): T = {
+ if (first) {
+ first = false
+ } else {
+ acc = f(acc)
+ }
+ acc
}
- acc
- }
- }).withAttributes(DefaultAttributes.iterateSource)
+ },
+ DefaultAttributes.iterateSource))
/**
* A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`.
*/
def empty[T]: Source[T, NotUsed] = _empty
private[this] val _empty: Source[Nothing, NotUsed] =
- Source.fromGraph(EmptySource)
+ fromGraphStage(EmptySource)
/**
* Create a `Source` which materializes a [[scala.concurrent.Promise]] which controls what element
@@ -577,20 +590,22 @@ object Source {
* with None.
*/
def maybe[T]: Source[T, Promise[Option[T]]] =
- Source.fromGraph(MaybeSource.asInstanceOf[Graph[SourceShape[T], Promise[Option[T]]]])
+ fromGraphStage(
+ MaybeSource.asInstanceOf[GraphStageWithMaterializedValue[SourceShape[T], Promise[Option[T]]]])
/**
* Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`.
*/
def failed[T](cause: Throwable): Source[T, NotUsed] =
- Source.fromGraph(new FailedSource[T](cause))
+ fromGraphStage(new FailedSource[T](cause))
/**
* Emits a single value when the given `Future` is successfully completed and then completes the stream.
* The stream fails if the `Future` is completed with a failure.
*/
+ @inline
def future[T](futureElement: Future[T]): Source[T, NotUsed] =
futureElement.value match {
- case None => fromGraph(new FutureSource[T](futureElement))
+ case None => fromGraphStage(new FutureSource[T](futureElement))
case Some(scala.util.Success(null)) => empty[T]
case Some(scala.util.Success(elem)) => single(elem)
case Some(scala.util.Failure(ex)) => failed[T](ex)
@@ -601,7 +616,7 @@ object Source {
* This stream could be useful in tests.
*/
def never[T]: Source[T, NotUsed] = _never
- private[this] val _never: Source[Nothing, NotUsed] = fromGraph(GraphStages.NeverSource)
+ private[this] val _never: Source[Nothing, NotUsed] = fromGraphStage(GraphStages.NeverSource)
/**
* Emits a single value when the given `CompletionStage` is successfully completed and then completes the stream.
@@ -616,8 +631,9 @@ object Source {
* Turn a `Future[Source]` into a source that will emit the values of the source when the future completes successfully.
* If the `Future` is completed with a failure the stream is failed.
*/
+ @inline
def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]] = futureSource.value match {
- case None => fromGraph(new FutureFlattenSource(futureSource))
+ case None => fromGraphStage(new FutureFlattenSource(futureSource))
case Some(scala.util.Success(null)) =>
val exception = new NullPointerException("futureSource completed with null")
Source.failed(exception).mapMaterializedValue(_ => Future.failed[M](exception))
@@ -634,7 +650,7 @@ object Source {
* the laziness and will trigger the factory immediately.
*/
def lazySingle[T](create: () => T): Source[T, NotUsed] =
- fromGraph(new LazySingleSource(create))
+ fromGraphStage(new LazySingleSource(create))
/**
* Defers invoking the `create` function to create a future element until there is downstream demand.
@@ -646,7 +662,7 @@ object Source {
* the laziness and will trigger the factory immediately.
*/
def lazyFuture[T](create: () => Future[T]): Source[T, NotUsed] =
- fromGraph(new LazyFutureSource(create))
+ fromGraphStage(new LazyFutureSource(create))
/**
* Defers invoking the `create` function to create a future source until there is downstream demand.
@@ -665,7 +681,7 @@ object Source {
* is failed with a [[pekko.stream.NeverMaterializedException]]
*/
def lazySource[T, M](create: () => Source[T, M]): Source[T, Future[M]] =
- fromGraph(new LazySource(create))
+ fromGraphStage(new LazySource(create))
/**
* Defers invoking the `create` function to create a future source until there is downstream demand.
@@ -738,9 +754,7 @@ object Source {
overflowStrategy: OverflowStrategy): Source[T, ActorRef] = {
require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
require(!overflowStrategy.isBackpressure, "Backpressure overflowStrategy not supported")
- Source
- .fromGraph(new ActorRefSource(bufferSize, overflowStrategy, completionMatcher, failureMatcher))
- .withAttributes(DefaultAttributes.actorRefSource)
+ fromGraphStage(new ActorRefSource(bufferSize, overflowStrategy, completionMatcher, failureMatcher))
}
/**
@@ -751,7 +765,7 @@ object Source {
ackMessage: Any,
completionMatcher: PartialFunction[Any, CompletionStrategy],
failureMatcher: PartialFunction[Any, Throwable]): Source[T, ActorRef] = {
- Source.fromGraph(new ActorRefBackpressureSource(ackTo, ackMessage, completionMatcher, failureMatcher))
+ fromGraphStage(new ActorRefBackpressureSource(ackTo, ackMessage, completionMatcher, failureMatcher))
}
/**
@@ -772,7 +786,7 @@ object Source {
ackMessage: Any,
completionMatcher: PartialFunction[Any, CompletionStrategy],
failureMatcher: PartialFunction[Any, Throwable]): Source[T, ActorRef] = {
- Source.fromGraph(new ActorRefBackpressureSource(None, ackMessage, completionMatcher, failureMatcher))
+ fromGraphStage(new ActorRefBackpressureSource(None, ackMessage, completionMatcher, failureMatcher))
}
/**
@@ -910,7 +924,7 @@ object Source {
* @param bufferSize size of the buffer in number of elements
*/
def queue[T](bufferSize: Int): Source[T, BoundedSourceQueue[T]] =
- Source.fromGraph(new BoundedSourceQueueStage[T](bufferSize))
+ fromGraphStage(new BoundedSourceQueueStage[T](bufferSize))
/**
* Creates a Source that will immediately execute the provided function `producer` with a [[BoundedSourceQueue]] when materialized.
@@ -1029,8 +1043,7 @@ object Source {
bufferSize: Int,
overflowStrategy: OverflowStrategy,
maxConcurrentOffers: Int): Source[T, SourceQueueWithComplete[T]] =
- Source.fromGraph(
- new QueueSource(bufferSize, overflowStrategy, maxConcurrentOffers).withAttributes(DefaultAttributes.queueSource))
+ fromGraphStage(new QueueSource(bufferSize, overflowStrategy, maxConcurrentOffers))
/**
* Start a new `Source` from some resource which can be opened, read and closed.
@@ -1063,7 +1076,7 @@ object Source {
* @tparam R - the resource type.
*/
def unfoldResource[T, R](create: () => R, read: (R) => Option[T], close: (R) => Unit): Source[T, NotUsed] =
- Source.fromGraph(new UnfoldResourceSource(create, read, close))
+ fromGraphStage(new UnfoldResourceSource(create, read, close))
/**
* Start a new `Source` from some resource which can be opened, read and closed.
@@ -1091,7 +1104,7 @@ object Source {
create: () => Future[R],
read: (R) => Future[Option[T]],
close: (R) => Future[Done]): Source[T, NotUsed] =
- Source.fromGraph(new UnfoldResourceSourceAsync(create, read, close))
+ fromGraphStage(new UnfoldResourceSourceAsync(create, read, close))
/**
* Merge multiple [[Source]]s. Prefer the sources depending on the
'priority' parameters.
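
The `future` and `futureSource` hunks above inspect `Future.value` once, at construction time, so that an already-completed future is mapped to a cheaper source instead of a dedicated stage. A minimal standalone sketch of that dispatch follows, using only the Scala standard library. `Plan`, its cases, and `FutureFastPath` are hypothetical stand-ins for the real `Source` factories and are not part of Pekko.

```scala
import scala.concurrent.Future
import scala.util.{ Failure, Success }

// Hypothetical stand-ins for the sources that Source.future selects; not Pekko API.
sealed trait Plan[+T]
case object EmptyPlan extends Plan[Nothing]
final case class SinglePlan[T](elem: T) extends Plan[T]
final case class FailedPlan(cause: Throwable) extends Plan[Nothing]
final case class PendingPlan[T](future: Future[T]) extends Plan[T]

object FutureFastPath {
  // Same case order as the match in the diff: a null success is checked
  // before the general success case, so null never becomes an element.
  def plan[T](f: Future[T]): Plan[T] = f.value match {
    case None                => PendingPlan(f) // not completed yet: needs a real stage
    case Some(Success(null)) => EmptyPlan      // completed with null: empty stream
    case Some(Success(elem)) => SinglePlan(elem)
    case Some(Failure(ex))   => FailedPlan(ex)
  }
}
```

Under these assumptions, only a future that is still pending when the source is built would pay for a stage of its own; the other three cases resolve to constant or single-element plans.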