This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new bd3270cd3e +str Add Flow contramap.
bd3270cd3e is described below

commit bd3270cd3e0d8f77e19a93ddd6dbe6a3bdea89f8
Author: He-Pin <[email protected]>
AuthorDate: Fri Aug 11 00:59:31 2023 +0800

    +str Add Flow contramap.
    
    Signed-off-by: He-Pin <[email protected]>
---
 .../paradox/stream/operators/Flow/contramap.md     | 33 ++++++++++++++++++++++
 docs/src/main/paradox/stream/operators/index.md    |  2 ++
 .../docs/stream/operators/flow/ContraMap.scala     | 32 +++++++++++++++++++++
 .../org/apache/pekko/stream/javadsl/FlowTest.java  | 16 ++++++++++-
 .../apache/pekko/stream/scaladsl/FlowSpec.scala    |  8 ++++++
 .../org/apache/pekko/stream/javadsl/Flow.scala     | 11 ++++++++
 .../org/apache/pekko/stream/scaladsl/Flow.scala    | 11 ++++++++
 7 files changed, 112 insertions(+), 1 deletion(-)

diff --git a/docs/src/main/paradox/stream/operators/Flow/contramap.md 
b/docs/src/main/paradox/stream/operators/Flow/contramap.md
new file mode 100644
index 0000000000..fae21bff89
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Flow/contramap.md
@@ -0,0 +1,33 @@
+# contramap
+
+Transform this Flow by applying a function to each *incoming* upstream element 
before it is passed to the Flow.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Flow.contramap](Flow) { scala="#contramap[In2](f:In2=&gt;In):Flow[In2, 
Out, Mat]" java="#contramap(
+org.apache.pekko.japi.function.Function)" }
+
+## Description
+
+Transform this Flow by applying a function to each *incoming* upstream element 
before it is passed to the Flow.
+
+## Examples
+
+Scala
+:  @@snip 
[Flow.scala](/docs/src/test/scala/docs/stream/operators/flow/ContraMap.scala) { 
#imports #contramap }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the original flow emits an element
+
+**backpressures** when original flow backpressures
+
+**completes** when upstream completes
+
+**cancels** when original flow cancels
+
+@@@
\ No newline at end of file
diff --git a/docs/src/main/paradox/stream/operators/index.md 
b/docs/src/main/paradox/stream/operators/index.md
index 69712e037e..e6cb416cdc 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -146,6 +146,7 @@ depending on being backpressured by downstream or not.
 |Source/Flow|<a 
name="collect"></a>@ref[collect](Source-or-Flow/collect.md)|Apply a partial 
function to each incoming element, if the partial function is defined for a 
value the returned value is passed downstream.|
 |Source/Flow|<a 
name="collecttype"></a>@ref[collectType](Source-or-Flow/collectType.md)|Transform
 this stream by testing the type of each of the elements on which the element 
is an instance of the provided type as they pass through this processing step.|
 |Flow|<a 
name="completionstageflow"></a>@ref[completionStageFlow](Flow/completionStageFlow.md)|Streams
 the elements through the given future flow once it successfully completes.|
+|Flow|<a name="contramap"></a>@ref[contramap](Flow/contramap.md)|Transform 
this Flow by applying a function to each *incoming* upstream element before it 
is passed to the Flow.|
 |Source/Flow|<a 
name="detach"></a>@ref[detach](Source-or-Flow/detach.md)|Detach upstream demand 
from downstream demand without detaching the stream rates.|
 |Source/Flow|<a name="drop"></a>@ref[drop](Source-or-Flow/drop.md)|Drop `n` 
elements and then pass any subsequent element downstream.|
 |Source/Flow|<a 
name="dropwhile"></a>@ref[dropWhile](Source-or-Flow/dropWhile.md)|Drop elements 
as long as a predicate function return true for the element|
@@ -419,6 +420,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 * [concatLazy](Source-or-Flow/concatLazy.md)
 * [conflate](Source-or-Flow/conflate.md)
 * [conflateWithSeed](Source-or-Flow/conflateWithSeed.md)
+* [contramap](Flow/contramap.md)
 * [cycle](Source/cycle.md)
 * [deflate](Compression/deflate.md)
 * [delay](Source-or-Flow/delay.md)
diff --git a/docs/src/test/scala/docs/stream/operators/flow/ContraMap.scala 
b/docs/src/test/scala/docs/stream/operators/flow/ContraMap.scala
new file mode 100644
index 0000000000..5589599ebc
--- /dev/null
+++ b/docs/src/test/scala/docs/stream/operators/flow/ContraMap.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package docs.stream.operators.flow
+
+//#imports
+import org.apache.pekko.NotUsed
+import org.apache.pekko.stream.scaladsl._
+
+//#imports
+
+object ContraMap { // holds the `#contramap` snippet referenced by the contramap operator page
+
+  // #contramap
+  val flow: Flow[Int, Int, NotUsed] = Flow[Int]
+  val newFlow: Flow[String, Int, NotUsed] = flow.contramap(_.toInt) // `toInt` throws on non-numeric input
+  // #contramap
+}
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index cf8c68dc63..5df754d409 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -120,6 +120,20 @@ public class FlowTest extends StreamTest {
     probe.expectMsgEquals("de");
   }
 
+  @Test
+  public void mustBeAbleToUseContraMap() { // contramap lets an Integer-input flow accept Strings
+    final Source<String, NotUsed> source = Source.from(Arrays.asList("1", "2", 
"3"));
+    final Flow<Integer, String, NotUsed> flow = 
Flow.fromFunction(String::valueOf);
+    source
+        .via(flow.contramap(Integer::valueOf)) // each String is converted to Integer before `flow`
+        .runWith(TestSink.create(system), system)
+        .request(3)
+        .expectNext("1")
+        .expectNext("2")
+        .expectNext("3")
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToUseDropWhile() throws Exception {
     final TestKit probe = new TestKit(system);
@@ -881,7 +895,7 @@ public class FlowTest extends StreamTest {
     Source.from(input)
         .via(Flow.of(FlowSpec.Fruit.class).collectType(FlowSpec.Apple.class))
         .runForeach((apple) -> probe.getRef().tell(apple, 
ActorRef.noSender()), system);
-    probe.expectMsgAnyClassOf(FlowSpec.Apple.class);
+    probe.<Apple>expectMsgAnyClassOf(FlowSpec.Apple.class);
   }
 
   @Test
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
index 041b874190..c1d71cb70f 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
@@ -207,6 +207,14 @@ class FlowSpec extends 
StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r
       c1.expectComplete()
     }
 
+    "perform contramap operation" in { // String elements are parsed to Int before reaching Flow[Int]
+      val flow = Flow[Int].contramap(Integer.parseInt)
+      val sub = Source(List("1", "2", "3")).via(flow).runWith(TestSink())
+      sub.request(3)
+      sub.expectNextN(List(1, 2, 3)) // elements arrive as Int, in the original order
+      sub.expectComplete()
+    }
+
     "perform transformation operation" in {
       val flow = Flow[Int].map(i => { testActor ! i.toString; i.toString })
 
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 1a3ed4683e..54b75b94e2 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -532,6 +532,17 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, 
Out, Mat]) extends Gr
   def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: 
function.Function2[Mat, M, M2]): javadsl.Sink[In, M2] =
     new Sink(delegate.toMat(sink)(combinerToScala(combine)))
 
+  /**
+   * Transform this Flow by applying a function to each ''incoming'' upstream element before
+   * it is passed to the [[Flow]].
+   *
+   * Delegates to `contramap` on the wrapped scaladsl flow; the output type `Out` and the
+   * materialized value type `Mat` are unchanged.
+   *
+   * '''Backpressures when''' original [[Flow]] backpressures
+   *
+   * '''Cancels when''' original [[Flow]] cancels
+   *
+   * @tparam In2 the input element type of the returned [[Flow]]
+   * @param f function converting each incoming `In2` element into the `In` accepted by this [[Flow]]
+   * @return a [[Flow]] that accepts `In2` elements and feeds the converted elements to this [[Flow]]
+   */
+  def contramap[In2](f: function.Function[In2, In]): javadsl.Flow[In2, Out, 
Mat] =
+    new Flow(delegate.contramap(elem => f(elem)))
+
   /**
    * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs 
and outputs, creating a [[RunnableGraph]].
    * {{{
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index e3767bfba0..c3e2fcfe56 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -173,6 +173,17 @@ final class Flow[-In, +Out, +Mat](
     (mat, Flow.fromSinkAndSource(Sink.fromSubscriber(sub), 
Source.fromPublisher(pub)))
   }
 
+  /**
+   * Transform this Flow by applying a function to each ''incoming'' upstream element before
+   * it is passed to the [[Flow]].
+   *
+   * The function is wrapped in a flow that is placed in front of this [[Flow]]; the materialized
+   * value of this [[Flow]] is the one that is kept (`Keep.right`).
+   *
+   * '''Backpressures when''' original [[Flow]] backpressures
+   *
+   * '''Cancels when''' original [[Flow]] cancels
+   *
+   * @tparam In2 the input element type of the returned [[Flow]]
+   * @param f function converting each incoming `In2` element into the `In` accepted by this [[Flow]]
+   * @return a [[Flow]] that accepts `In2` elements and feeds the converted elements to this [[Flow]]
+   */
+  def contramap[In2](f: In2 => In): Flow[In2, Out, Mat] =
+    Flow.fromFunction(f).viaMat(this)(Keep.right)
+
   /**
    * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs 
and outputs, creating a [[RunnableGraph]].
    * {{{


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to