This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch hepin-gather-statefulmap-coverage-v2
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/hepin-gather-statefulmap-coverage-v2 by this push:
     new 49f2270bb0 stream: apply scalafmt formatting to gather operator files
49f2270bb0 is described below

commit 49f2270bb06e1156b1ac49e05736127b13eeafbd
Author: He-Pin <[email protected]>
AuthorDate: Tue Apr 7 03:59:05 2026 +0800

    stream: apply scalafmt formatting to gather operator files
    
    🤖 Generated with [Qoder](https://qoder.com)
---
 .../scala/docs/stream/operators/flow/Gather.scala  |  2 +-
 .../pekko/stream/scaladsl/FlowGatherSpec.scala     | 55 ++++++++++++----------
 .../org/apache/pekko/stream/impl/fusing/Ops.scala  |  8 ++--
 3 files changed, 36 insertions(+), 29 deletions(-)

diff --git a/docs/src/test/scala/docs/stream/operators/flow/Gather.scala b/docs/src/test/scala/docs/stream/operators/flow/Gather.scala
index 4b5fbb7a99..f8f7beadc2 100644
--- a/docs/src/test/scala/docs/stream/operators/flow/Gather.scala
+++ b/docs/src/test/scala/docs/stream/operators/flow/Gather.scala
@@ -84,7 +84,7 @@ object Gather {
           override def apply(elem: String, collector: GatherCollector[String]): Unit =
             lastElement match {
               case Some(last) if last == elem =>
-              case _ =>
+              case _                          =>
                 lastElement = Some(elem)
                 collector.push(elem)
             }
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala
index 6e2b1b8dcc..46c7f060e9 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala
@@ -26,11 +26,17 @@ import scala.util.Success
 import scala.util.control.NoStackTrace
 
 import org.apache.pekko.Done
-import org.apache.pekko.stream.{ AbruptStageTerminationException, ActorAttributes, ActorMaterializer, ClosedShape, Supervision }
+import org.apache.pekko.stream.{
+  AbruptStageTerminationException,
+  ActorAttributes,
+  ActorMaterializer,
+  ClosedShape,
+  Supervision
+}
 import org.apache.pekko.stream.testkit.{ StreamSpec, TestSubscriber }
 import org.apache.pekko.stream.testkit.Utils.TE
 import org.apache.pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
-import org.apache.pekko.stream.scaladsl.{ Keep, Flow }
+import org.apache.pekko.stream.scaladsl.{ Flow, Keep }
 import org.apache.pekko.testkit.EventFilter
 
 class FlowGatherSpec extends StreamSpec {
@@ -286,7 +292,7 @@ class FlowGatherSpec extends StreamSpec {
             override def apply(elem: String, collector: GatherCollector[String]): Unit =
               lastElement match {
                 case Some(last) if last == elem =>
-                case _ =>
+                case _                          =>
                   lastElement = Some(elem)
                   collector.push(elem)
               }
@@ -329,16 +335,16 @@ class FlowGatherSpec extends StreamSpec {
       val generation = new AtomicInteger(0)
       val (source, sink) = TestSource[String]()
         .viaMat(Flow[String].gather(() => {
-            val currentGeneration = generation.incrementAndGet()
-            new Gatherer[String, String] {
-              override def apply(elem: String, collector: GatherCollector[String]): Unit =
-                if (elem == "boom") throw TE("boom")
-                else collector.push(s"$elem$currentGeneration")
-
-              override def onComplete(collector: GatherCollector[String]): Unit =
-                collector.push(s"onClose$currentGeneration")
-            }
-          }))(Keep.left)
+          val currentGeneration = generation.incrementAndGet()
+          new Gatherer[String, String] {
+            override def apply(elem: String, collector: GatherCollector[String]): Unit =
+              if (elem == "boom") throw TE("boom")
+              else collector.push(s"$elem$currentGeneration")
+
+            override def onComplete(collector: GatherCollector[String]): Unit =
+              collector.push(s"onClose$currentGeneration")
+          }
+        }))(Keep.left)
         .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
         .toMat(TestSink())(Keep.both)
         .run()
@@ -688,15 +694,15 @@ class FlowGatherSpec extends StreamSpec {
       val closedCounter = new AtomicInteger(0)
       val (source, sink) = TestSource[Int]()
         .viaMat(Flow[Int].gather(() =>
-            new Gatherer[Int, Int] {
-              override def apply(elem: Int, collector: GatherCollector[Int]): Unit =
-                collector.push(elem)
+          new Gatherer[Int, Int] {
+            override def apply(elem: Int, collector: GatherCollector[Int]): Unit =
+              collector.push(elem)
 
-              override def onComplete(collector: GatherCollector[Int]): Unit = {
-                closedCounter.incrementAndGet()
-                throw TE("boom")
-              }
-            }))(Keep.left)
+            override def onComplete(collector: GatherCollector[Int]): Unit = {
+              closedCounter.incrementAndGet()
+              throw TE("boom")
+            }
+          }))(Keep.left)
         .toMat(TestSink[Int]())(Keep.both)
         .run()
 
@@ -822,12 +828,13 @@ class FlowGatherSpec extends StreamSpec {
       import GraphDSL.Implicits._
       val unzip = b.add(Unzip[Int, Int]())
       val zip = b.add(Zip[Int, Int]())
-      val gather = b.add(Flow[(Int, Int)].gather(() => (elem: (Int, Int), collector: GatherCollector[(Int, Int)]) => collector.push(elem)))
+      val gather = b.add(Flow[(Int, Int)].gather(() =>
+        (elem: (Int, Int), collector: GatherCollector[(Int, Int)]) => collector.push(elem)))
 
-      source ~> unzip.in
+      source     ~> unzip.in
       unzip.out0 ~> zip.in0
       unzip.out1 ~> zip.in1
-      zip.out ~> gather ~> sink.in
+      zip.out    ~> gather ~> sink.in
 
       ClosedShape
     })
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index 69beb30715..e7955dcad2 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -2477,7 +2477,7 @@ private[pekko] final class Gather[In, Out](factory: () => Gatherer[In, Out]) ext
 
       private def onPushOneToOne(): Unit = {
         val elem = oneToOneGatherer match {
-          case s: OneToOneGatherer[In, Out] @unchecked   => s.applyOne(grab(in))
+          case s: OneToOneGatherer[In, Out] @unchecked            => s.applyOne(grab(in))
           case j: JGatherers.OneToOneGatherer[In, Out] @unchecked => j.applyOne(grab(in))
         }
         ReactiveStreamsCompliance.requireNonNullElement(elem)
@@ -2561,9 +2561,9 @@ private[pekko] final class Gather[In, Out](factory: () => Gatherer[In, Out]) ext
           throw new IllegalStateException("Gatherer factory must not return null")
         gatherer = newGatherer
         oneToOneGatherer = gatherer match {
-          case _: OneToOneGatherer[?, ?]                            => gatherer
-          case _: JGatherers.OneToOneGatherer[?, ?]                 => gatherer
-          case _                                                    => null
+          case _: OneToOneGatherer[?, ?]            => gatherer
+          case _: JGatherers.OneToOneGatherer[?, ?] => gatherer
+          case _                                    => null
         }
         multiMode = false
         pendingOverflow = null


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to