This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new ec7fdc7d0f feat: Add groupedAdjacentBy and GroupedAdjacentByWeighted 
operators. (#1937)
ec7fdc7d0f is described below

commit ec7fdc7d0ffda7f8cdcaeb5b7517a62f4979bab5
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Thu Jul 10 10:54:46 2025 +0800

    feat: Add groupedAdjacentBy and GroupedAdjacentByWeighted operators. (#1937)
---
 .../operators/Source-or-Flow/groupedAdjacentBy.md  |  43 ++++++
 .../Source-or-Flow/groupedAdjacentByWeighted.md    |  42 ++++++
 docs/src/main/paradox/stream/operators/index.md    |   4 +
 .../java/jdocs/stream/operators/SourceOrFlow.java  |  26 ++++
 .../operators/sourceorflow/GroupedAdjacentBy.scala |  53 +++++++
 .../org/apache/pekko/stream/javadsl/FlowTest.java  |  27 ++++
 .../FlowGroupedAdjacentByWeightedSpec.scala        | 100 ++++++++++++
 .../org/apache/pekko/stream/impl/Stages.scala      |   1 +
 .../impl/fusing/GroupedAdjacentByWeighted.scala    | 168 +++++++++++++++++++++
 .../org/apache/pekko/stream/javadsl/Flow.scala     |  44 ++++++
 .../org/apache/pekko/stream/javadsl/Source.scala   |  43 ++++++
 .../org/apache/pekko/stream/javadsl/SubFlow.scala  |  43 ++++++
 .../apache/pekko/stream/javadsl/SubSource.scala    |  43 ++++++
 .../org/apache/pekko/stream/scaladsl/Flow.scala    |  40 +++++
 14 files changed, 677 insertions(+)

diff --git 
a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md
new file mode 100644
index 0000000000..0d970777c2
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md
@@ -0,0 +1,43 @@
+# groupedAdjacentBy
+
+Partitions this stream into chunks by a delimiter function.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Source.groupedAdjacentBy](Source) { 
scala="#groupedAdjacentBy(f:Out=&gt;T):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]"
 java="#groupedAdjacentBy(org.apache.pekko.japi.function.Function)" }
+@apidoc[Flow.groupedAdjacentBy](Flow) { 
scala="#groupedAdjacentBy(f:Out=&gt;T):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]"
 java="#groupedAdjacentBy(org.apache.pekko.japi.function.Function)" }
+
+
+## Description
+
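+Partitions this stream into chunks by a delimiter function: the function is applied to each incoming element, and a chunk is emitted whenever its result differs from the previous element's result. The function must return a non-null value for every element, otherwise the stage fails.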
+
+See also:
+
+* @ref[groupedAdjacentByWeighted](groupedAdjacentByWeighted.md) for a variant 
that also limits the weight of each group.
+
+## Examples
+
+The example below demonstrates how `groupedAdjacentBy` partitions the elements 
into @scala[`Seq`] @java[`List`].
+
+Scala
+:  @@snip 
[GroupedAdjacentBy.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedAdjacentBy.scala)
 { #groupedAdjacentBy }
+
+Java
+:  @@snip 
[SourceOrFlow.java](/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java)
 { #groupedAdjacentBy }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the delimiter function returns a different value than the 
previous element's result
+
+**backpressures** when a chunk has been assembled and downstream backpressures
+
+**completes** when upstream completes
+
+@@@
+
+
diff --git 
a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentByWeighted.md
 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentByWeighted.md
new file mode 100644
index 0000000000..4aaa68020c
--- /dev/null
+++ 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentByWeighted.md
@@ -0,0 +1,42 @@
+# groupedAdjacentByWeighted
+
+Partitions this stream into chunks by a delimiter function and a weight limit.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Source.groupedAdjacentByWeighted](Source) { 
scala="#groupedAdjacentByWeighted(f:Out=&gt;T,maxWeight:Long)(costFn:Out=&gt;Long):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]"
 
java="#groupedAdjacentByWeighted(org.apache.pekko.japi.function.Function,long,java.util.function.Function)"
 }
+@apidoc[Flow.groupedAdjacentByWeighted](Flow) { 
scala="#groupedAdjacentByWeighted(f:Out=&gt;T,maxWeight:Long)(costFn:Out=&gt;Long):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]"
 
java="#groupedAdjacentByWeighted(org.apache.pekko.japi.function.Function,long,java.util.function.Function)"
 }
+
+## Description
+
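+Partitions this stream into chunks by a delimiter function and a weight limit: a chunk is emitted when the function's result differs from the previous element's result, or when adding the next element would push the accumulated weight above `maxWeight`. An element whose own weight exceeds `maxWeight` is emitted as a chunk of its own. The delimiter function must return a non-null value and `costFn` a non-negative weight, otherwise the stage fails.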
+
+See also:
+
+* @ref[groupedAdjacentBy](groupedAdjacentBy.md) for a simpler variant.
+
+## Examples
+
+The example below demonstrates how `groupedAdjacentByWeighted` partitions the 
elements into @scala[`Seq`] @java[`List`].
+
+Scala
+:  @@snip 
[GroupedAdjacentBy.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedAdjacentBy.scala)
 { #groupedAdjacentByWeighted }
+
+Java
+:  @@snip 
[SourceOrFlow.java](/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java)
 { #groupedAdjacentByWeighted }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the delimiter function returns a different value than the 
previous element's result, or when adding the element would exceed the `maxWeight`.
+
+**backpressures** when a chunk has been assembled and downstream backpressures
+
+**completes** when upstream completes
+
+@@@
+
+
diff --git a/docs/src/main/paradox/stream/operators/index.md 
b/docs/src/main/paradox/stream/operators/index.md
index 37e2d6fdcc..8d0dfabcba 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -169,6 +169,8 @@ depending on being backpressured by downstream or not.
 |Source/Flow|<a 
name="frommaterializer"></a>@ref[fromMaterializer](Source-or-Flow/fromMaterializer.md)|Defer
 the creation of a `Source/Flow` until materialization and access 
`Materializer` and `Attributes`|
 |Flow|<a name="futureflow"></a>@ref[futureFlow](Flow/futureFlow.md)|Streams 
the elements through the given future flow once it successfully completes.|
 |Source/Flow|<a 
name="grouped"></a>@ref[grouped](Source-or-Flow/grouped.md)|Accumulate incoming 
events until the specified number of elements have been accumulated and then 
pass the collection of elements downstream.|
+|Source/Flow|<a 
name="groupedadjacentby"></a>@ref[groupedAdjacentBy](Source-or-Flow/groupedAdjacentBy.md)|Partitions
 this stream into chunks by a delimiter function.|
+|Source/Flow|<a 
name="groupedadjacentbyweighted"></a>@ref[groupedAdjacentByWeighted](Source-or-Flow/groupedAdjacentByWeighted.md)|Partitions
 this stream into chunks by a delimiter function and a weight limit.|
 |Source/Flow|<a 
name="groupedweighted"></a>@ref[groupedWeighted](Source-or-Flow/groupedWeighted.md)|Accumulate
 incoming events until the combined weight of elements is greater than or equal 
to the minimum weight and then pass the collection of elements downstream.|
 |Source/Flow|<a 
name="intersperse"></a>@ref[intersperse](Source-or-Flow/intersperse.md)|Intersperse
 stream with provided element similar to `List.mkString`.|
 |Flow|<a 
name="lazycompletionstageflow"></a>@ref[lazyCompletionStageFlow](Flow/lazyCompletionStageFlow.md)|Defers
 creation and materialization of a `Flow` until there is a first element.|
@@ -499,6 +501,8 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 * [futureSource](Source/futureSource.md)
 * [groupBy](Source-or-Flow/groupBy.md)
 * [grouped](Source-or-Flow/grouped.md)
+* [groupedAdjacentBy](Source-or-Flow/groupedAdjacentBy.md)
+* [groupedAdjacentByWeighted](Source-or-Flow/groupedAdjacentByWeighted.md)
 * [groupedWeighted](Source-or-Flow/groupedWeighted.md)
 * [groupedWeightedWithin](Source-or-Flow/groupedWeightedWithin.md)
 * [groupedWithin](Source-or-Flow/groupedWithin.md)
diff --git a/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java 
b/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
index 4a9d058d42..65c985d849 100644
--- a/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
+++ b/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
@@ -467,6 +467,32 @@ class SourceOrFlow {
     // #groupedWeighted
   }
 
+  void groupedAdjacentByExample() {
+    // #groupedAdjacentBy
+    Source.from(Arrays.asList("Hello", "Hi", "Greetings", "Hey"))
+        .groupedAdjacentBy(str -> str.charAt(0))
+        .runForeach(System.out::println, system);
+    // prints:
+    // [Hello, Hi]
+    // [Greetings]
+    // [Hey]
+    // #groupedAdjacentBy
+  }
+
+  void groupedAdjacentByWeightedExample() {
+    // #groupedAdjacentByWeighted
+    Source.from(Arrays.asList("Hello", "HiHi", "Hi", "Hi", "Greetings", "Hey"))
+        .groupedAdjacentByWeighted(str -> str.charAt(0), 4, str -> (long) 
str.length())
+        .runForeach(System.out::println, system);
+    // prints:
+    // [Hello]
+    // [HiHi]
+    // [Hi, Hi]
+    // [Greetings]
+    // [Hey]
+    // #groupedAdjacentByWeighted
+  }
+
   static
   // #fold // #foldAsync
   class Histogram {
diff --git 
a/docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedAdjacentBy.scala
 
b/docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedAdjacentBy.scala
new file mode 100644
index 0000000000..a8f1f51f51
--- /dev/null
+++ 
b/docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedAdjacentBy.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2019-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.stream.operators.sourceorflow
+
+import org.apache.pekko.stream.scaladsl.Source
+
+import scala.collection.immutable
+
+object GroupedAdjacentBy {
+  def groupedAdjacentByExample(): Unit = {
+    import org.apache.pekko.actor.ActorSystem
+    implicit val system: ActorSystem = ActorSystem()
+
+    // #groupedAdjacentBy
+    Source(List("Hello", "Hi", "Greetings", "Hey"))
+      .groupedAdjacentBy(_.head)
+      .runForeach(println)
+    // prints:
+    // Vector(Hello, Hi)
+    // Vector(Greetings)
+    // Vector(Hey)
+    // #groupedAdjacentBy
+  }
+
+  def groupedAdjacentByWeightedExample(): Unit = {
+    import org.apache.pekko.actor.ActorSystem
+    implicit val system: ActorSystem = ActorSystem()
+
+    // #groupedAdjacentByWeighted
+    Source(List("Hello", "HiHi", "Hi", "Hi", "Greetings", "Hey"))
+      .groupedAdjacentByWeighted(_.head, 4)(_.length)
+      .runForeach(println)
+    // prints:
+    // Vector(Hello)
+    // Vector(HiHi)
+    // Vector(Hi, Hi)
+    // Vector(Greetings)
+    // Vector(Hey)
+    // #groupedAdjacentByWeighted
+  }
+
+}
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index 1b6b400f85..0a45acc2bd 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -13,6 +13,7 @@
 
 package org.apache.pekko.stream.javadsl;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.pekko.Done;
 import org.apache.pekko.NotUsed;
@@ -122,6 +123,32 @@ public class FlowTest extends StreamTest {
     probe.expectMsgEquals("de");
   }
 
+  @Test
+  public void mustBeAbleToUseGroupedAdjacentBy() {
+    Source.from(Arrays.asList("Hello", "Hi", "Greetings", "Hey"))
+        .groupedAdjacentBy(str -> str.charAt(0))
+        .runWith(TestSink.probe(system), system)
+        .request(4)
+        .expectNext(Lists.newArrayList("Hello", "Hi"))
+        .expectNext(Lists.newArrayList("Greetings"))
+        .expectNext(Lists.newArrayList("Hey"))
+        .expectComplete();
+  }
+
+  @Test
+  public void mustBeAbleToUseGroupedAdjacentByWeighted() {
+    Source.from(Arrays.asList("Hello", "HiHi", "Hi", "Hi", "Greetings", "Hey"))
+        .groupedAdjacentByWeighted(str -> str.charAt(0), 4, str -> (long) 
str.length())
+        .runWith(TestSink.probe(system), system)
+        .request(6)
+        .expectNext(Lists.newArrayList("Hello"))
+        .expectNext(Lists.newArrayList("HiHi"))
+        .expectNext(Lists.newArrayList("Hi", "Hi"))
+        .expectNext(Lists.newArrayList("Greetings"))
+        .expectNext(Lists.newArrayList("Hey"))
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToUseContraMap() {
     final Source<String, NotUsed> source = Source.from(Arrays.asList("1", "2", 
"3"));
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedAdjacentByWeightedSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedAdjacentByWeightedSpec.scala
new file mode 100644
index 0000000000..b6b53fdf18
--- /dev/null
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedAdjacentByWeightedSpec.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko
+import pekko.stream.testkit.scaladsl.TestSink
+import pekko.stream.testkit.{ ScriptedTest, StreamSpec }
+
+class FlowGroupedAdjacentByWeightedSpec extends StreamSpec("""
+    pekko.stream.materializer.initial-input-buffer-size = 2
+  """) with ScriptedTest {
+
+  "A GroupedAdjacentByWeighted" must {
+    "produce no group when source is empty" in {
+      Source.empty[String]
+        .groupedAdjacentBy(identity(_))
+        .runWith(TestSink.probe[Seq[String]])
+        .request(1)
+        .expectComplete()
+    }
+
+    "group adjacent elements by predicate" in {
+      val input = List("a", "a", "b", "b", "c", "c")
+      Source(input)
+        .groupedAdjacentBy(identity(_))
+        .runWith(TestSink.probe[Seq[String]])
+        .request(6)
+        .expectNext(Seq("a", "a"))
+        .expectNext(Seq("b", "b"))
+        .expectNext(Seq("c", "c"))
+        .expectComplete()
+    }
+
+    "group adjacent elements by leading char" in {
+      val input = List("Hello", "Hi", "Greetings", "Hey")
+      Source(input)
+        .groupedAdjacentBy(_.head)
+        .runWith(TestSink.probe[Seq[String]])
+        .request(4)
+        .expectNext(Seq("Hello", "Hi"))
+        .expectNext(Seq("Greetings"))
+        .expectNext(Seq("Hey"))
+        .expectComplete()
+    }
+
+    "be able to act like bufferUntilChanged" in {
+      Source(List(1, 1, 2, 2, 3, 3, 1))
+        .groupedAdjacentBy(identity(_))
+        .runWith(TestSink.probe[Seq[Int]])
+        .request(7)
+        .expectNext(Seq(1, 1))
+        .expectNext(Seq(2, 2))
+        .expectNext(Seq(3, 3))
+        .expectNext(Seq(1))
+        .expectComplete()
+    }
+
+    "Be able to limit the chunk size" in {
+      Source(List("Hello", "Hi", "Hey", "Greetings", "Hey"))
+        .groupedAdjacentByWeighted(_.head, 2)(_ => 1L)
+        .runWith(TestSink.probe[Seq[String]])
+        .request(5)
+        .expectNext(Seq("Hello", "Hi"))
+        .expectNext(Seq("Hey"))
+        .expectNext(Seq("Greetings"))
+        .expectNext(Seq("Hey"))
+        .expectComplete()
+    }
+
+    "Be able to handle single heavy weighted element" in {
+      Source(List("Hello", "HiHi", "Hi", "Hi", "Greetings", "Hey"))
+        .groupedAdjacentByWeighted(_.head, 4)(_.length)
+        .runWith(TestSink.probe[Seq[String]])
+        .request(6)
+        .expectNext(Seq("Hello"))
+        .expectNext(Seq("HiHi"))
+        .expectNext(Seq("Hi", "Hi"))
+        .expectNext(Seq("Greetings"))
+        .expectNext(Seq("Hey"))
+        .expectComplete()
+    }
+
+  }
+
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index 00278f7a41..cd01348ce3 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -47,6 +47,7 @@ import pekko.stream.Attributes._
     val mapWithResource = name("mapWithResource") and IODispatcher
     val ask = name("ask")
     val grouped = name("grouped")
+    val groupedAdjacentByWeighted = name("groupedAdjacentByWeighted")
     val groupedWithin = name("groupedWithin")
     val groupedWeighted = name("groupedWeighted")
     val groupedWeightedWithin = name("groupedWeightedWithin")
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GroupedAdjacentByWeighted.scala
 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GroupedAdjacentByWeighted.scala
new file mode 100644
index 0000000000..adb395b141
--- /dev/null
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GroupedAdjacentByWeighted.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler 
}
+import pekko.util.OptionVal
+
+import scala.collection.immutable
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] final case class GroupedAdjacentByWeighted[T, R](
+    f: T => R,
+    maxWeight: Long,
+    costFn: T => Long)
+    extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
+
+  require(f != null, "f must not be null")
+  require(maxWeight > 0, "maxWeight must be greater than 0")
+  require(costFn != null, "costFn must not be null")
+
+  private val in = Inlet[T]("GroupedAdjacentByWeighted.in")
+  private val out = Outlet[immutable.Seq[T]]("GroupedAdjacentByWeighted.out")
+
+  override val shape: FlowShape[T, immutable.Seq[T]] = FlowShape(in, out)
+  override def initialAttributes: Attributes = 
DefaultAttributes.groupedAdjacentByWeighted
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with InHandler with OutHandler {
+      private var builder = Vector.newBuilder[T]
+      private var currentWeight: Long = 0L
+      // tracks whether any element has been added to the current group (the weight cannot tell, since zero 
weight is allowed)
+      private var hasElements: Boolean = false
+      private var currentKey: OptionVal[R] = OptionVal.none
+      private var pendingGroup: OptionVal[immutable.Seq[T]] = OptionVal.none
+
+      override def onPush(): Unit = {
+        val elem = grab(in)
+        val cost = costFn(elem)
+
+        if (cost < 0L) {
+          failStage(new IllegalArgumentException(s"Negative weight [$cost] for 
element [$elem] is not allowed"))
+          return
+        }
+
+        val elemKey = f(elem)
+        require(elemKey != null, "Element key must not be null")
+
+        if (shouldPushDirectly(cost)) {
+          push(out, Vector(elem))
+        } else if (shouldStartNewGroup(elemKey, cost)) {
+          emitCurrentGroup()
+          handleNewElement(elem, cost, elemKey)
+        } else {
+          addToCurrentGroup(elem, cost, elemKey)
+          tryPullIfNeeded()
+        }
+      }
+
+      private def shouldPushDirectly(cost: Long): Boolean = {
+        cost >= maxWeight && !hasElements
+      }
+
+      private def shouldStartNewGroup(elemKey: R, cost: Long): Boolean = 
currentKey match {
+        case OptionVal.Some(key) if (elemKey != key) || (currentWeight + cost 
> maxWeight) => true
+        case OptionVal.None if cost > maxWeight                                
            => true
+        case _                                                                 
            => false
+      }
+
+      private def emitCurrentGroup(): Unit = if (hasElements) {
+        val group = builder.result()
+        resetGroup()
+        pushOrQueue(group)
+      }
+
+      private def handleNewElement(elem: T, cost: Long, key: R): Unit = {
+        if (cost > maxWeight) {
+          pushOrQueue(Vector(elem))
+        } else {
+          addToCurrentGroup(elem, cost, key)
+        }
+        tryPullIfNeeded()
+      }
+
+      private def addToCurrentGroup(elem: T, cost: Long, key: R): Unit = {
+        builder += elem
+        hasElements = true
+        currentWeight += cost
+        currentKey = OptionVal.Some(key)
+      }
+
+      private def resetGroup(): Unit = {
+        builder.clear()
+        hasElements = false
+        currentWeight = 0L
+        currentKey = OptionVal.none
+      }
+
+      private def pushOrQueue(group: immutable.Seq[T]): Unit = pendingGroup 
match {
+        case OptionVal.Some(pending) =>
+          push(out, pending)
+          pendingGroup = OptionVal.Some(group)
+        case OptionVal.None =>
+          if (isAvailable(out)) {
+            push(out, group)
+          } else {
+            pendingGroup = OptionVal.Some(group)
+          }
+      }
+
+      private def tryPullIfNeeded(): Unit = pendingGroup match {
+        case OptionVal.None if !hasBeenPulled(in) && isAvailable(out) => 
pull(in)
+        case _                                                        =>
+      }
+
+      override def onPull(): Unit = {
+        pendingGroup match {
+          case OptionVal.Some(group) =>
+            push(out, group)
+            pendingGroup = OptionVal.none
+          case _ => if (!hasBeenPulled(in)) pull(in)
+        }
+      }
+
+      override def onUpstreamFinish(): Unit = {
+        val finalGroup = builder.result()
+        builder = null
+
+        pendingGroup match {
+          case OptionVal.Some(group) =>
+            if (finalGroup.nonEmpty) {
+              emitMultiple(out, List(group, finalGroup).iterator, () => 
completeStage())
+            } else {
+              emit(out, group, () => completeStage())
+            }
+          case OptionVal.None =>
+            if (finalGroup.nonEmpty) {
+              emit(out, finalGroup, () => completeStage())
+            } else {
+              completeStage()
+            }
+        }
+      }
+
+      setHandlers(in, out, this)
+    }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index fb0696ad55..53697ed7e3 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -1320,6 +1320,50 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
       costFn: java.util.function.Function[Out, java.lang.Long]): 
javadsl.Flow[In, java.util.List[Out], Mat] =
     new Flow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) 
// TODO optimize to one step
 
+  /**
+   * Partitions this stream into chunks by a delimiter function, which is 
applied to each incoming element,
+   * when the result of the function is not the same as the previous element's 
result, a chunk is emitted.
+   *
+   * The `f` function must return a non-null value for all elements, otherwise 
the stage will fail.
+   *
+   * '''Emits when''' the delimiter function returns a different value than 
the previous element's result
+   *
+   * '''Backpressures when''' a chunk has been assembled and downstream 
backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.2.0
+   */
+  def groupedAdjacentBy[R](
+      f: function.Function[Out, R]): javadsl.Flow[In, java.util.List[Out 
@uncheckedVariance], Mat] =
+    new Flow(delegate.groupedAdjacentBy(f.apply).map(_.asJava))
+
+  /**
+   * Partitions this stream into chunks by a delimiter function, which is 
applied to each incoming element,
+   * when the result of the function is not the same as the previous element's 
result, or the accumulated weight exceeds
+   * the `maxWeight`, a chunk is emitted.
+   *
+   * The `f` function must return a non-null value, and the `costFn` must 
return a non-negative result for all inputs,
+   * otherwise the stage will fail.
+   *
+   * '''Emits when''' the delimiter function returns a different value than 
the previous element's result, or when adding the element would exceed the `maxWeight`.
+   *
+   * '''Backpressures when''' a chunk has been assembled and downstream 
backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.2.0
+   */
+  def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
+      maxWeight: Long,
+      costFn: java.util.function.Function[Out, java.lang.Long])
+      : javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] =
+    new Flow(delegate.groupedAdjacentByWeighted(f.apply, 
maxWeight)(costFn.apply).map(_.asJava))
+
   /**
    * Ensure stream boundedness by limiting the number of elements from 
upstream.
    * If the number of incoming elements exceeds max, it will signal
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index fe3e6c694b..8ce6c92889 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -3103,6 +3103,49 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
       : javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
     new Source(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava))
 
+  /**
+   * Partitions this stream into chunks by a delimiter function, which is 
applied to each incoming element,
+   * when the result of the function is not the same as the previous element's 
result, a chunk is emitted.
+   *
+   * The `f` function must return a non-null value for all elements, otherwise 
the stage will fail.
+   *
+   * '''Emits when''' the delimiter function returns a different value than 
the previous element's result
+   *
+   * '''Backpressures when''' a chunk has been assembled and downstream 
backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.2.0
+   */
+  def groupedAdjacentBy[R](f: function.Function[Out, R]): 
javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
+    new Source(delegate.groupedAdjacentBy(f.apply).map(_.asJava))
+
+  /**
+   * Partitions this stream into chunks by a delimiter function, which is 
applied to each incoming element,
+   * when the result of the function is not the same as the previous element's 
result, or the accumulated weight exceeds
+   * the `maxWeight`, a chunk is emitted.
+   *
+   * The `f` function must return a non-null value, and the `costFn` must 
return a non-negative result for all inputs,
+   * otherwise the stage will fail.
+   *
+   * '''Emits when''' the delimiter function returns a different value than 
the previous element's result, or when adding the element would exceed the `maxWeight`.
+   *
+   * '''Backpressures when''' a chunk has been assembled and downstream 
backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.2.0
+   */
+  def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
+      maxWeight: Long,
+      costFn: java.util.function.Function[Out, java.lang.Long])
+      : javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
+    new Source(delegate.groupedAdjacentByWeighted(f.apply, 
maxWeight)(costFn.apply).map(_.asJava))
+
   /**
    * Ensure stream boundedness by limiting the number of elements from 
upstream.
    * If the number of incoming elements exceeds max, it will signal
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 5358de307a..edb3b907b8 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -679,6 +679,49 @@ class SubFlow[In, Out, Mat](
       costFn: function.Function[Out, java.lang.Long]): SubFlow[In, 
java.util.List[Out @uncheckedVariance], Mat] =
     new 
SubFlow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // 
TODO optimize to one step
 
+  /**
+   * Partitions this stream into chunks by a delimiter function, which is 
applied to each incoming element,
+   * when the result of the function is not the same as the previous element's 
result, a chunk is emitted.
+   *
+   * The `f` function must return a non-null value for all elements, otherwise 
the stage will fail.
+   *
+   * '''Emits when''' the delimiter function returns a different value than 
the previous element's result
+   *
+   * '''Backpressures when''' a chunk has been assembled and downstream 
backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.2.0
+   */
+  def groupedAdjacentBy[R](f: function.Function[Out, R]): SubFlow[In, 
java.util.List[Out @uncheckedVariance], Mat] =
+    new SubFlow(delegate.groupedAdjacentBy(f.apply).map(_.asJava))
+
+  /**
+   * Partitions this stream into chunks by a delimiter function, which is 
applied to each incoming element,
+   * when the result of the function is not the same as the previous element's 
result, or the accumulated weight exceeds
+   * the `maxWeight`, a chunk is emitted.
+   *
+   * The `f` function must return a non-null value, and the `costFn` must 
return a non-negative result for all inputs,
+   * otherwise the stage will fail.
+   *
+   * '''Emits when''' the delimiter function returns a different value than 
the previous element's result, or when adding the element would exceed the `maxWeight`.
+   *
+   * '''Backpressures when''' a chunk has been assembled and downstream 
backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.2.0
+   */
+  def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
+      maxWeight: Long,
+      costFn: java.util.function.Function[Out, java.lang.Long])
+      : SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
+    new SubFlow(delegate.groupedAdjacentByWeighted(f.apply, 
maxWeight)(costFn.apply).map(_.asJava))
+
   /**
    * Ensure stream boundedness by limiting the number of elements from 
upstream.
    * If the number of incoming elements exceeds max, it will signal
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index 3ab439ca9f..bc77cb5bee 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -669,6 +669,49 @@ class SubSource[Out, Mat](
       costFn: function.Function[Out, java.lang.Long]): 
SubSource[java.util.List[Out @uncheckedVariance], Mat] =
     new 
SubSource(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // 
TODO optimize to one step
 
+  /**
+   * Partitions this stream into chunks by a delimiter function, which is 
applied to each incoming element,
+   * when the result of the function is not the same as the previous element's 
result, a chunk is emitted.
+   *
+   * The `f` function must return a non-null value for all elements, otherwise 
the stage will fail.
+   *
+   * '''Emits when''' the delimiter function returns a different value than 
the previous element's result
+   *
+   * '''Backpressures when''' a chunk has been assembled and downstream 
backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.2.0
+   */
+  def groupedAdjacentBy[R](f: function.Function[Out, R]): 
SubSource[java.util.List[Out @uncheckedVariance], Mat] =
+    new SubSource(delegate.groupedAdjacentBy(f.apply).map(_.asJava))
+
+  /**
+   * Partitions this stream into chunks by a delimiter function, which is 
applied to each incoming element,
+   * when the result of the function is not the same as the previous element's 
result, or the accumulated weight exceeds
+   * the `maxWeight`, a chunk is emitted.
+   *
+   * The `f` function must return a non-null value, and the `costFn` must 
return a non-negative result for all inputs,
+   * otherwise the stage will fail.
+   *
+   * '''Emits when''' the delimiter function returns a different value than 
the previous element's result, or the accumulated weight exceeds the `maxWeight`.
+   *
+   * '''Backpressures when''' a chunk has been assembled and downstream 
backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.2.0
+   */
+  def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
+      maxWeight: Long,
+      costFn: java.util.function.Function[Out, java.lang.Long])
+      : SubSource[java.util.List[Out @uncheckedVariance], Mat] =
+    new SubSource(delegate.groupedAdjacentByWeighted(f.apply, 
maxWeight)(costFn.apply).map(_.asJava))
+
   /**
    * Apply a sliding window over the stream and return the windows as groups 
of elements, with the last group
    * possibly smaller than requested due to end-of-stream.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index b1670f0e22..451b43ca0d 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -1779,6 +1779,46 @@ trait FlowOps[+Out, +Mat] {
   def groupedWeighted(minWeight: Long)(costFn: Out => Long): 
Repr[immutable.Seq[Out]] =
     via(GroupedWeighted[Out](minWeight, costFn))
 
+  /**
+   * Partitions this stream into chunks by a delimiter function, which is 
applied to each incoming element,
+   * when the result of the function is not the same as the previous element's 
result, a chunk is emitted.
+   *
+   * The `f` function must return a non-null value for all elements, otherwise 
the stage will fail.
+   *
+   * '''Emits when''' the delimiter function returns a different value than 
the previous element's result
+   *
+   * '''Backpressures when''' a chunk has been assembled and downstream 
backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.2.0
+   */
+  def groupedAdjacentBy[T](f: Out => T): Repr[immutable.Seq[Out]] =
+    via(GroupedAdjacentByWeighted(f, Long.MaxValue, ConstantFun.oneLong))
+
+  /**
+   * Partitions this stream into chunks by a delimiter function, which is 
applied to each incoming element,
+   * when the result of the function is not the same as the previous element's 
result, or the accumulated weight exceeds
+   * the `maxWeight`, a chunk is emitted.
+   *
+   * The `f` function must return a non-null value, and the `costFn` must 
return a non-negative result for all inputs,
+   * otherwise the stage will fail.
+   *
+   * '''Emits when''' the delimiter function returns a different value than 
the previous element's result, or the accumulated weight exceeds the `maxWeight`.
+   *
+   * '''Backpressures when''' a chunk has been assembled and downstream 
backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.2.0
+   */
+  def groupedAdjacentByWeighted[T](f: Out => T, maxWeight: Long)(costFn: Out 
=> Long): Repr[immutable.Seq[Out]] =
+    via(GroupedAdjacentByWeighted(f, maxWeight, costFn))
+
   /**
    * Ensure stream boundedness by limiting the number of elements from 
upstream.
    * If the number of incoming elements exceeds max, it will signal


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to