This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch 1.0.x
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/1.0.x by this push:
     new 2ce8a894e9 =str Deprecate `statefulMapConcat` operator.
2ce8a894e9 is described below

commit 2ce8a894e9f36338e85313518ea25170e59badc8
Author: He-Pin <[email protected]>
AuthorDate: Fri Sep 1 12:54:08 2023 +0800

    =str Deprecate `statefulMapConcat` operator.
---
 .../cluster/sharding/passivation/simulator/Simulator.scala     |  3 +++
 .../stream/operators/Source-or-Flow/statefulMapConcat.md       |  9 +++++++++
 .../scala/docs/stream/operators/flow/StatefulMapConcat.scala   |  3 +++
 .../test/scala/docs/stream/operators/sourceorflow/Split.scala  |  3 +++
 .../state/scaladsl/PersistenceTestKitDurableStateStore.scala   |  4 ++++
 .../apache/pekko/remote/artery/SystemMessageDeliverySpec.scala |  3 +++
 .../pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala      |  3 +++
 .../apache/pekko/stream/scaladsl/WithContextUsageSpec.scala    |  4 ++++
 .../src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala  |  5 +++++
 .../main/scala/org/apache/pekko/stream/javadsl/Source.scala    |  5 +++++
 .../main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala   |  5 +++++
 .../main/scala/org/apache/pekko/stream/javadsl/SubSource.scala |  5 +++++
 .../src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala | 10 +++++++---
 13 files changed, 59 insertions(+), 3 deletions(-)

diff --git 
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/simulator/Simulator.scala
 
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/simulator/Simulator.scala
index cfbea67407..4d7c470c40 100644
--- 
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/simulator/Simulator.scala
+++ 
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/simulator/Simulator.scala
@@ -40,6 +40,7 @@ import pekko.stream.scaladsl.Source
 import pekko.util.OptionVal
 import com.typesafe.config.ConfigFactory
 
+import scala.annotation.nowarn
 import scala.collection.immutable
 import scala.collection.mutable
 import scala.concurrent.ExecutionContext
@@ -201,6 +202,7 @@ object Simulator {
       .runWith(SimulatorStats())
 
   object ShardAllocation {
+    @nowarn("msg=deprecated")
     def apply(numberOfShards: Int, numberOfRegions: Int): Flow[EntityId, 
Access, NotUsed] =
       Flow[EntityId].statefulMapConcat(() => {
         val allocation = new ShardAllocation(numberOfShards, numberOfRegions)
@@ -235,6 +237,7 @@ object Simulator {
   }
 
   object ShardingState {
+    @nowarn("msg=deprecated")
     def apply(strategyCreator: StrategyCreator): Flow[Access, Event, NotUsed] =
       Flow[Access].statefulMapConcat(() => {
         val state = new ShardingState(strategyCreator)
diff --git 
a/docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md
index 0eb9b8a1b3..3315fe9d5a 100644
--- a/docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md
@@ -4,6 +4,15 @@ Transform each element into zero or more elements that are 
individually passed d
 
 @ref[Simple operators](../index.md#simple-operators)
 
+@@@ warning
+
+The `statefulMapConcat` operator has been deprecated.
+
+- For stateful mapping, use @ref:[statefulMap](./statefulMap.md).
+- For stateful map concat, use @ref:[statefulMap](./statefulMap.md) with @ref:[mapConcat](./mapConcat.md).
+
+@@@
+
 ## Signature
 
 @apidoc[Flow.statefulMapConcat](Flow) { 
scala="#statefulMapConcat[T](f:()=&gt;Out=&gt;scala.collection.immutable.Iterable[T]):FlowOps.this.Repr[T]"
 java="#statefulMapConcat(org.apache.pekko.japi.function.Creator)" } 
diff --git 
a/docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala 
b/docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala
index f22c38c6aa..ebb7da274d 100644
--- a/docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala
+++ b/docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala
@@ -17,6 +17,9 @@ import org.apache.pekko.actor.ActorSystem
 import org.apache.pekko.stream.scaladsl.Flow
 import org.apache.pekko.stream.scaladsl.Source
 
+import scala.annotation.nowarn
+
+@nowarn("msg=deprecated")
 class StatefulMapConcat {
 
   implicit val system: ActorSystem = ???
diff --git a/docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala 
b/docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala
index 054ac79224..c4919925da 100644
--- a/docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala
+++ b/docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala
@@ -22,6 +22,9 @@ import scala.concurrent.duration._
 import org.apache.pekko.stream.scaladsl.Sink
 import org.apache.pekko.stream.scaladsl.Source
 
+import scala.annotation.nowarn
+
+@nowarn("msg=deprecated")
 object Split {
   def splitWhenExample(args: Array[String]): Unit = {
     import org.apache.pekko.actor.ActorSystem
diff --git 
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
index d53531ec88..17347656fd 100644
--- 
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
+++ 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
@@ -38,6 +38,8 @@ import pekko.stream.scaladsl.Keep
 import pekko.stream.scaladsl.Source
 import pekko.stream.typed.scaladsl.ActorSource
 import pekko.stream.OverflowStrategy
+
+import scala.annotation.nowarn
 import scala.collection.immutable
 
 object PersistenceTestKitDurableStateStore {
@@ -95,6 +97,7 @@ class PersistenceTestKitDurableStateStore[A](val system: 
ExtendedActorSystem)
     store.contains(persistenceId)
   }
 
+  @nowarn("msg=deprecated")
   override def changes(tag: String, offset: Offset): 
Source[DurableStateChange[A], pekko.NotUsed] = this.synchronized {
     val fromOffset = offset match {
       case NoOffset             => EarliestOffset
@@ -147,6 +150,7 @@ class PersistenceTestKitDurableStateStore[A](val system: 
ExtendedActorSystem)
         }, inclusive = true)
     }
 
+  @nowarn("msg=deprecated")
   override def changesBySlices(
       entityType: String,
       minSlice: Int,
diff --git 
a/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala
 
b/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala
index 2bcc8dd0a6..595f7dab3b 100644
--- 
a/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala
+++ 
b/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala
@@ -43,6 +43,8 @@ import pekko.testkit.TestEvent
 import pekko.testkit.TestProbe
 import pekko.util.OptionVal
 
+import scala.annotation.nowarn
+
 object SystemMessageDeliverySpec {
 
   case class TestSysMsg(s: String) extends 
SystemMessageDelivery.AckedDeliveryMessage
@@ -98,6 +100,7 @@ abstract class AbstractSystemMessageDeliverySpec(c: Config) 
extends ArteryMultiN
       .via(new SystemMessageAcker(inboundContext))
   }
 
+  @nowarn("msg=deprecated")
   protected def drop(dropSeqNumbers: Vector[Long]): Flow[OutboundEnvelope, 
OutboundEnvelope, NotUsed] = {
     Flow[OutboundEnvelope].statefulMapConcat(() => {
       var dropping = dropSeqNumbers
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala
index ce86a47502..dc909475b3 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala
@@ -21,6 +21,9 @@ import pekko.stream.Supervision
 import pekko.stream.testkit._
 import pekko.stream.testkit.scaladsl.TestSink
 
+import scala.annotation.nowarn
+
+@nowarn("msg=deprecated")
 class FlowStatefulMapConcatSpec extends StreamSpec("""
     pekko.stream.materializer.initial-input-buffer-size = 2
   """) with ScriptedTest {
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/WithContextUsageSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/WithContextUsageSpec.scala
index 92238de7df..32d73876e4 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/WithContextUsageSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/WithContextUsageSpec.scala
@@ -21,6 +21,8 @@ import pekko.stream.testkit.StreamSpec
 import pekko.stream.testkit.TestSubscriber.Probe
 import pekko.stream.testkit.scaladsl.TestSink
 
+import scala.annotation.nowarn
+
 class WithContextUsageSpec extends StreamSpec {
 
   "Context propagation used for committing offsets" must {
@@ -181,6 +183,8 @@ class WithContextUsageSpec extends StreamSpec {
       .map(_.record)
 
   def commitOffsets = commit[Offset](Offset.Uninitialized)
+
+  @nowarn("msg=deprecated")
   def commit[Ctx](uninitialized: Ctx): Sink[Ctx, Probe[Ctx]] = {
     val testSink = TestSink.probe[Ctx]
     Flow[Ctx]
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 1a3ed4683e..1314c7b594 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -773,6 +773,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, 
Out, Mat]) extends Gr
    * The returned `Iterable` MUST NOT contain `null` values,
    * as they are illegal as stream elements - according to the Reactive 
Streams specification.
    *
+   * This operator does not react to upstream's completion signal, so the state kept in the closure can be lost.
+   * Use [[FlowOps.statefulMap]] instead.
+   *
    * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
    *
    * '''Emits when''' the mapping function returns an element or there are 
still remaining elements
@@ -785,6 +788,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, 
Out, Mat]) extends Gr
    *
    * '''Cancels when''' downstream cancels
    */
+  @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
+  @Deprecated
   def statefulMapConcat[T](
       f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): 
javadsl.Flow[In, T, Mat] =
     new Flow(delegate.statefulMapConcat { () =>
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index d674c1a56f..65b0cbd29a 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2443,6 +2443,9 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    * The returned `Iterable` MUST NOT contain `null` values,
    * as they are illegal as stream elements - according to the Reactive 
Streams specification.
    *
+   * This operator does not react to upstream's completion signal, so the state kept in the closure can be lost.
+   * Use [[FlowOps.statefulMap]] instead.
+   *
    * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
    *
    * '''Emits when''' the mapping function returns an element or there are 
still remaining elements
@@ -2455,6 +2458,8 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    *
    * '''Cancels when''' downstream cancels
    */
+  @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
+  @Deprecated
   def statefulMapConcat[T](f: function.Creator[function.Function[Out, 
java.lang.Iterable[T]]]): javadsl.Source[T, Mat] =
     new Source(delegate.statefulMapConcat { () =>
       val fun = f.create()
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index e6e1997208..b9c7d089ab 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -258,6 +258,9 @@ class SubFlow[In, Out, Mat](
    * The returned `Iterable` MUST NOT contain `null` values,
    * as they are illegal as stream elements - according to the Reactive 
Streams specification.
    *
+   * This operator does not react to upstream's completion signal, so the state kept in the closure can be lost.
+   * Use [[FlowOps.statefulMap]] instead.
+   *
    * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
    *
    * '''Emits when''' the mapping function returns an element or there are 
still remaining elements
@@ -270,6 +273,8 @@ class SubFlow[In, Out, Mat](
    *
    * '''Cancels when''' downstream cancels
    */
+  @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
+  @Deprecated
   def statefulMapConcat[T](f: function.Creator[function.Function[Out, 
java.lang.Iterable[T]]]): SubFlow[In, T, Mat] =
     new SubFlow(delegate.statefulMapConcat { () =>
       val fun = f.create()
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index 43ba6d6940..0de91a896a 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -249,6 +249,9 @@ class SubSource[Out, Mat](
    * The returned `Iterable` MUST NOT contain `null` values,
    * as they are illegal as stream elements - according to the Reactive 
Streams specification.
    *
+   * This operator does not react to upstream's completion signal, so the state kept in the closure can be lost.
+   * Use [[FlowOps.statefulMap]] instead.
+   *
    * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
    *
    * '''Emits when''' the mapping function returns an element or there are 
still remaining elements
@@ -261,6 +264,8 @@ class SubSource[Out, Mat](
    *
    * '''Cancels when''' downstream cancels
    */
+  @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
+  @Deprecated
   def statefulMapConcat[T](f: function.Creator[function.Function[Out, 
java.lang.Iterable[T]]]): SubSource[T, Mat] =
     new SubSource(delegate.statefulMapConcat { () =>
       val fun = f.create()
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index e3767bfba0..d6f3bb9275 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -13,14 +13,12 @@
 
 package org.apache.pekko.stream.scaladsl
 
-import scala.annotation.implicitNotFound
-import scala.annotation.nowarn
+import scala.annotation.{ implicitNotFound, nowarn }
 import scala.annotation.unchecked.uncheckedVariance
 import scala.collection.immutable
 import scala.concurrent.Future
 import scala.concurrent.duration.FiniteDuration
 import scala.reflect.ClassTag
-
 import org.reactivestreams.Processor
 import org.reactivestreams.Publisher
 import org.reactivestreams.Subscriber
@@ -997,6 +995,7 @@ trait FlowOps[+Out, +Mat] {
    *
    * '''Cancels when''' downstream cancels
    */
+  @nowarn("msg=deprecated")
   def mapConcat[T](f: Out => IterableOnce[T]): Repr[T] = statefulMapConcat(() 
=> f)
 
   /**
@@ -1042,6 +1041,9 @@ trait FlowOps[+Out, +Mat] {
    * The returned `Iterable` MUST NOT contain `null` values,
    * as they are illegal as stream elements - according to the Reactive 
Streams specification.
    *
+   * This operator does not react to upstream's completion signal, so the state kept in the closure can be lost.
+   * Use [[FlowOps.statefulMap]] instead.
+   *
    * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
    *
    * '''Emits when''' the mapping function returns an element or there are 
still remaining elements
@@ -1056,6 +1058,7 @@ trait FlowOps[+Out, +Mat] {
    *
    * See also [[FlowOps.mapConcat]]
    */
+  @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
   def statefulMapConcat[T](f: () => Out => IterableOnce[T]): Repr[T] =
     via(new StatefulMapConcat(f))
 
@@ -2870,6 +2873,7 @@ trait FlowOps[+Out, +Mat] {
    *
    * '''Cancels when''' downstream cancels
    */
+  @nowarn("msg=deprecated")
   def zipWithIndex: Repr[(Out, Long)] = {
     statefulMapConcat[(Out, Long)] { () =>
       var index: Long = 0L


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to