This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new f548ea55ad Add wiretap/wiretapContext to
FlowWithContext/SourceWithContext
f548ea55ad is described below
commit f548ea55adace97f85d8d81c3980db5c4fc91c88
Author: Matthew de Detrich <[email protected]>
AuthorDate: Fri Aug 23 16:13:29 2024 +0200
Add wiretap/wiretapContext to FlowWithContext/SourceWithContext
---
.../src/main/paradox/release-notes/releases-1.1.md | 1 +
.../stream/scaladsl/FlowWithContextSpec.scala | 44 ++++++++++++++++++++++
.../stream/scaladsl/SourceWithContextSpec.scala | 38 +++++++++++++++++++
...ext-source-flow-with-context.backwards.excludes | 2 +
.../pekko/stream/scaladsl/FlowWithContext.scala | 6 +++
.../pekko/stream/scaladsl/FlowWithContextOps.scala | 16 ++++++++
.../pekko/stream/scaladsl/SourceWithContext.scala | 6 +++
7 files changed, 113 insertions(+)
diff --git a/docs/src/main/paradox/release-notes/releases-1.1.md
b/docs/src/main/paradox/release-notes/releases-1.1.md
index 5fdee4669f..22ca8ac646 100644
--- a/docs/src/main/paradox/release-notes/releases-1.1.md
+++ b/docs/src/main/paradox/release-notes/releases-1.1.md
@@ -43,6 +43,7 @@ The Stream API has been updated to add some extra functions.
* added extra retry operators that allow users to provide a predicate to
decide whether to retry based on the exception
([PR1269](https://github.com/apache/pekko/pull/1269))
* add optionalVia/unsafeOptionalDataVia operators
([PR1422](https://github.com/apache/pekko/pull/1422))
* add alsoTo/alsoToContext operators to `SourceWithContext`/`FlowWithContext`
([PR-1443](https://github.com/apache/pekko/pull/1443))
+* add wireTap/wireTapContext operators to
`SourceWithContext`/`FlowWithContext`
([PR-1446](https://github.com/apache/pekko/pull/1446))
The Stream Testkit Java DSL has some extra functions.
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
index ed3d61ab6f..9acee47c76 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
@@ -126,6 +126,50 @@ class FlowWithContextSpec extends StreamSpec {
}
}
+ "pass through all data when using wireTap" in {
+ val listBuffer = new ListBuffer[String]()
+ Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L),
Message("C", 4L)))
+ .asSourceWithContext(_.offset)
+ .via(
+ FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long),
(String, Long)] { case (data, offset) =>
+ (data.data.toLowerCase, offset)
+ }).wireTap(Sink.foreach(string => listBuffer.+=(string)))
+ )
+ .toMat(TestSink.probe[(String, Long)])(Keep.right)
+ .run()
+ .request(4)
+ .expectNext(("a", 1L))
+ .expectNext(("b", 2L))
+ .expectNext(("d", 3L))
+ .expectNext(("c", 4L))
+ .expectComplete()
+ .within(10.seconds) {
+ listBuffer should contain atLeastOneElementOf List("a", "b", "d",
"c")
+ }
+ }
+
+ "pass through all data when using wireTapContext" in {
+ val listBuffer = new ListBuffer[Long]()
+ Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L),
Message("C", 4L)))
+ .asSourceWithContext(_.offset)
+ .via(
+ FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long),
(String, Long)] { case (data, offset) =>
+ (data.data.toLowerCase, offset)
+ }).wireTapContext(Sink.foreach(offset => listBuffer.+=(offset)))
+ )
+ .toMat(TestSink.probe[(String, Long)])(Keep.right)
+ .run()
+ .request(4)
+ .expectNext(("a", 1L))
+ .expectNext(("b", 2L))
+ .expectNext(("d", 3L))
+ .expectNext(("c", 4L))
+ .expectComplete()
+ .within(10.seconds) {
+ listBuffer should contain atLeastOneElementOf List(1L, 2L, 3L, 4L)
+ }
+ }
+
"keep the same order for data and context when using unsafeDataVia" in {
val data = List(("1", 1), ("2", 2), ("3", 3), ("4", 4))
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
index 0c832f2beb..06650a7b77 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
@@ -115,6 +115,44 @@ class SourceWithContextSpec extends StreamSpec {
}
}
+ "pass through all data when using wireTap" in {
+ val listBuffer = new ListBuffer[Message]()
+ val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D",
3L), Message("C", 4L))
+ Source(messages)
+ .asSourceWithContext(_.offset)
+ .wireTap(Sink.foreach(message => listBuffer.+=(message)))
+ .toMat(TestSink.probe[(Message, Long)])(Keep.right)
+ .run()
+ .request(4)
+ .expectNext((Message("A", 1L), 1L))
+ .expectNext((Message("B", 2L), 2L))
+ .expectNext((Message("D", 3L), 3L))
+ .expectNext((Message("C", 4L), 4L))
+ .expectComplete()
+ .within(10.seconds) {
+ listBuffer.toVector should contain atLeastOneElementOf messages
+ }
+ }
+
+ "pass through all data when using wireTapContext" in {
+ val listBuffer = new ListBuffer[Long]()
+ val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D",
3L), Message("C", 4L))
+ Source(messages)
+ .asSourceWithContext(_.offset)
+ .wireTapContext(Sink.foreach(offset => listBuffer.+=(offset)))
+ .toMat(TestSink.probe[(Message, Long)])(Keep.right)
+ .run()
+ .request(4)
+ .expectNext((Message("A", 1L), 1L))
+ .expectNext((Message("B", 2L), 2L))
+ .expectNext((Message("D", 3L), 3L))
+ .expectNext((Message("C", 4L), 4L))
+ .expectComplete()
+ .within(10.seconds) {
+ listBuffer.toVector should contain atLeastOneElementOf
(messages.map(_.offset))
+ }
+ }
+
"pass through contexts via a FlowWithContext" in {
def flowWithContext[T] = FlowWithContext[T, Long]
diff --git
a/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1446-wireTap-wireTapContext-source-flow-with-context.backwards.excludes
b/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1446-wireTap-wireTapContext-source-flow-with-context.backwards.excludes
new file mode 100644
index 0000000000..0eb85a68d4
--- /dev/null
+++
b/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1446-wireTap-wireTapContext-source-flow-with-context.backwards.excludes
@@ -0,0 +1,2 @@
+
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.wireTap")
+
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.wireTapContext")
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala
index 13df874395..87315e91ec 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala
@@ -144,6 +144,12 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut,
+Mat](delegate: Flow[(In
override def alsoToContext(that: Graph[SinkShape[CtxOut], _]): Repr[Out,
CtxOut] =
FlowWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in:
(Out, CtxOut)) => in._2)))
+ override def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out, CtxOut] =
+ FlowWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that, (in:
(Out, CtxOut)) => in._1)))
+
+ override def wireTapContext(that: Graph[SinkShape[CtxOut], _]): Repr[Out,
CtxOut] =
+ FlowWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that, (in:
(Out, CtxOut)) => in._2)))
+
/**
* Context-preserving variant of
[[pekko.stream.scaladsl.Flow.withAttributes]].
*
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
index fe06cf0142..88848d2fca 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
@@ -104,6 +104,22 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
*/
def alsoToContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx]
+ /**
+ * Data variant of [[pekko.stream.scaladsl.FlowOps.wireTap]]
+ *
+ * @see [[pekko.stream.scaladsl.FlowOps.wireTap]]
+ * @since 1.1.0
+ */
+ def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out, Ctx]
+
+ /**
+ * Context variant of [[pekko.stream.scaladsl.FlowOps.wireTap]]
+ *
+ * @see [[pekko.stream.scaladsl.FlowOps.wireTap]]
+ * @since 1.1.0
+ */
+ def wireTapContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx]
+
/**
* Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.map]].
*
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
index be9c09f7e1..f98c445bf9 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
@@ -161,6 +161,12 @@ final class SourceWithContext[+Out, +Ctx, +Mat]
private[stream] (delegate: Sourc
override def alsoToContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx] =
SourceWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in:
(Out, Ctx)) => in._2)))
+ override def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out, Ctx] =
+ SourceWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that,
(in: (Out, Ctx)) => in._1)))
+
+ override def wireTapContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx] =
+ SourceWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that,
(in: (Out, Ctx)) => in._2)))
+
/**
* Connect this [[pekko.stream.scaladsl.SourceWithContext]] to a
[[pekko.stream.scaladsl.Sink]] and run it.
* The returned value is the materialized value of the `Sink`.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]