This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e25aa89a8 perf(stream): reduce source construction overhead (#2914)
1e25aa89a8 is described below

commit 1e25aa89a81fdfec5caaff6ba038a4b82e8dfb1c
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun May 10 11:59:36 2026 +0800

    perf(stream): reduce source construction overhead (#2914)
    
    * perf(stream): reduce source construction overhead
    
    * fix(stream): keep future source operators indexed
    
    * fix(stream): address source fast path review feedback
    
    Motivation:
    
    PR #2914 review found two correctness gaps in the new source stages and one optimization opportunity in value-presented source detection.
    
    Modification:
    
    Add a value-presented source marker trait, enforce null-element compliance for repeat sources, and handle iterator-factory supervision failures without null iterator access.
    
    Result:
    
    Value-presented source detection is simpler, Source.repeat rejects null at construction time, and iterator-factory resume/restart paths are deterministic.
    
    Tests:
    
    - scalafmt --mode diff-ref=a57a0db001ec3133b51c8c884dd52bfe35673279 / passed
    
    - scalafmt --list --mode diff-ref=a57a0db001ec3133b51c8c884dd52bfe35673279 / passed
    
    - git diff --check / passed
    
    - sbt "stream / Test / compile" "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.SourceSpec org.apache.pekko.stream.impl.TraversalBuilderSpec" / passed
    
    - sbt "stream / mimaReportBinaryIssues" / passed
    
    References:
    
    Refs #2914
    
    Co-authored-by: Copilot <[email protected]>
    
    * perf(stream): tune source fast paths for JIT
    
    Motivation:
    
    Source fast paths should avoid unnecessary per-materialization state and immediate-path virtual calls in hot stream operators.
    
    Modification:
    
    Hoist immutable range-derived values from RangeSource logic instances to the stage, and push repeat source elements directly in FlattenConcat's immediate fast path.
    
    Result:
    
    Range materialization keeps per-run state smaller and repeat fast-path emission avoids an unnecessary method dispatch without changing stream behavior.
    
    Tests:
    
    - scalafmt --mode diff-ref=a57a0db001ec3133b51c8c884dd52bfe35673279 / passed
    
    - scalafmt --list --mode diff-ref=a57a0db001ec3133b51c8c884dd52bfe35673279 / passed
    
    - git diff --check / passed
    
    - sbt "stream / Test / compile" "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.SourceSpec org.apache.pekko.stream.impl.TraversalBuilderSpec" / passed
    
    - sbt "stream / mimaReportBinaryIssues" / passed
    
    - JDK 21.0.5 sbt "bench-jmh/Jmh/run -wi 1 -i 1 -f1 -t1 -prof gc .*GraphStageConstructionBenchmark.*" / passed
    
    - JDK 21.0.5 sbt "bench-jmh/Jmh/run -wi 1 -i 1 -f1 -t1 .*RangeSourceBenchmark.*" / passed
    
    References:
    
    Refs #2914
    
    Co-authored-by: Copilot <[email protected]>
    
    * perf(stream): trim traversal builder construction overhead
    
    Motivation:
    
    Graph-stage source, sink, and flow construction all pass through LinearTraversalBuilder.fromModule, so small temporary allocations in that path affect constructor-heavy workloads.
    
    Modification:
    
    Cache shape ports in local values and avoid headOption allocation when creating OptionVal wrappers for single-port linear modules.
    
    Result:
    
    Constructor fast paths do less repeated port lookup work while preserving the same traversal wiring semantics.
    
    Tests:
    
    - scalafmt --mode diff-ref=a57a0db001ec3133b51c8c884dd52bfe35673279 / passed
    
    - scalafmt --list --mode diff-ref=a57a0db001ec3133b51c8c884dd52bfe35673279 / passed
    
    - git diff --check / passed
    
    - sbt "stream / Test / compile" "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.SourceSpec org.apache.pekko.stream.impl.TraversalBuilderSpec" / passed
    
    - sbt "stream / mimaReportBinaryIssues" / passed
    
    - JDK 21.0.5 sbt "bench-jmh/Jmh/run -wi 1 -i 1 -f1 -t1 -prof gc .*GraphStageConstructionBenchmark.*" / passed
    
    - JDK 21.0.5 sbt "bench-jmh/Jmh/run -wi 1 -i 1 -f1 -t1 .*RangeSourceBenchmark.*" / passed
    
    References:
    
    Refs #2914
    
    Co-authored-by: Copilot <[email protected]>
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .../stream/GraphStageConstructionBenchmark.scala   | 180 +++++++++++++++++++++
 .../apache/pekko/stream/RangeSourceBenchmark.scala | 103 ++++++++++++
 .../apache/pekko/stream/javadsl/SourceTest.java    |  15 ++
 .../pekko/stream/impl/TraversalBuilderSpec.scala   |  39 ++++-
 .../apache/pekko/stream/scaladsl/SourceSpec.scala  | 103 ++++++++++++
 .../apache/pekko/stream/impl/ActorRefSource.scala  |   2 +
 .../apache/pekko/stream/impl/FailedSource.scala    |   5 +-
 .../pekko/stream/impl/JavaStreamSource.scala       |   4 +-
 .../org/apache/pekko/stream/impl/QueueSource.scala |   2 +
 .../pekko/stream/impl/TraversalBuilder.scala       |  97 +++++++----
 .../pekko/stream/impl/fusing/FlattenConcat.scala   |  55 ++++++-
 .../pekko/stream/impl/fusing/GraphStages.scala     |  24 ++-
 .../pekko/stream/impl/fusing/IterableSource.scala  |   5 +-
 .../{IterableSource.scala => IteratorSource.scala} |  58 ++++---
 .../org/apache/pekko/stream/impl/fusing/Ops.scala  |   8 +-
 .../pekko/stream/impl/fusing/RangeSource.scala     |  66 ++++++++
 .../org/apache/pekko/stream/javadsl/Source.scala   |   6 +-
 .../org/apache/pekko/stream/scaladsl/Flow.scala    |   8 +-
 .../org/apache/pekko/stream/scaladsl/Sink.scala    |   8 +-
 .../org/apache/pekko/stream/scaladsl/Source.scala  | 147 +++++++++--------
 20 files changed, 789 insertions(+), 146 deletions(-)

diff --git 
a/bench-jmh/src/main/scala/org/apache/pekko/stream/GraphStageConstructionBenchmark.scala
 
b/bench-jmh/src/main/scala/org/apache/pekko/stream/GraphStageConstructionBenchmark.scala
new file mode 100644
index 0000000000..fb2aa9a963
--- /dev/null
+++ 
b/bench-jmh/src/main/scala/org/apache/pekko/stream/GraphStageConstructionBenchmark.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.immutable
+import scala.concurrent.Promise
+
+import org.apache.pekko.NotUsed
+import org.apache.pekko.stream.impl.LinearTraversalBuilder
+import org.apache.pekko.stream.impl.Stages.DefaultAttributes
+import org.apache.pekko.stream.impl.fusing.GraphStages
+import org.apache.pekko.stream.impl.fusing.IterableSource
+import org.apache.pekko.stream.scaladsl.Flow
+import org.apache.pekko.stream.scaladsl.Keep
+import org.apache.pekko.stream.scaladsl.Sink
+import org.apache.pekko.stream.scaladsl.Source
+import org.apache.pekko.stream.stage.GraphStageWithMaterializedValue
+import org.openjdk.jmh.annotations.Benchmark
+import org.openjdk.jmh.annotations.BenchmarkMode
+import org.openjdk.jmh.annotations.Mode
+import org.openjdk.jmh.annotations.OutputTimeUnit
+import org.openjdk.jmh.annotations.Scope
+import org.openjdk.jmh.annotations.State
+import org.openjdk.jmh.infra.Blackhole
+
+@State(Scope.Benchmark)
+@BenchmarkMode(Array(Mode.Throughput))
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+class GraphStageConstructionBenchmark {
+  private val element = "element"
+  private val elements: immutable.Iterable[String] = Vector(element, element)
+  private val range = 1 to 1000
+  private val pendingFuture = Promise[String]().future
+  private val flowStage = GraphStages.identity[Any]
+  private val sinkStage = GraphStages.IgnoreSink
+
+  private def oldSourceFromGraphStage[T, M](
+      stage: GraphStageWithMaterializedValue[SourceShape[T], M]): Source[T, M] 
= {
+    val attributes = stage.traversalBuilder.attributes
+    val noAttributeStage = stage.withAttributes(Attributes.none)
+    new Source(
+      LinearTraversalBuilder.fromBuilder(noAttributeStage.traversalBuilder, 
noAttributeStage.shape, Keep.right),
+      noAttributeStage.shape).withAttributes(attributes)
+  }
+
+  private def oldSourceFromIterable[T](iterable: immutable.Iterable[T]): 
Source[T, NotUsed] =
+    (iterable.knownSize: @scala.annotation.switch) match {
+      case 0 => Source.empty
+      case 1 => oldSourceFromGraphStage(new 
GraphStages.SingleSource(iterable.head))
+      case _ =>
+        oldSourceFromGraphStage(new 
IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
+    }
+
+  @Benchmark
+  def sourceSingle(blackhole: Blackhole): Unit =
+    blackhole.consume(Source.single(element))
+
+  @Benchmark
+  def javadslSourceSingle(blackhole: Blackhole): Unit =
+    blackhole.consume(javadsl.Source.single(element))
+
+  @Benchmark
+  def sourceRepeat(blackhole: Blackhole): Unit =
+    blackhole.consume(Source.repeat(element))
+
+  @Benchmark
+  def oldSourceRepeatPath(blackhole: Blackhole): Unit = {
+    val iterable = new immutable.Iterable[String] {
+      override def iterator: Iterator[String] = Iterator.continually(element)
+      override def toString: String = "() => Iterator"
+    }
+    blackhole.consume(
+      oldSourceFromGraphStage(new 
IterableSource[String](iterable)).withAttributes(DefaultAttributes.repeat))
+  }
+
+  @Benchmark
+  def sourceFromIterator(blackhole: Blackhole): Unit =
+    blackhole.consume(Source.fromIterator(() => Iterator.single(element)))
+
+  @Benchmark
+  def oldSourceFromIteratorPath(blackhole: Blackhole): Unit = {
+    val iterable = new immutable.Iterable[String] {
+      override def iterator: Iterator[String] = Iterator.single(element)
+      override def toString: String = "() => Iterator"
+    }
+    blackhole.consume(
+      oldSourceFromGraphStage(new 
IterableSource[String](iterable)).withAttributes(DefaultAttributes.iterableSource))
+  }
+
+  @Benchmark
+  def sourceIterable(blackhole: Blackhole): Unit =
+    blackhole.consume(Source(elements))
+
+  @Benchmark
+  def oldSourceIterablePath(blackhole: Blackhole): Unit =
+    blackhole.consume(oldSourceFromIterable(elements))
+
+  @Benchmark
+  def sourceRange(blackhole: Blackhole): Unit =
+    blackhole.consume(Source(range))
+
+  @Benchmark
+  def oldSourceRangePath(blackhole: Blackhole): Unit =
+    blackhole.consume(
+      oldSourceFromGraphStage(new 
IterableSource[Int](range)).withAttributes(DefaultAttributes.iterableSource))
+
+  @Benchmark
+  def javadslSourceRange(blackhole: Blackhole): Unit =
+    blackhole.consume(javadsl.Source.range(1, 1000))
+
+  @Benchmark
+  def oldJavadslSourceRangePath(blackhole: Blackhole): Unit =
+    blackhole.consume(
+      new javadsl.Source(
+        oldSourceFromGraphStage(new 
IterableSource[Integer](range.asInstanceOf[immutable.Iterable[Integer]]))
+          .withAttributes(DefaultAttributes.iterableSource)))
+
+  @Benchmark
+  def sourceFuturePending(blackhole: Blackhole): Unit =
+    blackhole.consume(Source.future(pendingFuture))
+
+  @Benchmark
+  def oldSourceFuturePendingPath(blackhole: Blackhole): Unit =
+    blackhole.consume(oldSourceFromGraphStage(new 
GraphStages.FutureSource[String](pendingFuture)))
+
+  @Benchmark
+  def sourceFromGraphStage(blackhole: Blackhole): Unit =
+    blackhole.consume(Source.fromGraph(new GraphStages.SingleSource(element)))
+
+  @Benchmark
+  def oldSourceFromGraphStagePath(blackhole: Blackhole): Unit = {
+    val stage = new GraphStages.SingleSource(element)
+    blackhole.consume(oldSourceFromGraphStage(stage))
+  }
+
+  @Benchmark
+  def sinkFromGraphStage(blackhole: Blackhole): Unit =
+    blackhole.consume(Sink.fromGraph(sinkStage))
+
+  @Benchmark
+  def oldSinkFromGraphStagePath(blackhole: Blackhole): Unit = {
+    val attributes = sinkStage.traversalBuilder.attributes
+    val noAttributeStage = sinkStage.withAttributes(Attributes.none)
+    blackhole.consume(
+      new Sink(
+        LinearTraversalBuilder.fromBuilder(noAttributeStage.traversalBuilder, 
noAttributeStage.shape, Keep.right),
+        noAttributeStage.shape).withAttributes(attributes))
+  }
+
+  @Benchmark
+  def flowFromGraphStage(blackhole: Blackhole): Unit =
+    blackhole.consume(Flow.fromGraph(flowStage))
+
+  @Benchmark
+  def oldFlowFromGraphStagePath(blackhole: Blackhole): Unit = {
+    val attributes = flowStage.traversalBuilder.attributes
+    val noAttributeStage = flowStage.withAttributes(Attributes.none)
+    blackhole.consume(
+      new Flow(
+        LinearTraversalBuilder.fromBuilder(noAttributeStage.traversalBuilder, 
noAttributeStage.shape, Keep.right),
+        noAttributeStage.shape).withAttributes(attributes))
+  }
+}
diff --git 
a/bench-jmh/src/main/scala/org/apache/pekko/stream/RangeSourceBenchmark.scala 
b/bench-jmh/src/main/scala/org/apache/pekko/stream/RangeSourceBenchmark.scala
new file mode 100644
index 0000000000..59925f9612
--- /dev/null
+++ 
b/bench-jmh/src/main/scala/org/apache/pekko/stream/RangeSourceBenchmark.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import java.util.concurrent.{ CountDownLatch, TimeUnit }
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.openjdk.jmh.annotations._
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.impl.fusing.IterableSource
+import pekko.stream.scaladsl.{ Keep, RunnableGraph, Sink, Source }
+import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, 
InHandler }
+
+object RangeSourceBenchmark {
+  @volatile var sinkSum: Int = 0
+}
+
+final class IntCompletionLatch extends 
GraphStageWithMaterializedValue[SinkShape[Int], CountDownLatch] {
+  val in: Inlet[Int] = Inlet[Int]("IntCompletionLatch.in")
+  override val shape: SinkShape[Int] = SinkShape(in)
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: 
Attributes): (GraphStageLogic, CountDownLatch) = {
+    val latch = new CountDownLatch(1)
+    val logic = new GraphStageLogic(shape) with InHandler {
+      private[this] var sum = 0
+
+      override def preStart(): Unit = pull(in)
+      override def onPush(): Unit = {
+        sum += grab(in)
+        pull(in)
+      }
+
+      override def onUpstreamFinish(): Unit = {
+        RangeSourceBenchmark.sinkSum = sum
+        latch.countDown()
+        completeStage()
+      }
+
+      setHandler(in, this)
+    }
+    (logic, latch)
+  }
+}
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@BenchmarkMode(Array(Mode.Throughput))
+class RangeSourceBenchmark {
+  implicit val system: ActorSystem = ActorSystem("RangeSourceBenchmark")
+
+  @Param(Array("1", "1000"))
+  var elements: Int = 0
+
+  var rangeToLatch: RunnableGraph[CountDownLatch] = _
+  var oldRangeToLatch: RunnableGraph[CountDownLatch] = _
+
+  @Setup
+  def setup(): Unit = {
+    val range = 1 to elements
+    rangeToLatch = Source(range).toMat(Sink.fromGraph(new 
IntCompletionLatch))(Keep.right)
+    oldRangeToLatch = Source
+      .fromGraph(new IterableSource[Int](range))
+      .withAttributes(DefaultAttributes.iterableSource)
+      .toMat(Sink.fromGraph(new IntCompletionLatch))(Keep.right)
+  }
+
+  @TearDown
+  def shutdown(): Unit = {
+    Await.result(system.terminate(), 5.seconds)
+  }
+
+  @Benchmark
+  def sourceRangeToLatch(): Unit =
+    await(rangeToLatch.run())
+
+  @Benchmark
+  def oldSourceRangeToLatch(): Unit =
+    await(oldRangeToLatch.run())
+
+  private def await(latch: CountDownLatch): Unit =
+    if (!latch.await(5, TimeUnit.SECONDS))
+      throw new RuntimeException("Latch timed out")
+}
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index 3aec7545b0..db3341baea 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -747,6 +747,21 @@ public class SourceTest extends StreamTestJupiter {
     }
   }
 
+  @Test
+  public void mustWorkFromRangeWithNegativeStep() throws Exception {
+    CompletionStage<List<Integer>> f =
+        Source.range(5, 1, -2).grouped(20).runWith(Sink.head(), system);
+    final List<Integer> result = f.toCompletableFuture().get(3, 
TimeUnit.SECONDS);
+    assertEquals(Arrays.asList(5, 3, 1), result);
+  }
+
+  @Test
+  public void mustWorkFromEmptyRange() throws Exception {
+    CompletionStage<List<Integer>> f = Source.range(1, 5, 
-1).runWith(Sink.seq(), system);
+    final List<Integer> result = f.toCompletableFuture().get(3, 
TimeUnit.SECONDS);
+    assertEquals(0, result.size());
+  }
+
   @Test
   public void mustRepeat() throws Exception {
     final CompletionStage<List<Integer>> f =
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
index 2af4b9b0a7..153339aeef 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
@@ -19,8 +19,9 @@ import org.apache.pekko
 import pekko.NotUsed
 import pekko.stream._
 import pekko.stream.impl.TraversalTestUtils._
-import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SingleSource }
-import pekko.stream.impl.fusing.IterableSource
+import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, 
SingleSource }
+import pekko.stream.impl.fusing.{ IterableSource, IteratorSource, RangeSource }
 import pekko.stream.scaladsl.{ Keep, Source }
 import pekko.testkit.PekkoSpec
 import pekko.util.OptionVal
@@ -547,6 +548,40 @@ class TraversalBuilderSpec extends PekkoSpec {
       OptionVal.None)
   }
 
+  "find Source.fromIterator via TraversalBuilder with getValuePresentedSource" 
in {
+    val createIterator = () => Iterator("a", "b", "c")
+    
TraversalBuilder.getValuePresentedSource(Source.fromIterator(createIterator)).get.asInstanceOf[IteratorSource[
+      String]].createIterator should ===(
+      createIterator)
+    val iteratorSource = new IteratorSource(createIterator, 
DefaultAttributes.iterableSource)
+    TraversalBuilder.getValuePresentedSource(iteratorSource) should 
be(OptionVal.Some(iteratorSource))
+
+    
TraversalBuilder.getValuePresentedSource(Source.fromIterator(createIterator).async)
 should be(OptionVal.None)
+    TraversalBuilder.getValuePresentedSource(
+      Source.fromIterator(createIterator).mapMaterializedValue(_ => "Mat")) 
should be(OptionVal.None)
+  }
+
+  "find Source.range via TraversalBuilder with getValuePresentedSource" in {
+    val range = 1 to 4
+    
TraversalBuilder.getValuePresentedSource(Source(range)).get.asInstanceOf[RangeSource[Int]].range
 should ===(range)
+    val rangeSource = new RangeSource[Int](range, 
DefaultAttributes.iterableSource)
+    TraversalBuilder.getValuePresentedSource(rangeSource) should 
be(OptionVal.Some(rangeSource))
+
+    TraversalBuilder.getValuePresentedSource(Source(range).async) should 
be(OptionVal.None)
+    
TraversalBuilder.getValuePresentedSource(Source(range).mapMaterializedValue(_ 
=> "Mat")) should be(OptionVal.None)
+  }
+
+  "find Source.repeat via TraversalBuilder with getValuePresentedSource" in {
+    
TraversalBuilder.getValuePresentedSource(Source.repeat("a")).get.asInstanceOf[RepeatSource[String]].elem
 should ===(
+      "a")
+    val repeatSource = new RepeatSource("a")
+    TraversalBuilder.getValuePresentedSource(repeatSource) should 
be(OptionVal.Some(repeatSource))
+
+    TraversalBuilder.getValuePresentedSource(Source.repeat("c").async) should 
be(OptionVal.None)
+    
TraversalBuilder.getValuePresentedSource(Source.repeat("d").mapMaterializedValue(_
 => "Mat")) should be(
+      OptionVal.None)
+  }
+
   "find Source.javaStreamSource via TraversalBuilder with 
getValuePresentedSource" in {
     val javaStream = java.util.stream.Stream.empty[String]()
     TraversalBuilder.getValuePresentedSource(Source.fromJavaStream(() => 
javaStream)).get
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 22d3b55077..34b0633b66 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -358,6 +358,11 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
       f.futureValue.toSet should ===(Set(42))
     }
 
+    "reject null element at construction time" in {
+      val error = the[NullPointerException] thrownBy Source.repeat(null)
+      error.getMessage should ===("Element must not be null, rule 2.13")
+    }
+
     "repeat example" in {
       // #repeat
       val source: Source[Int, NotUsed] = Source.repeat(42)
@@ -369,6 +374,52 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
       // #repeat
       f.futureValue shouldBe Done
     }
+
+    "work when flattened through value-presented source fast path" in {
+      Source
+        .single("repeat")
+        .flatMapConcat(_ => Source.repeat(42))
+        .take(3)
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(42, 42, 42))
+    }
+
+    "work when recovered through value-presented source fast path" in {
+      Source
+        .failed[Int](TE("boom"))
+        .recoverWithRetries(1, { case _ => Source.repeat(42) })
+        .take(3)
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(42, 42, 42))
+    }
+  }
+
+  "Range Source" must {
+    "emit inclusive and exclusive ranges" in {
+      Source(1 to 4).runWith(Sink.seq).futureValue should ===(immutable.Seq(1, 
2, 3, 4))
+      Source(1 until 4).runWith(Sink.seq).futureValue should 
===(immutable.Seq(1, 2, 3))
+    }
+
+    "emit stepped and empty ranges" in {
+      Source(5 to 1 by -2).runWith(Sink.seq).futureValue should 
===(immutable.Seq(5, 3, 1))
+      Source(1 to 5 by -1).runWith(Sink.seq).futureValue should 
===(immutable.Seq.empty)
+    }
+
+    "work when flattened through value-presented source fast path" in {
+      Source
+        .single("range")
+        .flatMapConcat(_ => Source(1 to 3))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+    }
+
+    "work when recovered through value-presented source fast path" in {
+      Source
+        .failed[Int](TE("boom"))
+        .recoverWithRetries(1, { case _ => Source(1 to 3) })
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+    }
   }
 
   "Unfold Source" must {
@@ -437,6 +488,22 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
         immutable.Seq(false, true, false, true, false, true, false, true, 
false, true))
     }
 
+    "work when flattened through value-presented source fast path" in {
+      Source
+        .single("iterator")
+        .flatMapConcat(_ => Source.fromIterator(() => Iterator(1, 2, 3)))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+    }
+
+    "work when recovered through value-presented source fast path" in {
+      Source
+        .failed[Int](TE("boom"))
+        .recoverWithRetries(1, { case _ => Source.fromIterator(() => 
Iterator(1, 2, 3)) })
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+    }
+
     "fail stream when iterator throws" in {
       Source
         .fromIterator(() => (1 to 1000).toIterator.map(k => if (k < 10) k else 
throw TE("a")))
@@ -516,6 +583,42 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
         .failed
         .futureValue shouldBe a[TE]
     }
+
+    "complete when iterator factory throws and decider resumes" in {
+      Source
+        .fromIterator[Int](() => throw TE("factory"))
+        
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq.empty)
+    }
+
+    "restart iterator factory when iterator factory throws and decider 
restarts" in {
+      var attempts = 0
+      Source
+        .fromIterator { () =>
+          attempts += 1
+          if (attempts == 1) throw TE("factory")
+          else Iterator(1, 2, 3)
+        }
+        
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+      attempts should ===(2)
+    }
+
+    "fail when restarted iterator factory throws again" in {
+      var attempts = 0
+      Source
+        .fromIterator[Int] { () =>
+          attempts += 1
+          throw TE(s"factory-$attempts")
+        }
+        
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
+        .runWith(Sink.seq)
+        .failed
+        .futureValue shouldBe a[TE]
+      attempts should ===(2)
+    }
   }
 
   "ZipN Source" must {
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefSource.scala
index 18ebc52537..1abea0b5ce 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefSource.scala
@@ -18,6 +18,7 @@ import pekko.actor.ActorRef
 import pekko.annotation.InternalApi
 import pekko.stream._
 import pekko.stream.OverflowStrategies._
+import pekko.stream.impl.Stages.DefaultAttributes
 import pekko.stream.stage._
 import pekko.util.OptionVal
 
@@ -39,6 +40,7 @@ private object ActorRefSource {
   val out: Outlet[T] = Outlet[T]("actorRefSource.out")
 
   override val shape: SourceShape[T] = SourceShape.of(out)
+  override def initialAttributes: Attributes = DefaultAttributes.actorRefSource
 
   def createLogicAndMaterializedValue(inheritedAttributes: Attributes): 
(GraphStageLogic, ActorRef) =
     throw new IllegalStateException("Not supported")
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/FailedSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/FailedSource.scala
index b107857f86..78d68f690d 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/FailedSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/FailedSource.scala
@@ -17,12 +17,15 @@ import org.apache.pekko
 import pekko.annotation.InternalApi
 import pekko.stream.{ Attributes, Outlet, SourceShape }
 import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.impl.fusing.GraphStages.ValuePresentedSource
 import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
 
 /**
  * INTERNAL API
  */
-@InternalApi private[pekko] final class FailedSource[T](val failure: 
Throwable) extends GraphStage[SourceShape[T]] {
+@InternalApi private[pekko] final class FailedSource[T](val failure: Throwable)
+    extends GraphStage[SourceShape[T]]
+    with ValuePresentedSource {
   val out = Outlet[T]("FailedSource.out")
   override val shape = SourceShape(out)
 
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala
index 440a17e406..1fea035977 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala
@@ -19,12 +19,14 @@ import java.util.function.Consumer
 import org.apache.pekko
 import pekko.annotation.InternalApi
 import pekko.stream._
+import pekko.stream.impl.fusing.GraphStages.ValuePresentedSource
 import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
 
 /** INTERNAL API */
 @InternalApi private[stream] final class JavaStreamSource[T, S <: 
java.util.stream.BaseStream[T, S]](
     val open: () => java.util.stream.BaseStream[T, S])
-    extends GraphStage[SourceShape[T]] {
+    extends GraphStage[SourceShape[T]]
+    with ValuePresentedSource {
 
   val out: Outlet[T] = Outlet("JavaStreamSource")
   override val shape: SourceShape[T] = SourceShape(out)
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala
index 61e217202a..36d8014ac3 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala
@@ -20,6 +20,7 @@ import pekko.Done
 import pekko.annotation.InternalApi
 import pekko.stream._
 import pekko.stream.OverflowStrategies._
+import pekko.stream.impl.Stages.DefaultAttributes
 import pekko.stream.scaladsl.SourceQueueWithComplete
 import pekko.stream.stage._
 
@@ -49,6 +50,7 @@ import pekko.stream.stage._
 
   val out = Outlet[T]("queueSource.out")
   override val shape: SourceShape[T] = SourceShape.of(out)
+  override def initialAttributes: Attributes = DefaultAttributes.queueSource
 
   @scala.annotation.nowarn("msg=inferred structural type")
   override def createLogicAndMaterializedValue(inheritedAttributes: 
Attributes) = {
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
index c6206a106a..4ed9cbe643 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
@@ -22,9 +22,10 @@ import pekko.annotation.{ DoNotInherit, InternalApi }
 import pekko.stream._
 import pekko.stream.impl.StreamLayout.AtomicModule
 import pekko.stream.impl.TraversalBuilder.{ AnyFunction1, AnyFunction2 }
-import pekko.stream.impl.fusing.{ GraphStageModule, IterableSource }
-import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SingleSource }
+import pekko.stream.impl.fusing.GraphStageModule
+import pekko.stream.impl.fusing.GraphStages.{ SingleSource, 
ValuePresentedSource }
 import pekko.stream.scaladsl.Keep
+import pekko.stream.stage.GraphStageWithMaterializedValue
 import pekko.util.OptionVal
 
 /**
@@ -354,24 +355,29 @@ import pekko.util.OptionVal
    * performance optimization in FlattenMerge and possibly other places.
    */
   def getSingleSource[A >: Null](graph: Graph[SourceShape[A], _]): 
OptionVal[SingleSource[A]] = {
+    @inline def fromModule(module: AtomicModule[_, _]): 
OptionVal[SingleSource[A]] =
+      module match {
+        case m: GraphStageModule[_, _] =>
+          m.stage match {
+            case single: SingleSource[A] @unchecked => OptionVal.Some(single)
+            case _                                  => OptionVal.None
+          }
+        case _ => OptionVal.None
+      }
+
     graph match {
       case single: SingleSource[A] @unchecked => OptionVal.Some(single)
       case _                                  =>
         graph.traversalBuilder match {
-          case l: LinearTraversalBuilder =>
+          case l: LinearTraversalBuilder if !l.attributes.isAsync =>
             l.pendingBuilder match {
               case OptionVal.Some(a: AtomicTraversalBuilder) =>
-                a.module match {
-                  case m: GraphStageModule[_, _] =>
-                    m.stage match {
-                      case single: SingleSource[A] @unchecked =>
-                        // It would be != EmptyTraversal if 
mapMaterializedValue was used and then we can't optimize.
-                        if ((l.traversalSoFar eq EmptyTraversal) && 
!l.attributes.isAsync)
-                          OptionVal.Some(single)
-                        else OptionVal.None
-                      case _ => OptionVal.None
-                    }
-                  case _ => OptionVal.None
+                // It would be != EmptyTraversal if mapMaterializedValue was 
used and then we can't optimize.
+                if (l.traversalSoFar eq EmptyTraversal) fromModule(a.module) 
else OptionVal.None
+              case OptionVal.None =>
+                l.traversalSoFar match {
+                  case MaterializeAtomic(module, _) => fromModule(module)
+                  case _                            => OptionVal.None
                 }
               case _ => OptionVal.None
             }
@@ -388,30 +394,34 @@ import pekko.util.OptionVal
   @InternalApi def getValuePresentedSource[A >: Null](
       graph: Graph[SourceShape[A], _]): OptionVal[Graph[SourceShape[A], _]] = {
     def isValuePresentedSource(graph: Graph[SourceShape[_ <: A], _]): Boolean 
= graph match {
-      case _: SingleSource[_] | _: FutureSource[_] | _: IterableSource[_] | _: 
JavaStreamSource[_, _] |
-          _: FailedSource[_] =>
-        true
+      case _: ValuePresentedSource                 => true
       case maybeEmpty if isEmptySource(maybeEmpty) => true
       case _                                       => false
     }
+    @inline def fromModule(module: AtomicModule[_, _]): 
OptionVal[Graph[SourceShape[A], _]] =
+      module match {
+        case m: GraphStageModule[_, _] =>
+          m.stage match {
+            case _ if 
isValuePresentedSource(m.stage.asInstanceOf[Graph[SourceShape[A], _]]) =>
+              OptionVal.Some(m.stage.asInstanceOf[Graph[SourceShape[A], _]])
+            case _ => OptionVal.None
+          }
+        case _ => OptionVal.None
+      }
+
     graph match {
       case _ if isValuePresentedSource(graph) => OptionVal.Some(graph)
       case _                                  =>
         graph.traversalBuilder match {
-          case l: LinearTraversalBuilder =>
+          case l: LinearTraversalBuilder if !l.attributes.isAsync =>
             l.pendingBuilder match {
               case OptionVal.Some(a: AtomicTraversalBuilder) =>
-                a.module match {
-                  case m: GraphStageModule[_, _] =>
-                    m.stage match {
-                      case _ if 
isValuePresentedSource(m.stage.asInstanceOf[Graph[SourceShape[A], _]]) =>
-                        // It would be != EmptyTraversal if 
mapMaterializedValue was used and then we can't optimize.
-                        if ((l.traversalSoFar eq EmptyTraversal) && 
!l.attributes.isAsync)
-                          
OptionVal.Some(m.stage.asInstanceOf[Graph[SourceShape[A], _]])
-                        else OptionVal.None
-                      case _ => OptionVal.None
-                    }
-                  case _ => OptionVal.None
+                // It would be != EmptyTraversal if mapMaterializedValue was 
used and then we can't optimize.
+                if (l.traversalSoFar eq EmptyTraversal) fromModule(a.module) 
else OptionVal.None
+              case OptionVal.None =>
+                l.traversalSoFar match {
+                  case MaterializeAtomic(module, _) => fromModule(module)
+                  case _                            => OptionVal.None
                 }
               case _ => OptionVal.None
             }
@@ -702,14 +712,17 @@ import pekko.util.OptionVal
    * than its generic counterpart. It can be freely mixed with the generic 
builder in both ways.
    */
   def fromModule(module: AtomicModule[Shape, Any], attributes: Attributes): 
LinearTraversalBuilder = {
-    if (module.shape.inlets.size > 1)
+    val shape = module.shape
+    val inlets = shape.inlets
+    val outlets = shape.outlets
+    if (inlets.size > 1)
       throw new IllegalStateException("Modules with more than one input port 
cannot be linear.")
-    if (module.shape.outlets.size > 1)
+    if (outlets.size > 1)
       throw new IllegalStateException("Modules with more than one output port 
cannot be linear.")
-    TraversalBuilder.initShape(module.shape)
+    TraversalBuilder.initShape(shape)
 
-    val inPortOpt = OptionVal(module.shape.inlets.headOption.orNull)
-    val outPortOpt = OptionVal(module.shape.outlets.headOption.orNull)
+    val inPortOpt = OptionVal(if (inlets.isEmpty) null else inlets.head)
+    val outPortOpt = OptionVal(if (outlets.isEmpty) null else outlets.head)
 
     val wiring = if (outPortOpt.isDefined) wireBackward else noWire
 
@@ -779,6 +792,22 @@ import pekko.util.OptionVal
 
     }
   }
+
+  @inline @InternalApi private[pekko] def fromGraphStage(
+      graphStage: GraphStageWithMaterializedValue[_ <: Shape, _]): 
LinearTraversalBuilder = {
+    val builder = graphStage.traversalBuilder
+    val attributes = builder.attributes
+    val linear = builder match {
+      case atomic: AtomicTraversalBuilder =>
+        LinearTraversalBuilder.fromModule(atomic.module, Attributes.none)
+      case _ =>
+        val builderWithoutAttributes =
+          if (attributes eq Attributes.none) builder else 
builder.setAttributes(Attributes.none)
+        fromBuilder(builderWithoutAttributes, graphStage.shape, Keep.right)
+    }
+
+    if (attributes eq Attributes.none) linear else 
linear.setAttributes(attributes)
+  }
 }
 
 /**
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala
index 5cf2bdc827..7592733d35 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala
@@ -17,6 +17,7 @@
 
 package org.apache.pekko.stream.impl.fusing
 
+import scala.collection.immutable
 import scala.concurrent.Future
 import scala.util.{ Failure, Try }
 
@@ -25,7 +26,7 @@ import pekko.annotation.InternalApi
 import pekko.stream.{ Attributes, FlowShape, Graph, Inlet, Outlet, 
SourceShape, SubscriptionWithCancelException }
 import pekko.stream.impl.{ Buffer => BufferImpl, FailedSource, 
JavaStreamSource, TraversalBuilder }
 import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SingleSource }
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, 
SingleSource }
 import pekko.stream.scaladsl.Source
 import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler 
}
 import pekko.util.OptionVal
@@ -54,6 +55,35 @@ private[pekko] object FlattenConcat {
     override def isClosed: Boolean = !hasNext
   }
 
+  private final class InflightRangeSource[T](range: immutable.Range) extends 
InflightSource[T] {
+    private val isEmptyRange = range.isEmpty
+    private val rangeLast = if (isEmptyRange) 0 else range.last
+    private val rangeStep = range.step
+    private var nextElement = range.start
+    private var closed = isEmptyRange
+
+    override def hasNext: Boolean = !closed
+    override def next(): T =
+      if (closed) throw new NoSuchElementException("next called after 
completion")
+      else {
+        val current = nextElement
+        if (current == rangeLast) closed = true
+        else nextElement = current + rangeStep
+        current.asInstanceOf[T]
+      }
+    override def tryPull(): Unit = ()
+    override def cancel(cause: Throwable): Unit = ()
+    override def isClosed: Boolean = closed
+  }
+
+  private final class InflightRepeatSource[T](elem: T) extends 
InflightSource[T] {
+    override def hasNext: Boolean = true
+    override def next(): T = elem
+    override def tryPull(): Unit = ()
+    override def cancel(cause: Throwable): Unit = ()
+    override def isClosed: Boolean = false
+  }
+
   private final class InflightCompletedFutureSource[T](result: Try[T]) extends 
InflightSource[T] {
     private var _hasNext = result.isSuccess
     override def hasNext: Boolean = _hasNext
@@ -219,6 +249,26 @@ private[pekko] final class FlattenConcat[T, 
M](parallelism: Int)
         }
       }
 
+      private def addRangeSource(range: immutable.Range): Unit = {
+        val inflightSource = new InflightRangeSource[T](range)
+        if (isAvailable(out) && queue.isEmpty) {
+          if (inflightSource.hasNext) {
+            push(out, inflightSource.next())
+            if (inflightSource.hasNext)
+              queue.enqueue(inflightSource)
+          }
+        } else if (inflightSource.hasNext) {
+          queue.enqueue(inflightSource)
+        }
+      }
+
+      private def addRepeatSource(elem: T): Unit = {
+        val inflightSource = new InflightRepeatSource[T](elem)
+        if (isAvailable(out) && queue.isEmpty)
+          push(out, elem)
+        queue.enqueue(inflightSource)
+      }
+
       private def addCompletedFutureElem(elem: Try[T]): Unit = {
         if (isAvailable(out) && queue.isEmpty) {
           elem match {
@@ -287,6 +337,9 @@ private[pekko] final class FlattenConcat[T, M](parallelism: 
Int)
                   case None       => addPendingFutureElem(future)
                 }
               case iterable: IterableSource[T] @unchecked        => 
addSourceElements(iterable.elements.iterator)
+              case iterator: IteratorSource[T] @unchecked        => 
addSourceElements(iterator.createIterator())
+              case range: RangeSource[T] @unchecked              => 
addRangeSource(range.range)
+              case repeat: RepeatSource[T] @unchecked            => 
addRepeatSource(repeat.elem)
               case javaStream: JavaStreamSource[T, _] @unchecked =>
                 import scala.jdk.CollectionConverters._
                 addSourceElements(javaStream.open().iterator.asScala)
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
index b95b29f980..b901584ffe 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
@@ -58,6 +58,11 @@ import pekko.stream.stage._
  */
 @InternalApi private[pekko] object GraphStages {
 
+  /**
+   * INTERNAL API
+   */
+  @InternalApi private[pekko] trait ValuePresentedSource
+
   /**
    * INTERNAL API
    */
@@ -277,7 +282,7 @@ import pekko.stream.stage._
     override def toString: String = s"TickSource($initialDelay, $interval, 
$tick)"
   }
 
-  final class SingleSource[T](val elem: T) extends GraphStage[SourceShape[T]] {
+  final class SingleSource[T](val elem: T) extends GraphStage[SourceShape[T]] 
with ValuePresentedSource {
     override def initialAttributes: Attributes = DefaultAttributes.singleSource
     ReactiveStreamsCompliance.requireNonNullElement(elem)
     val out = Outlet[T]("single.out")
@@ -294,6 +299,21 @@ import pekko.stream.stage._
     override def toString: String = "SingleSource"
   }
 
+  final class RepeatSource[T](val elem: T) extends GraphStage[SourceShape[T]] 
with ValuePresentedSource {
+    override def initialAttributes: Attributes = DefaultAttributes.repeat
+    ReactiveStreamsCompliance.requireNonNullElement(elem)
+    val out = Outlet[T]("repeat.out")
+    override val shape = SourceShape(out)
+    override def createLogic(attr: Attributes): GraphStageLogic =
+      new GraphStageLogic(shape) with OutHandler {
+        override def onPull(): Unit = push(out, elem)
+
+        setHandler(out, this)
+      }
+
+    override def toString: String = "RepeatSource"
+  }
+
   final class FutureFlattenSource[T, M](futureSource: 
Future[Graph[SourceShape[T], M]])
       extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] {
     ReactiveStreamsCompliance.requireNonNullElement(futureSource)
@@ -383,7 +403,7 @@ import pekko.stream.stage._
     override def toString: String = "FutureFlattenSource"
   }
 
-  final class FutureSource[T](val future: Future[T]) extends 
GraphStage[SourceShape[T]] {
+  final class FutureSource[T](val future: Future[T]) extends 
GraphStage[SourceShape[T]] with ValuePresentedSource {
     ReactiveStreamsCompliance.requireNonNullElement(future)
     val shape = SourceShape(Outlet[T]("FutureSource.out"))
     val out = shape.out
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
index cfcd1c035f..31edf9eac7 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
@@ -26,10 +26,13 @@ import pekko.stream.{ Attributes, Outlet, SourceShape, 
Supervision }
 import pekko.stream.ActorAttributes.SupervisionStrategy
 import pekko.stream.impl.ReactiveStreamsCompliance
 import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.impl.fusing.GraphStages.ValuePresentedSource
 import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
 
 @InternalApi
-private[pekko] final class IterableSource[T](val elements: 
immutable.Iterable[T]) extends GraphStage[SourceShape[T]] {
+private[pekko] final class IterableSource[T](val elements: 
immutable.Iterable[T])
+    extends GraphStage[SourceShape[T]]
+    with ValuePresentedSource {
   ReactiveStreamsCompliance.requireNonNullElement(elements)
 
   override protected def initialAttributes: Attributes = 
DefaultAttributes.iterableSource
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IteratorSource.scala
similarity index 56%
copy from 
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
copy to 
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IteratorSource.scala
index cfcd1c035f..7c6baa5114 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IteratorSource.scala
@@ -17,24 +17,25 @@
 
 package org.apache.pekko.stream.impl.fusing
 
-import scala.collection.immutable
 import scala.util.control.NonFatal
 
 import org.apache.pekko
 import pekko.annotation.InternalApi
-import pekko.stream.{ Attributes, Outlet, SourceShape, Supervision }
 import pekko.stream.ActorAttributes.SupervisionStrategy
-import pekko.stream.impl.ReactiveStreamsCompliance
-import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.{ Attributes, Outlet, SourceShape, Supervision }
+import pekko.stream.impl.fusing.GraphStages.ValuePresentedSource
 import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
 
 @InternalApi
-private[pekko] final class IterableSource[T](val elements: 
immutable.Iterable[T]) extends GraphStage[SourceShape[T]] {
-  ReactiveStreamsCompliance.requireNonNullElement(elements)
+private[pekko] final class IteratorSource[T](
+    val createIterator: () => Iterator[T],
+    defaultAttributes: Attributes)
+    extends GraphStage[SourceShape[T]]
+    with ValuePresentedSource {
 
-  override protected def initialAttributes: Attributes = 
DefaultAttributes.iterableSource
+  override protected def initialAttributes: Attributes = defaultAttributes
 
-  private val out = Outlet[T]("IterableSource.out")
+  private val out = Outlet[T]("IteratorSource.out")
   override val shape: SourceShape[T] = SourceShape(out)
 
   override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
@@ -45,27 +46,44 @@ private[pekko] final class IterableSource[T](val elements: 
immutable.Iterable[T]
       override def onPull(): Unit =
         try {
           if (currentIterator eq null) {
-            currentIterator = elements.iterator
+            currentIterator = createIterator()
           }
-          tryPushNextOrComplete()
+          pushNextOrComplete()
         } catch {
           case NonFatal(ex) =>
-            decider(ex) match {
-              case Supervision.Stop    => failStage(ex)
-              case Supervision.Resume  => tryPushNextOrComplete()
-              case Supervision.Restart =>
-                currentIterator = elements.iterator
-                tryPushNextOrComplete()
+            if (currentIterator eq null) handleIteratorCreationFailure(ex)
+            else handleIteratorFailure(ex)
+        }
+
+      private def handleIteratorCreationFailure(ex: Throwable): Unit =
+        decider(ex) match {
+          case Supervision.Stop   => failStage(ex)
+          case Supervision.Resume =>
+            completeStage()
+          case Supervision.Restart =>
+            try {
+              currentIterator = createIterator()
+              pushNextOrComplete()
+            } catch {
+              case NonFatal(restartEx) => failStage(restartEx)
             }
         }
 
-      private def tryPushNextOrComplete(): Unit =
+      private def handleIteratorFailure(ex: Throwable): Unit =
+        decider(ex) match {
+          case Supervision.Stop    => failStage(ex)
+          case Supervision.Resume  => pushNextOrComplete()
+          case Supervision.Restart =>
+            currentIterator = createIterator()
+            pushNextOrComplete()
+        }
+
+      private def pushNextOrComplete(): Unit =
         if (currentIterator.hasNext) {
           if (isAvailable(out)) {
             push(out, currentIterator.next())
-            if (!currentIterator.hasNext) {
+            if (!currentIterator.hasNext)
               completeStage()
-            }
           }
         } else {
           completeStage()
@@ -74,5 +92,5 @@ private[pekko] final class IterableSource[T](val elements: 
immutable.Iterable[T]
       setHandler(out, this)
     }
 
-  override def toString: String = "IterableSource"
+  override def toString: String = "IteratorSource"
 }
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index 6f7b1db1ae..a0747c2b99 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -45,7 +45,7 @@ import pekko.stream.impl.{
   TraversalBuilder
 }
 import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.GraphStages.{ FutureSource, 
SimpleLinearGraphStage, SingleSource }
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, 
SimpleLinearGraphStage, SingleSource }
 import pekko.stream.scaladsl.{
   DelayStrategy,
   Source,
@@ -2195,6 +2195,12 @@ private[pekko] object TakeWithin {
                       }
                     case iterableSource: IterableSource[T @unchecked] =>
                       emitMultiple(out, iterableSource.elements, () => 
completeStage())
+                    case iteratorSource: IteratorSource[T @unchecked] =>
+                      emitMultiple(out, iteratorSource.createIterator(), () => 
completeStage())
+                    case rangeSource: RangeSource[T @unchecked] =>
+                      emitMultiple(out, 
rangeSource.range.iterator.asInstanceOf[Iterator[T]], () => completeStage())
+                    case repeatSource: RepeatSource[T @unchecked] =>
+                      emitMultiple(out, 
Iterator.continually(repeatSource.elem), () => completeStage())
                     case javaStreamSource: JavaStreamSource[T @unchecked, _] =>
                       emitMultiple(out, javaStreamSource.open().spliterator(), 
() => completeStage())
                     case _ =>
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/RangeSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/RangeSource.scala
new file mode 100644
index 0000000000..4e9cad700e
--- /dev/null
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/RangeSource.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import scala.collection.immutable
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ Attributes, Outlet, SourceShape }
+import pekko.stream.impl.ReactiveStreamsCompliance
+import pekko.stream.impl.fusing.GraphStages.ValuePresentedSource
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
+
+@InternalApi
+private[pekko] final class RangeSource[T](val range: immutable.Range, 
defaultAttributes: Attributes)
+    extends GraphStage[SourceShape[T]]
+    with ValuePresentedSource {
+  ReactiveStreamsCompliance.requireNonNullElement(range)
+
+  override protected def initialAttributes: Attributes = defaultAttributes
+
+  private val out = Outlet[T]("RangeSource.out")
+  override val shape: SourceShape[T] = SourceShape(out)
+  private[this] val isEmptyRange = range.isEmpty
+  private[this] val rangeStart = range.start
+  private[this] val rangeLast = if (isEmptyRange) 0 else range.last
+  private[this] val rangeStep = range.step
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with OutHandler {
+      private[this] var nextElement = rangeStart
+
+      override def preStart(): Unit =
+        if (isEmptyRange) completeStage()
+
+      override def onPull(): Unit = {
+        val current = nextElement
+        val isLast = current == rangeLast
+        if (!isLast)
+          nextElement = current + rangeStep
+
+        push(out, current.asInstanceOf[T])
+        if (isLast)
+          completeStage()
+      }
+
+      setHandler(out, this)
+    }
+
+  override def toString: String = "RangeSource"
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 10a634f582..522be44914 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -38,7 +38,7 @@ import pekko.japi.function.Creator
 import pekko.stream._
 import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava 
}
 import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
+import pekko.stream.impl.fusing.{ RangeSource, StatefulMapConcat, 
ZipWithIndexJava }
 import pekko.util._
 
 import org.jspecify.annotations.Nullable
@@ -234,7 +234,9 @@ object Source {
    * @see [[scala.collection.immutable.Range.inclusive(Int, Int, Int)]]
    */
   def range(start: Int, end: Int, step: Int): javadsl.Source[Integer, NotUsed] 
=
-    new Source(scaladsl.Source(Range.inclusive(start, end, 
step).asInstanceOf[immutable.Iterable[Integer]]))
+    new Source(
+      scaladsl.Source.fromGraph(
+        new RangeSource[Integer](Range.inclusive(start, end, step), 
DefaultAttributes.iterableSource)))
 
   /**
    * Elements are emitted periodically with the specified interval.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 4cb3070b21..1123b52a1b 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -505,13 +505,7 @@ object Flow {
       case f: Flow[I, O, M]                                       => f
       case f: javadsl.Flow[I, O, M] @unchecked                    => f.asScala
       case g: GraphStageWithMaterializedValue[FlowShape[I, O], M] =>
-        // move these from the operator itself to make the returned source
-        // behave as it is the operator with regards to attributes
-        val attrs = g.traversalBuilder.attributes
-        val noAttrStage = g.withAttributes(Attributes.none)
-        new Flow(
-          LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, 
noAttrStage.shape, Keep.right),
-          noAttrStage.shape).withAttributes(attrs)
+        new Flow(LinearTraversalBuilder.fromGraphStage(g), g.shape)
 
       case _ => new 
Flow(LinearTraversalBuilder.fromBuilder(g.traversalBuilder, g.shape, 
Keep.right), g.shape)
     }
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
index 1a49530dfe..8407ba5a0e 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
@@ -150,13 +150,7 @@ object Sink {
       case s: Sink[T, M]                                       => s
       case s: javadsl.Sink[T, M] @unchecked                    => s.asScala
       case g: GraphStageWithMaterializedValue[SinkShape[T], M] =>
-        // move these from the stage itself to make the returned source
-        // behave as it is the stage with regards to attributes
-        val attrs = g.traversalBuilder.attributes
-        val noAttrStage = g.withAttributes(Attributes.none)
-        new Sink(
-          LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, 
noAttrStage.shape, Keep.right),
-          noAttrStage.shape).withAttributes(attrs)
+        new Sink(LinearTraversalBuilder.fromGraphStage(g), g.shape)
 
       case other =>
         new Sink(LinearTraversalBuilder.fromBuilder(other.traversalBuilder, 
other.shape, Keep.right), other.shape)
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 739382bf7e..6f60be9f56 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -30,7 +30,15 @@ import pekko.annotation.InternalApi
 import pekko.stream._
 import pekko.stream.impl._
 import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.{ ArraySource, GraphStages, IterableSource, 
LazyFutureSource, LazySingleSource }
+import pekko.stream.impl.fusing.{
+  ArraySource,
+  GraphStages,
+  IterableSource,
+  IteratorSource,
+  LazyFutureSource,
+  LazySingleSource,
+  RangeSource
+}
 import pekko.stream.impl.fusing.GraphStages._
 import pekko.stream.stage.GraphStageWithMaterializedValue
 import pekko.util.ConstantFun
@@ -286,11 +294,8 @@ object Source {
    * Elements are pulled out of the iterator in accordance with the demand 
coming
    * from the downstream transformation steps.
    */
-  def fromIterator[T](f: () => Iterator[T]): Source[T, NotUsed] =
-    apply(new immutable.Iterable[T] {
-      override def iterator: Iterator[T] = f()
-      override def toString: String = "() => Iterator"
-    })
+  @inline def fromIterator[T](f: () => Iterator[T]): Source[T, NotUsed] =
+    fromGraphStage(new IteratorSource[T](f, DefaultAttributes.iterableSource))
 
   /**
    * Creates a source that wraps a Java 8 ``Stream``. ``Source`` uses a stream 
iterator to get all its
@@ -309,11 +314,11 @@ object Source {
    * Starts a new 'cycled' `Source` from the given elements. The producer 
stream of elements
    * will continue infinitely by repeating the sequence of elements provided 
by function parameter.
    */
-  def cycle[T](f: () => Iterator[T]): Source[T, NotUsed] = {
+  @inline def cycle[T](f: () => Iterator[T]): Source[T, NotUsed] = {
     val iterator = Iterator.continually {
       val i = f(); if (i.isEmpty) throw new IllegalArgumentException("empty 
iterator") else i
     }.flatten
-    fromIterator(() => iterator).withAttributes(DefaultAttributes.cycledSource)
+    fromGraphStage(new IteratorSource[T](() => iterator, 
DefaultAttributes.cycledSource))
   }
 
   /**
@@ -372,26 +377,25 @@ object Source {
   def fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M] = g match {
     case s: Source[T, M]                                       => s
     case s: javadsl.Source[T, M] @unchecked                    => s.asScala
-    case g: GraphStageWithMaterializedValue[SourceShape[T], M] =>
-      // move these from the stage itself to make the returned source
-      // behave as it is the stage with regards to attributes
-      val attrs = g.traversalBuilder.attributes
-      val noAttrStage = g.withAttributes(Attributes.none)
-      new Source(
-        LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, 
noAttrStage.shape, Keep.right),
-        noAttrStage.shape).withAttributes(attrs)
-    case other =>
+    case g: GraphStageWithMaterializedValue[SourceShape[T], M] => 
fromGraphStage(g)
+    case other                                                 =>
       // composite source shaped graph
       new Source(LinearTraversalBuilder.fromBuilder(other.traversalBuilder, 
other.shape, Keep.right), other.shape)
   }
 
+  @inline private def fromGraphStage[T, M](g: 
GraphStageWithMaterializedValue[SourceShape[T], M]): Source[T, M] =
+    new Source(LinearTraversalBuilder.fromGraphStage(g), g.shape)
+
+  @inline private def fromRange[T](range: immutable.Range): Source[T, NotUsed] 
=
+    fromGraphStage(new RangeSource[T](range, DefaultAttributes.iterableSource))
+
   /**
    * Defers the creation of a [[Source]] until materialization. The `factory` 
function
    * exposes [[Materializer]] which is going to be used during materialization 
and
    * [[Attributes]] of the [[Source]] returned by this method.
    */
   def fromMaterializer[T, M](factory: (Materializer, Attributes) => Source[T, 
M]): Source[T, Future[M]] =
-    Source.fromGraph(new SetupSourceStage(factory))
+    fromGraphStage(new SetupSourceStage(factory))
 
   /**
    * Helper to create [[Source]] from `Iterable`.
@@ -403,13 +407,16 @@ object Source {
    * beginning) regardless of when they subscribed.
    * @see [[apply(immutable.Seq)]]
    */
-  def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = {
+  @inline def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = {
     // unknown size is -1
     (iterable.knownSize: @switch) match {
       case 0 => empty
       case 1 => single(iterable.head)
       case _ =>
-        fromGraph(new IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
+        iterable match {
+          case range: immutable.Range => fromRange[T](range)
+          case _                      => fromGraphStage(new IterableSource[T](iterable))
+        }
     }
   }
 
@@ -424,12 +431,16 @@ object Source {
    * @see [[apply(immutable.Iterable)]]
    * @since 2.0.0
    */
-  def apply[T](seq: immutable.Seq[T]): Source[T, NotUsed] = {
-    seq match {
-      case immutable.Seq()                   => empty[T]
-      case immutable.Seq(elem: T @unchecked) => single(elem)
-      case _                                 =>
-        fromGraph(new IterableSource[T](seq)).withAttributes(DefaultAttributes.iterableSource)
+  @inline def apply[T](seq: immutable.Seq[T]): Source[T, NotUsed] = {
+    // unknown size is -1
+    (seq.knownSize: @switch) match {
+      case 0 => empty[T]
+      case 1 => single(seq.head)
+      case _ =>
+        seq match {
+          case range: immutable.Range => fromRange[T](range)
+          case _                      => fromGraphStage(new IterableSource[T](seq))
+        }
     }
   }
 
@@ -439,13 +450,13 @@ object Source {
    *
    * @since 1.3.0
    */
-  def apply[T](array: Array[T]): Source[T, NotUsed] = {
+  @inline def apply[T](array: Array[T]): Source[T, NotUsed] = {
     if (array.length == 0)
       empty
     else if (array.length == 1)
       single(array(0))
     else
-      Source.fromGraph(new ArraySource[T](array))
+      fromGraphStage(new ArraySource[T](array))
   }
 
   /**
@@ -456,14 +467,14 @@ object Source {
    * receive new tick elements as soon as it has requested more elements.
    */
   def tick[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] =
-    fromGraph(new TickSource[T](initialDelay, interval, tick))
+    fromGraphStage(new TickSource[T](initialDelay, interval, tick))
 
   /**
    * Create a `Source` with one element.
    * Every connected `Sink` of this stream will see an individual stream consisting of one element.
    */
-  def single[T](element: T): Source[T, NotUsed] =
-    fromGraph(new GraphStages.SingleSource(element))
+  @inline def single[T](element: T): Source[T, NotUsed] =
+    fromGraphStage(new GraphStages.SingleSource(element))
 
   /**
    * Create a `Source` from the given elements.
@@ -496,8 +507,8 @@ object Source {
   /**
    * Create a `Source` that will continually emit the given element.
    */
-  def repeat[T](element: T): Source[T, NotUsed] = {
-    fromIterator(() => Iterator.continually(element)).withAttributes(DefaultAttributes.repeat)
+  @inline def repeat[T](element: T): Source[T, NotUsed] = {
+    fromGraphStage(new GraphStages.RepeatSource(element))
   }
 
   /**
@@ -514,7 +525,7 @@ object Source {
    * }}}
    */
   def unfold[S, E](s: S)(f: S => Option[(S, E)]): Source[E, NotUsed] =
-    Source.fromGraph(new Unfold(s, f))
+    fromGraphStage(new Unfold(s, f))
 
   /**
    * Same as [[unfold]], but uses an async function to generate the next state-element tuple.
@@ -532,7 +543,7 @@ object Source {
    * }}}
    */
   def unfoldAsync[S, E](s: S)(f: S => Future[Option[(S, E)]]): Source[E, NotUsed] =
-    Source.fromGraph(new UnfoldAsync(s, f))
+    fromGraphStage(new UnfoldAsync(s, f))
 
   /**
    * Creates a sequential `Source` by iterating with the given predicate and function,
@@ -543,27 +554,29 @@ object Source {
    * @since 1.1.0
    */
   def iterate[T](seed: T)(p: T => Boolean, f: T => T): Source[T, NotUsed] =
-    fromIterator(() =>
-      new AbstractIterator[T] {
-        private var first = true
-        private var acc = seed
-        override def hasNext: Boolean = p(acc)
-        override def next(): T = {
-          if (first) {
-            first = false
-          } else {
-            acc = f(acc)
+    fromGraphStage(new IteratorSource[T](
+      () =>
+        new AbstractIterator[T] {
+          private var first = true
+          private var acc = seed
+          override def hasNext: Boolean = p(acc)
+          override def next(): T = {
+            if (first) {
+              first = false
+            } else {
+              acc = f(acc)
+            }
+            acc
           }
-          acc
-        }
-      }).withAttributes(DefaultAttributes.iterateSource)
+        },
+      DefaultAttributes.iterateSource))
 
   /**
    * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`.
    */
   def empty[T]: Source[T, NotUsed] = _empty
   private[this] val _empty: Source[Nothing, NotUsed] =
-    Source.fromGraph(EmptySource)
+    fromGraphStage(EmptySource)
 
   /**
    * Create a `Source` which materializes a [[scala.concurrent.Promise]] which controls what element
@@ -577,20 +590,22 @@ object Source {
    * with None.
    */
   def maybe[T]: Source[T, Promise[Option[T]]] =
-    Source.fromGraph(MaybeSource.asInstanceOf[Graph[SourceShape[T], Promise[Option[T]]]])
+    fromGraphStage(
+      MaybeSource.asInstanceOf[GraphStageWithMaterializedValue[SourceShape[T], Promise[Option[T]]]])
 
   /**
    * Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`.
    */
   def failed[T](cause: Throwable): Source[T, NotUsed] =
-    Source.fromGraph(new FailedSource[T](cause))
+    fromGraphStage(new FailedSource[T](cause))
 
   /**
    * Emits a single value when the given `Future` is successfully completed and then completes the stream.
    * The stream fails if the `Future` is completed with a failure.
    */
+  @inline
   def future[T](futureElement: Future[T]): Source[T, NotUsed] = futureElement.value match {
-    case None                           => fromGraph(new FutureSource[T](futureElement))
+    case None                           => fromGraphStage(new FutureSource[T](futureElement))
     case Some(scala.util.Success(null)) => empty[T]
     case Some(scala.util.Success(elem)) => single(elem)
     case Some(scala.util.Failure(ex))   => failed[T](ex)
@@ -601,7 +616,7 @@ object Source {
    * This stream could be useful in tests.
    */
   def never[T]: Source[T, NotUsed] = _never
-  private[this] val _never: Source[Nothing, NotUsed] = fromGraph(GraphStages.NeverSource)
+  private[this] val _never: Source[Nothing, NotUsed] = fromGraphStage(GraphStages.NeverSource)
 
   /**
    * Emits a single value when the given `CompletionStage` is successfully completed and then completes the stream.
@@ -616,8 +631,9 @@ object Source {
    * Turn a `Future[Source]` into a source that will emit the values of the source when the future completes successfully.
    * If the `Future` is completed with a failure the stream is failed.
    */
+  @inline
   def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]] = futureSource.value match {
-    case None                           => fromGraph(new FutureFlattenSource(futureSource))
+    case None                           => fromGraphStage(new FutureFlattenSource(futureSource))
     case Some(scala.util.Success(null)) =>
       val exception = new NullPointerException("futureSource completed with null")
       Source.failed(exception).mapMaterializedValue(_ => Future.failed[M](exception))
@@ -634,7 +650,7 @@ object Source {
    * the laziness and will trigger the factory immediately.
    */
   def lazySingle[T](create: () => T): Source[T, NotUsed] =
-    fromGraph(new LazySingleSource(create))
+    fromGraphStage(new LazySingleSource(create))
 
   /**
    * Defers invoking the `create` function to create a future element until there is downstream demand.
@@ -646,7 +662,7 @@ object Source {
    * the laziness and will trigger the factory immediately.
    */
   def lazyFuture[T](create: () => Future[T]): Source[T, NotUsed] =
-    fromGraph(new LazyFutureSource(create))
+    fromGraphStage(new LazyFutureSource(create))
 
   /**
    * Defers invoking the `create` function to create a future source until there is downstream demand.
@@ -665,7 +681,7 @@ object Source {
    * is failed with a [[pekko.stream.NeverMaterializedException]]
    */
   def lazySource[T, M](create: () => Source[T, M]): Source[T, Future[M]] =
-    fromGraph(new LazySource(create))
+    fromGraphStage(new LazySource(create))
 
   /**
    * Defers invoking the `create` function to create a future source until there is downstream demand.
@@ -738,9 +754,7 @@ object Source {
       overflowStrategy: OverflowStrategy): Source[T, ActorRef] = {
     require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
     require(!overflowStrategy.isBackpressure, "Backpressure overflowStrategy not supported")
-    Source
-      .fromGraph(new ActorRefSource(bufferSize, overflowStrategy, completionMatcher, failureMatcher))
-      .withAttributes(DefaultAttributes.actorRefSource)
+    fromGraphStage(new ActorRefSource(bufferSize, overflowStrategy, completionMatcher, failureMatcher))
   }
 
   /**
@@ -751,7 +765,7 @@ object Source {
       ackMessage: Any,
       completionMatcher: PartialFunction[Any, CompletionStrategy],
       failureMatcher: PartialFunction[Any, Throwable]): Source[T, ActorRef] = {
-    Source.fromGraph(new ActorRefBackpressureSource(ackTo, ackMessage, completionMatcher, failureMatcher))
+    fromGraphStage(new ActorRefBackpressureSource(ackTo, ackMessage, completionMatcher, failureMatcher))
   }
 
   /**
@@ -772,7 +786,7 @@ object Source {
       ackMessage: Any,
       completionMatcher: PartialFunction[Any, CompletionStrategy],
       failureMatcher: PartialFunction[Any, Throwable]): Source[T, ActorRef] = {
-    Source.fromGraph(new ActorRefBackpressureSource(None, ackMessage, completionMatcher, failureMatcher))
+    fromGraphStage(new ActorRefBackpressureSource(None, ackMessage, completionMatcher, failureMatcher))
   }
 
   /**
@@ -910,7 +924,7 @@ object Source {
    * @param bufferSize size of the buffer in number of elements
    */
   def queue[T](bufferSize: Int): Source[T, BoundedSourceQueue[T]] =
-    Source.fromGraph(new BoundedSourceQueueStage[T](bufferSize))
+    fromGraphStage(new BoundedSourceQueueStage[T](bufferSize))
 
   /**
    * Creates a Source that will immediately execute the provided function `producer` with a [[BoundedSourceQueue]] when materialized.
@@ -1029,8 +1043,7 @@ object Source {
       bufferSize: Int,
       overflowStrategy: OverflowStrategy,
       maxConcurrentOffers: Int): Source[T, SourceQueueWithComplete[T]] =
-    Source.fromGraph(
-      new QueueSource(bufferSize, overflowStrategy, maxConcurrentOffers).withAttributes(DefaultAttributes.queueSource))
+    fromGraphStage(new QueueSource(bufferSize, overflowStrategy, maxConcurrentOffers))
 
   /**
    * Start a new `Source` from some resource which can be opened, read and closed.
@@ -1063,7 +1076,7 @@ object Source {
    * @tparam R - the resource type.
    */
   def unfoldResource[T, R](create: () => R, read: (R) => Option[T], close: (R) => Unit): Source[T, NotUsed] =
-    Source.fromGraph(new UnfoldResourceSource(create, read, close))
+    fromGraphStage(new UnfoldResourceSource(create, read, close))
 
   /**
    * Start a new `Source` from some resource which can be opened, read and closed.
@@ -1091,7 +1104,7 @@ object Source {
       create: () => Future[R],
       read: (R) => Future[Option[T]],
       close: (R) => Future[Done]): Source[T, NotUsed] =
-    Source.fromGraph(new UnfoldResourceSourceAsync(create, read, close))
+    fromGraphStage(new UnfoldResourceSourceAsync(create, read, close))
 
   /**
    * Merge multiple [[Source]]s. Prefer the sources depending on the 'priority' parameters.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to