This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new c8ac6c06b1 chore: Make flatMapPrefix javadsl using java.util.List (#271)
c8ac6c06b1 is described below
commit c8ac6c06b1959135c95427fc5b74a7060785f275
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Jan 4 23:14:09 2025 +0800
chore: Make flatMapPrefix javadsl using java.util.List (#271)
---
.../org/apache/pekko/stream/javadsl/FlowTest.java | 30 ++++++++++++++++++++++
.../apache/pekko/stream/javadsl/SourceTest.java | 25 ++++++++++++++++++
...ist-in-flatmapPrefix-javadsl.backwards.excludes | 24 +++++++++++++++++
.../org/apache/pekko/stream/javadsl/Flow.scala | 4 +--
.../org/apache/pekko/stream/javadsl/Source.scala | 4 +--
.../org/apache/pekko/stream/javadsl/SubFlow.scala | 2 +-
.../apache/pekko/stream/javadsl/SubSource.scala | 2 +-
7 files changed, 85 insertions(+), 6 deletions(-)
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index f2e92c8863..70a6aa98c6 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -13,6 +13,7 @@
package org.apache.pekko.stream.javadsl;
+import com.google.common.collect.Sets;
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
@@ -1663,4 +1664,33 @@ public class FlowTest extends StreamTest {
org.apache.pekko.stream.scaladsl.Flow.apply();
Flow<Integer, Integer, NotUsed> javaFlow = scalaFlow.asJava();
}
+
+ @Test
+ public void useFlatMapPrefix() {
+ final List<Integer> resultList =
+ Source.range(1, 2)
+ .via(
+ Flow.of(Integer.class)
+ .flatMapPrefix(
+ 1, prefix -> Flow.of(Integer.class).prepend(Source.from(prefix))))
+ .runWith(Sink.seq(), system)
+ .toCompletableFuture()
+ .join();
+ Assert.assertEquals(Arrays.asList(1, 2), resultList);
+ }
+
+ @Test
+ public void useFlatMapPrefixSubSource() {
+ final Set<Integer> resultSet =
+ Source.range(1, 2)
+ .via(
+ Flow.of(Integer.class)
+ .groupBy(2, i -> i % 2)
+ .flatMapPrefix(1, prefix -> Flow.of(Integer.class).prepend(Source.from(prefix)))
+ .mergeSubstreams())
+ .runWith(Sink.collect(Collectors.toSet()), system)
+ .toCompletableFuture()
+ .join();
+ Assert.assertEquals(Sets.newHashSet(1, 2), resultSet);
+ }
}
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index 8464688043..e7c37e55a3 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -13,6 +13,7 @@
package org.apache.pekko.stream.javadsl;
+import com.google.common.collect.Sets;
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
@@ -1504,4 +1505,28 @@ public class SourceTest extends StreamTest {
.get(3, TimeUnit.SECONDS);
Assert.assertEquals(Arrays.asList(2, 4, 6, 8, 10), resultList);
}
+
+ @Test
+ public void useFlatMapPrefix() {
+ final List<Integer> resultList =
+ Source.range(1, 2)
+ .flatMapPrefix(1, prefix -> Flow.of(Integer.class).prepend(Source.from(prefix)))
+ .runWith(Sink.seq(), system)
+ .toCompletableFuture()
+ .join();
+ Assert.assertEquals(Arrays.asList(1, 2), resultList);
+ }
+
+ @Test
+ public void useFlatMapPrefixSubSource() {
+ final Set<Integer> resultSet =
+ Source.range(1, 2)
+ .groupBy(2, i -> i % 2)
+ .flatMapPrefix(1, prefix -> Flow.of(Integer.class).prepend(Source.from(prefix)))
+ .mergeSubstreams()
+ .runWith(Sink.collect(Collectors.toSet()), system)
+ .toCompletableFuture()
+ .join();
+ Assert.assertEquals(Sets.newHashSet(1, 2), resultSet);
+ }
}
diff --git a/stream/src/main/mima-filters/1.1.x.backwards.excludes/pr-271-use-java-list-in-flatmapPrefix-javadsl.backwards.excludes b/stream/src/main/mima-filters/1.1.x.backwards.excludes/pr-271-use-java-list-in-flatmapPrefix-javadsl.backwards.excludes
new file mode 100644
index 0000000000..be512ed2b8
--- /dev/null
+++ b/stream/src/main/mima-filters/1.1.x.backwards.excludes/pr-271-use-java-list-in-flatmapPrefix-javadsl.backwards.excludes
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Make flatMapPrefix javadsl using java.util.List
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.Flow.flatMapPrefix")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.Flow.flatMapPrefixMat")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.Source.flatMapPrefix")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.Source.flatMapPrefixMat")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.SubFlow.flatMapPrefix")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.SubSource.flatMapPrefix")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 91b0385e49..df395c1ad5 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -2500,7 +2500,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*/
def flatMapPrefix[Out2, Mat2](
n: Int,
- f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.Flow[In, Out2, Mat] = {
+ f: function.Function[java.util.List[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.Flow[In, Out2, Mat] = {
val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala)
new javadsl.Flow(newDelegate)
}
@@ -2511,7 +2511,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*/
def flatMapPrefixMat[Out2, Mat2, Mat3](
n: Int,
- f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]],
+ f: function.Function[java.util.List[Out], javadsl.Flow[Out, Out2, Mat2]],
matF: function.Function2[Mat, CompletionStage[Mat2], Mat3]): javadsl.Flow[In, Out2, Mat3] = {
val newDelegate = delegate.flatMapPrefixMat(n)(seq => f(seq.asJava).asScala) { (m1, fm2) =>
matF(m1, fm2.asJava)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index bcd289b785..1f50fea255 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -3980,7 +3980,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*/
def flatMapPrefix[Out2, Mat2](
n: Int,
- f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.Source[Out2, Mat] = {
+ f: function.Function[java.util.List[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.Source[Out2, Mat] = {
val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala)
new javadsl.Source(newDelegate)
}
@@ -3991,7 +3991,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*/
def flatMapPrefixMat[Out2, Mat2, Mat3](
n: Int,
- f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]],
+ f: function.Function[java.util.List[Out], javadsl.Flow[Out, Out2, Mat2]],
matF: function.Function2[Mat, CompletionStage[Mat2], Mat3]): javadsl.Source[Out2, Mat3] = {
val newDelegate = delegate.flatMapPrefixMat(n)(seq => f(seq.asJava).asScala) { (m1, fm2) =>
matF(m1, fm2.asJava)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 20f73c4f01..37216e02d6 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -1761,7 +1761,7 @@ class SubFlow[In, Out, Mat](
*/
def flatMapPrefix[Out2, Mat2](
n: Int,
- f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): SubFlow[In, Out2, Mat] = {
+ f: function.Function[java.util.List[Out], javadsl.Flow[Out, Out2, Mat2]]): SubFlow[In, Out2, Mat] = {
val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala)
new javadsl.SubFlow(newDelegate)
}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index 9ac6f9c714..89e676d958 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -1735,7 +1735,7 @@ class SubSource[Out, Mat](
*/
def flatMapPrefix[Out2, Mat2](
n: Int,
- f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.SubSource[Out2, Mat] = {
+ f: function.Function[java.util.List[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.SubSource[Out2, Mat] = {
val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala)
new javadsl.SubSource(newDelegate)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]