This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 8d2141a5c9 =str Deprecate statefulMapConcat operator.
8d2141a5c9 is described below
commit 8d2141a5c98c1c9ed34f2cd0c0e98be98aa13946
Author: He-Pin <[email protected]>
AuthorDate: Fri Sep 1 22:48:40 2023 +0800
=str Deprecate statefulMapConcat operator.
---
.../scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala | 3 +++
.../paradox/stream/operators/Source-or-Flow/statefulMapConcat.md | 9 +++++++++
.../scala/docs/stream/operators/flow/StatefulMapConcat.scala | 3 +++
.../test/scala/docs/stream/operators/sourceorflow/Split.scala | 3 +++
.../apache/pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala | 3 +++
stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala | 5 +++++
.../src/main/scala/org/apache/pekko/stream/javadsl/Source.scala | 5 +++++
.../src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala | 5 +++++
.../main/scala/org/apache/pekko/stream/javadsl/SubSource.scala | 5 +++++
.../src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala | 5 +++++
10 files changed, 46 insertions(+)
diff --git
a/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala
b/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala
index 3393e54d84..3736656773 100644
---
a/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala
+++
b/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala
@@ -30,6 +30,8 @@ import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.scaladsl._
+import scala.annotation.nowarn
+
object ZipWithIndexBenchmark {
final val OperationsPerInvocation = 100000
}
@@ -37,6 +39,7 @@ object ZipWithIndexBenchmark {
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
+@nowarn("msg=deprecated")
class ZipWithIndexBenchmark {
import ZipWithIndexBenchmark._
diff --git
a/docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md
b/docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md
index 0eb9b8a1b3..3315fe9d5a 100644
--- a/docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md
@@ -4,6 +4,15 @@ Transform each element into zero or more elements that are
individually passed d
@ref[Simple operators](../index.md#simple-operators)
+@@@ warning
+
+The `statefulMapConcat` operator has been deprecated.
+
+- for stateful mapping, use @ref:[statefulMap](./statefulMap.md)
+- for stateful map concat, use @ref:[statefulMap](./statefulMap.md) with @ref:[mapConcat](./mapConcat.md).
+
+@@@
+
## Signature
@apidoc[Flow.statefulMapConcat](Flow) {
scala="#statefulMapConcat[T](f:()=>Out=>scala.collection.immutable.Iterable[T]):FlowOps.this.Repr[T]"
java="#statefulMapConcat(org.apache.pekko.japi.function.Creator)" }
diff --git
a/docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala
b/docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala
index f22c38c6aa..ebb7da274d 100644
--- a/docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala
+++ b/docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala
@@ -17,6 +17,9 @@ import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Flow
import org.apache.pekko.stream.scaladsl.Source
+import scala.annotation.nowarn
+
+@nowarn("msg=deprecated")
class StatefulMapConcat {
implicit val system: ActorSystem = ???
diff --git a/docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala
b/docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala
index 054ac79224..c4919925da 100644
--- a/docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala
+++ b/docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala
@@ -22,6 +22,9 @@ import scala.concurrent.duration._
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.pekko.stream.scaladsl.Source
+import scala.annotation.nowarn
+
+@nowarn("msg=deprecated")
object Split {
def splitWhenExample(args: Array[String]): Unit = {
import org.apache.pekko.actor.ActorSystem
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala
index ce86a47502..dc909475b3 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala
@@ -21,6 +21,9 @@ import pekko.stream.Supervision
import pekko.stream.testkit._
import pekko.stream.testkit.scaladsl.TestSink
+import scala.annotation.nowarn
+
+@nowarn("msg=deprecated")
class FlowStatefulMapConcatSpec extends StreamSpec("""
pekko.stream.materializer.initial-input-buffer-size = 2
""") with ScriptedTest {
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 54b75b94e2..3d10ab4953 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -784,6 +784,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In,
Out, Mat]) extends Gr
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive
Streams specification.
*
+ * This operator does not handle the upstream's completion signal, so the state kept in the closure can be lost.
+ * Use [[FlowOps.statefulMap]] instead.
+ *
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element or there are
still remaining elements
@@ -796,6 +799,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In,
Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
+ @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
+ @Deprecated
def statefulMapConcat[T](
f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]):
javadsl.Flow[In, T, Mat] =
new Flow(delegate.statefulMapConcat { () =>
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 0156d0a60c..a8fe667767 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2437,6 +2437,9 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive
Streams specification.
*
+ * This operator does not handle the upstream's completion signal, so the state kept in the closure can be lost.
+ * Use [[FlowOps.statefulMap]] instead.
+ *
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element or there are
still remaining elements
@@ -2449,6 +2452,8 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
+ @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
+ @Deprecated
def statefulMapConcat[T](f: function.Creator[function.Function[Out,
java.lang.Iterable[T]]]): javadsl.Source[T, Mat] =
new Source(delegate.statefulMapConcat { () =>
val fun = f.create()
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index e6e1997208..b9c7d089ab 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -258,6 +258,9 @@ class SubFlow[In, Out, Mat](
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive
Streams specification.
*
+ * This operator does not handle the upstream's completion signal, so the state kept in the closure can be lost.
+ * Use [[FlowOps.statefulMap]] instead.
+ *
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element or there are
still remaining elements
@@ -270,6 +273,8 @@ class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
+ @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
+ @Deprecated
def statefulMapConcat[T](f: function.Creator[function.Function[Out,
java.lang.Iterable[T]]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.statefulMapConcat { () =>
val fun = f.create()
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index 43ba6d6940..0de91a896a 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -249,6 +249,9 @@ class SubSource[Out, Mat](
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive
Streams specification.
*
+ * This operator does not handle the upstream's completion signal, so the state kept in the closure can be lost.
+ * Use [[FlowOps.statefulMap]] instead.
+ *
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element or there are
still remaining elements
@@ -261,6 +264,8 @@ class SubSource[Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
+ @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
+ @Deprecated
def statefulMapConcat[T](f: function.Creator[function.Function[Out,
java.lang.Iterable[T]]]): SubSource[T, Mat] =
new SubSource(delegate.statefulMapConcat { () =>
val fun = f.create()
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 7c6c47b17c..1642248639 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -1008,6 +1008,7 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*/
+ @nowarn("msg=deprecated")
def mapConcat[T](f: Out => IterableOnce[T]): Repr[T] = statefulMapConcat(()
=> f)
/**
@@ -1053,6 +1054,9 @@ trait FlowOps[+Out, +Mat] {
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive
Streams specification.
*
+ * This operator does not handle the upstream's completion signal, so the state kept in the closure can be lost.
+ * Use [[FlowOps.statefulMap]] instead.
+ *
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element or there are
still remaining elements
@@ -1067,6 +1071,7 @@ trait FlowOps[+Out, +Mat] {
*
* See also [[FlowOps.mapConcat]]
*/
+ @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
def statefulMapConcat[T](f: () => Out => IterableOnce[T]): Repr[T] =
via(new StatefulMapConcat(f))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]