This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new bd3270cd3e +str Add Flow contramap.
bd3270cd3e is described below
commit bd3270cd3e0d8f77e19a93ddd6dbe6a3bdea89f8
Author: He-Pin <[email protected]>
AuthorDate: Fri Aug 11 00:59:31 2023 +0800
+str Add Flow contramap.
Signed-off-by: He-Pin <[email protected]>
---
.../paradox/stream/operators/Flow/contramap.md | 33 ++++++++++++++++++++++
docs/src/main/paradox/stream/operators/index.md | 2 ++
.../docs/stream/operators/flow/ContraMap.scala | 32 +++++++++++++++++++++
.../org/apache/pekko/stream/javadsl/FlowTest.java | 16 ++++++++++-
.../apache/pekko/stream/scaladsl/FlowSpec.scala | 8 ++++++
.../org/apache/pekko/stream/javadsl/Flow.scala | 11 ++++++++
.../org/apache/pekko/stream/scaladsl/Flow.scala | 11 ++++++++
7 files changed, 112 insertions(+), 1 deletion(-)
diff --git a/docs/src/main/paradox/stream/operators/Flow/contramap.md
b/docs/src/main/paradox/stream/operators/Flow/contramap.md
new file mode 100644
index 0000000000..fae21bff89
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Flow/contramap.md
@@ -0,0 +1,33 @@
+# contramap
+
+Transform this Flow by applying a function to each *incoming* upstream element
before it is passed to the Flow.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Flow.contramap](Flow) { scala="#contramap[In2](f:In2=>In):Flow[In2,
Out, Mat]" java="#map(
+org.apache.pekko.japi.function.Function)" }
+
+## Description
+
+Transform this Flow by applying a function to each *incoming* upstream element
before it is passed to the Flow.
+
+## Examples
+
+Scala
+: @@snip
[Flow.scala](/docs/src/test/scala/docs/stream/operators/flow/ContraMap.scala) {
#imports #contramap }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the mapping function returns an element
+
+**backpressures** '''Backpressures when''' original flow backpressures
+
+**completes** when upstream completes
+
+**cancels** when original flow cancels
+
+@@@
\ No newline at end of file
diff --git a/docs/src/main/paradox/stream/operators/index.md
b/docs/src/main/paradox/stream/operators/index.md
index 69712e037e..e6cb416cdc 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -146,6 +146,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|<a
name="collect"></a>@ref[collect](Source-or-Flow/collect.md)|Apply a partial
function to each incoming element, if the partial function is defined for a
value the returned value is passed downstream.|
|Source/Flow|<a
name="collecttype"></a>@ref[collectType](Source-or-Flow/collectType.md)|Transform
this stream by testing the type of each of the elements on which the element
is an instance of the provided type as they pass through this processing step.|
|Flow|<a
name="completionstageflow"></a>@ref[completionStageFlow](Flow/completionStageFlow.md)|Streams
the elements through the given future flow once it successfully completes.|
+|Flow|<a name="contramap"></a>@ref[contramap](Flow/contramap.md)|Transform
this Flow by applying a function to each *incoming* upstream element before it
is passed to the Flow.|
|Source/Flow|<a
name="detach"></a>@ref[detach](Source-or-Flow/detach.md)|Detach upstream demand
from downstream demand without detaching the stream rates.|
|Source/Flow|<a name="drop"></a>@ref[drop](Source-or-Flow/drop.md)|Drop `n`
elements and then pass any subsequent element downstream.|
|Source/Flow|<a
name="dropwhile"></a>@ref[dropWhile](Source-or-Flow/dropWhile.md)|Drop elements
as long as a predicate function return true for the element|
@@ -419,6 +420,7 @@ For more background see the @ref[Error Handling in
Streams](../stream-error.md)
* [concatLazy](Source-or-Flow/concatLazy.md)
* [conflate](Source-or-Flow/conflate.md)
* [conflateWithSeed](Source-or-Flow/conflateWithSeed.md)
+* [contramap](Flow/contramap.md)
* [cycle](Source/cycle.md)
* [deflate](Compression/deflate.md)
* [delay](Source-or-Flow/delay.md)
diff --git a/docs/src/test/scala/docs/stream/operators/flow/ContraMap.scala
b/docs/src/test/scala/docs/stream/operators/flow/ContraMap.scala
new file mode 100644
index 0000000000..5589599ebc
--- /dev/null
+++ b/docs/src/test/scala/docs/stream/operators/flow/ContraMap.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package docs.stream.operators.flow
+
+//#imports
+import org.apache.pekko.NotUsed
+import org.apache.pekko.stream.scaladsl._
+
+//#imports
+
+object ContraMap {
+
+ // #contramap
+ val flow: Flow[Int, Int, NotUsed] = Flow[Int]
+ val newFlow: Flow[String, Int, NotUsed] = flow.contramap(_.toInt)
+ // #contramap
+}
diff --git
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index cf8c68dc63..5df754d409 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -120,6 +120,20 @@ public class FlowTest extends StreamTest {
probe.expectMsgEquals("de");
}
+ @Test
+ public void mustBeAbleToUseContraMap() {
+ final Source<String, NotUsed> source = Source.from(Arrays.asList("1", "2",
"3"));
+ final Flow<Integer, String, NotUsed> flow =
Flow.fromFunction(String::valueOf);
+ source
+ .via(flow.contramap(Integer::valueOf))
+ .runWith(TestSink.create(system), system)
+ .request(3)
+ .expectNext("1")
+ .expectNext("2")
+ .expectNext("3")
+ .expectComplete();
+ }
+
@Test
public void mustBeAbleToUseDropWhile() throws Exception {
final TestKit probe = new TestKit(system);
@@ -881,7 +895,7 @@ public class FlowTest extends StreamTest {
Source.from(input)
.via(Flow.of(FlowSpec.Fruit.class).collectType(FlowSpec.Apple.class))
.runForeach((apple) -> probe.getRef().tell(apple,
ActorRef.noSender()), system);
- probe.expectMsgAnyClassOf(FlowSpec.Apple.class);
+ probe.<Apple>expectMsgAnyClassOf(FlowSpec.Apple.class);
}
@Test
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
index 041b874190..c1d71cb70f 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
@@ -207,6 +207,14 @@ class FlowSpec extends
StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r
c1.expectComplete()
}
+ "perform contramap operation" in {
+ val flow = Flow[Int].contramap(Integer.parseInt)
+ val sub = Source(List("1", "2", "3")).via(flow).runWith(TestSink())
+ sub.request(3)
+ sub.expectNextN(List(1, 2, 3))
+ sub.expectComplete()
+ }
+
"perform transformation operation" in {
val flow = Flow[Int].map(i => { testActor ! i.toString; i.toString })
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 1a3ed4683e..54b75b94e2 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -532,6 +532,17 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In,
Out, Mat]) extends Gr
def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine:
function.Function2[Mat, M, M2]): javadsl.Sink[In, M2] =
new Sink(delegate.toMat(sink)(combinerToScala(combine)))
+ /**
+ * Transform this Flow by applying a function to each *incoming* upstream
element before
+ * it is passed to the [[Flow]]
+ *
+ * '''Backpressures when''' original [[Flow]] backpressures
+ *
+ * '''Cancels when''' original [[Flow]] cancels
+ */
+ def contramap[In2](f: function.Function[In2, In]): javadsl.Flow[In2, Out,
Mat] =
+ new Flow(delegate.contramap(elem => f(elem)))
+
/**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs
and outputs, creating a [[RunnableGraph]].
* {{{
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index e3767bfba0..c3e2fcfe56 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -173,6 +173,17 @@ final class Flow[-In, +Out, +Mat](
(mat, Flow.fromSinkAndSource(Sink.fromSubscriber(sub),
Source.fromPublisher(pub)))
}
+ /**
+ * Transform this Flow by applying a function to each *incoming* upstream
element before
+ * it is passed to the [[Flow]]
+ *
+ * '''Backpressures when''' original [[Flow]] backpressures
+ *
+ * '''Cancels when''' original [[Flow]] cancels
+ */
+ def contramap[In2](f: In2 => In): Flow[In2, Out, Mat] =
+ Flow.fromFunction(f).viaMat(this)(Keep.right)
+
/**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs
and outputs, creating a [[RunnableGraph]].
* {{{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]