This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch 1.0.x
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/1.0.x by this push:
new 2ce8a894e9 =str Deprecate `statefulMapConcat` operator.
2ce8a894e9 is described below
commit 2ce8a894e9f36338e85313518ea25170e59badc8
Author: He-Pin <[email protected]>
AuthorDate: Fri Sep 1 12:54:08 2023 +0800
=str Deprecate `statefulMapConcat` operator.
---
.../cluster/sharding/passivation/simulator/Simulator.scala | 3 +++
.../stream/operators/Source-or-Flow/statefulMapConcat.md | 9 +++++++++
.../scala/docs/stream/operators/flow/StatefulMapConcat.scala | 3 +++
.../test/scala/docs/stream/operators/sourceorflow/Split.scala | 3 +++
.../state/scaladsl/PersistenceTestKitDurableStateStore.scala | 4 ++++
.../apache/pekko/remote/artery/SystemMessageDeliverySpec.scala | 3 +++
.../pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala | 3 +++
.../apache/pekko/stream/scaladsl/WithContextUsageSpec.scala | 4 ++++
.../src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala | 5 +++++
.../main/scala/org/apache/pekko/stream/javadsl/Source.scala | 5 +++++
.../main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala | 5 +++++
.../main/scala/org/apache/pekko/stream/javadsl/SubSource.scala | 5 +++++
.../src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala | 10 +++++++---
13 files changed, 59 insertions(+), 3 deletions(-)
diff --git
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/simulator/Simulator.scala
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/simulator/Simulator.scala
index cfbea67407..4d7c470c40 100644
---
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/simulator/Simulator.scala
+++
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/simulator/Simulator.scala
@@ -40,6 +40,7 @@ import pekko.stream.scaladsl.Source
import pekko.util.OptionVal
import com.typesafe.config.ConfigFactory
+import scala.annotation.nowarn
import scala.collection.immutable
import scala.collection.mutable
import scala.concurrent.ExecutionContext
@@ -201,6 +202,7 @@ object Simulator {
.runWith(SimulatorStats())
object ShardAllocation {
+ @nowarn("msg=deprecated")
def apply(numberOfShards: Int, numberOfRegions: Int): Flow[EntityId,
Access, NotUsed] =
Flow[EntityId].statefulMapConcat(() => {
val allocation = new ShardAllocation(numberOfShards, numberOfRegions)
@@ -235,6 +237,7 @@ object Simulator {
}
object ShardingState {
+ @nowarn("msg=deprecated")
def apply(strategyCreator: StrategyCreator): Flow[Access, Event, NotUsed] =
Flow[Access].statefulMapConcat(() => {
val state = new ShardingState(strategyCreator)
diff --git
a/docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md
b/docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md
index 0eb9b8a1b3..3315fe9d5a 100644
--- a/docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md
@@ -4,6 +4,15 @@ Transform each element into zero or more elements that are
individually passed d
@ref[Simple operators](../index.md#simple-operators)
+@@@ warning
+
+The `statefulMapConcat` operator has been deprecated.
+
+- for stateful mapping, use @ref:[statefulMap](./statefulMap.md)
+- for stateful map concat, use @ref:[statefulMap](./statefulMap.md) with
@ref:[mapConcat](./mapConcat.md).
+
+@@@
+
## Signature
@apidoc[Flow.statefulMapConcat](Flow) {
scala="#statefulMapConcat[T](f:()=>Out=>scala.collection.immutable.Iterable[T]):FlowOps.this.Repr[T]"
java="#statefulMapConcat(org.apache.pekko.japi.function.Creator)" }
diff --git
a/docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala
b/docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala
index f22c38c6aa..ebb7da274d 100644
--- a/docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala
+++ b/docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala
@@ -17,6 +17,9 @@ import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Flow
import org.apache.pekko.stream.scaladsl.Source
+import scala.annotation.nowarn
+
+@nowarn("msg=deprecated")
class StatefulMapConcat {
implicit val system: ActorSystem = ???
diff --git a/docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala
b/docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala
index 054ac79224..c4919925da 100644
--- a/docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala
+++ b/docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala
@@ -22,6 +22,9 @@ import scala.concurrent.duration._
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.pekko.stream.scaladsl.Source
+import scala.annotation.nowarn
+
+@nowarn("msg=deprecated")
object Split {
def splitWhenExample(args: Array[String]): Unit = {
import org.apache.pekko.actor.ActorSystem
diff --git
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
index d53531ec88..17347656fd 100644
---
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
+++
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
@@ -38,6 +38,8 @@ import pekko.stream.scaladsl.Keep
import pekko.stream.scaladsl.Source
import pekko.stream.typed.scaladsl.ActorSource
import pekko.stream.OverflowStrategy
+
+import scala.annotation.nowarn
import scala.collection.immutable
object PersistenceTestKitDurableStateStore {
@@ -95,6 +97,7 @@ class PersistenceTestKitDurableStateStore[A](val system:
ExtendedActorSystem)
store.contains(persistenceId)
}
+ @nowarn("msg=deprecated")
override def changes(tag: String, offset: Offset):
Source[DurableStateChange[A], pekko.NotUsed] = this.synchronized {
val fromOffset = offset match {
case NoOffset => EarliestOffset
@@ -147,6 +150,7 @@ class PersistenceTestKitDurableStateStore[A](val system:
ExtendedActorSystem)
}, inclusive = true)
}
+ @nowarn("msg=deprecated")
override def changesBySlices(
entityType: String,
minSlice: Int,
diff --git
a/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala
b/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala
index 2bcc8dd0a6..595f7dab3b 100644
---
a/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala
+++
b/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala
@@ -43,6 +43,8 @@ import pekko.testkit.TestEvent
import pekko.testkit.TestProbe
import pekko.util.OptionVal
+import scala.annotation.nowarn
+
object SystemMessageDeliverySpec {
case class TestSysMsg(s: String) extends
SystemMessageDelivery.AckedDeliveryMessage
@@ -98,6 +100,7 @@ abstract class AbstractSystemMessageDeliverySpec(c: Config)
extends ArteryMultiN
.via(new SystemMessageAcker(inboundContext))
}
+ @nowarn("msg=deprecated")
protected def drop(dropSeqNumbers: Vector[Long]): Flow[OutboundEnvelope,
OutboundEnvelope, NotUsed] = {
Flow[OutboundEnvelope].statefulMapConcat(() => {
var dropping = dropSeqNumbers
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala
index ce86a47502..dc909475b3 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala
@@ -21,6 +21,9 @@ import pekko.stream.Supervision
import pekko.stream.testkit._
import pekko.stream.testkit.scaladsl.TestSink
+import scala.annotation.nowarn
+
+@nowarn("msg=deprecated")
class FlowStatefulMapConcatSpec extends StreamSpec("""
pekko.stream.materializer.initial-input-buffer-size = 2
""") with ScriptedTest {
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/WithContextUsageSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/WithContextUsageSpec.scala
index 92238de7df..32d73876e4 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/WithContextUsageSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/WithContextUsageSpec.scala
@@ -21,6 +21,8 @@ import pekko.stream.testkit.StreamSpec
import pekko.stream.testkit.TestSubscriber.Probe
import pekko.stream.testkit.scaladsl.TestSink
+import scala.annotation.nowarn
+
class WithContextUsageSpec extends StreamSpec {
"Context propagation used for committing offsets" must {
@@ -181,6 +183,8 @@ class WithContextUsageSpec extends StreamSpec {
.map(_.record)
def commitOffsets = commit[Offset](Offset.Uninitialized)
+
+ @nowarn("msg=deprecated")
def commit[Ctx](uninitialized: Ctx): Sink[Ctx, Probe[Ctx]] = {
val testSink = TestSink.probe[Ctx]
Flow[Ctx]
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 1a3ed4683e..1314c7b594 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -773,6 +773,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In,
Out, Mat]) extends Gr
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive
Streams specification.
*
+ * This operator doesn't handle upstream's completion signal since the state
kept in the closure can be lost.
+ * Use [[FlowOps.statefulMap]] instead.
+ *
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element or there are
still remaining elements
@@ -785,6 +788,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In,
Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
+ @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
+ @Deprecated
def statefulMapConcat[T](
f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]):
javadsl.Flow[In, T, Mat] =
new Flow(delegate.statefulMapConcat { () =>
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index d674c1a56f..65b0cbd29a 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2443,6 +2443,9 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive
Streams specification.
*
+ * This operator doesn't handle upstream's completion signal since the state
kept in the closure can be lost.
+ * Use [[FlowOps.statefulMap]] instead.
+ *
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element or there are
still remaining elements
@@ -2455,6 +2458,8 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
+ @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
+ @Deprecated
def statefulMapConcat[T](f: function.Creator[function.Function[Out,
java.lang.Iterable[T]]]): javadsl.Source[T, Mat] =
new Source(delegate.statefulMapConcat { () =>
val fun = f.create()
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index e6e1997208..b9c7d089ab 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -258,6 +258,9 @@ class SubFlow[In, Out, Mat](
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive
Streams specification.
*
+ * This operator doesn't handle upstream's completion signal since the state
kept in the closure can be lost.
+ * Use [[FlowOps.statefulMap]] instead.
+ *
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element or there are
still remaining elements
@@ -270,6 +273,8 @@ class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
+ @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
+ @Deprecated
def statefulMapConcat[T](f: function.Creator[function.Function[Out,
java.lang.Iterable[T]]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.statefulMapConcat { () =>
val fun = f.create()
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index 43ba6d6940..0de91a896a 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -249,6 +249,9 @@ class SubSource[Out, Mat](
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive
Streams specification.
*
+ * This operator doesn't handle upstream's completion signal since the state
kept in the closure can be lost.
+ * Use [[FlowOps.statefulMap]] instead.
+ *
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element or there are
still remaining elements
@@ -261,6 +264,8 @@ class SubSource[Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
+ @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
+ @Deprecated
def statefulMapConcat[T](f: function.Creator[function.Function[Out,
java.lang.Iterable[T]]]): SubSource[T, Mat] =
new SubSource(delegate.statefulMapConcat { () =>
val fun = f.create()
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index e3767bfba0..d6f3bb9275 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -13,14 +13,12 @@
package org.apache.pekko.stream.scaladsl
-import scala.annotation.implicitNotFound
-import scala.annotation.nowarn
+import scala.annotation.{ implicitNotFound, nowarn }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
-
import org.reactivestreams.Processor
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
@@ -997,6 +995,7 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*/
+ @nowarn("msg=deprecated")
def mapConcat[T](f: Out => IterableOnce[T]): Repr[T] = statefulMapConcat(()
=> f)
/**
@@ -1042,6 +1041,9 @@ trait FlowOps[+Out, +Mat] {
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive
Streams specification.
*
+ * This operator doesn't handle upstream's completion signal since the state
kept in the closure can be lost.
+ * Use [[FlowOps.statefulMap]] instead.
+ *
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element or there are
still remaining elements
@@ -1056,6 +1058,7 @@ trait FlowOps[+Out, +Mat] {
*
* See also [[FlowOps.mapConcat]]
*/
+ @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
def statefulMapConcat[T](f: () => Out => IterableOnce[T]): Repr[T] =
via(new StatefulMapConcat(f))
@@ -2870,6 +2873,7 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*/
+ @nowarn("msg=deprecated")
def zipWithIndex: Repr[(Out, Long)] = {
statefulMapConcat[(Out, Long)] { () =>
var index: Long = 0L
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]