This is an automated email from the ASF dual-hosted git repository.

mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new f548ea55ad Add wiretap/wiretapContext to 
FlowWithContext/SourceWithContext
f548ea55ad is described below

commit f548ea55adace97f85d8d81c3980db5c4fc91c88
Author: Matthew de Detrich <[email protected]>
AuthorDate: Fri Aug 23 16:13:29 2024 +0200

    Add wiretap/wiretapContext to FlowWithContext/SourceWithContext
---
 .../src/main/paradox/release-notes/releases-1.1.md |  1 +
 .../stream/scaladsl/FlowWithContextSpec.scala      | 44 ++++++++++++++++++++++
 .../stream/scaladsl/SourceWithContextSpec.scala    | 38 +++++++++++++++++++
 ...ext-source-flow-with-context.backwards.excludes |  2 +
 .../pekko/stream/scaladsl/FlowWithContext.scala    |  6 +++
 .../pekko/stream/scaladsl/FlowWithContextOps.scala | 16 ++++++++
 .../pekko/stream/scaladsl/SourceWithContext.scala  |  6 +++
 7 files changed, 113 insertions(+)

diff --git a/docs/src/main/paradox/release-notes/releases-1.1.md 
b/docs/src/main/paradox/release-notes/releases-1.1.md
index 5fdee4669f..22ca8ac646 100644
--- a/docs/src/main/paradox/release-notes/releases-1.1.md
+++ b/docs/src/main/paradox/release-notes/releases-1.1.md
@@ -43,6 +43,7 @@ The Stream API has been updated to add some extra functions.
 * added extra retry operators that allow users to provide a predicate to 
decide whether to retry based on the exception 
([PR1269](https://github.com/apache/pekko/pull/1269))
 * add optionalVia/unsafeOptionalDataVia operators 
([PR1422](https://github.com/apache/pekko/pull/1422))
 * add alsoTo/alsoToContext operators to `SourceWithContext`/`FlowWithContext` 
([PR-1443](https://github.com/apache/pekko/pull/1443))
+* add wireTap/wireTapContext operators to 
`SourceWithContext`/`FlowWithContext` 
([PR-1446](https://github.com/apache/pekko/pull/1446))
 
 The Stream Testkit Java DSL has some extra functions.
 
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
index ed3d61ab6f..9acee47c76 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
@@ -126,6 +126,50 @@ class FlowWithContextSpec extends StreamSpec {
         }
     }
 
+    "pass through all data when using wireTap" in {
+      val listBuffer = new ListBuffer[String]()
+      Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), 
Message("C", 4L)))
+        .asSourceWithContext(_.offset)
+        .via(
+          FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), 
(String, Long)] { case (data, offset) =>
+            (data.data.toLowerCase, offset)
+          }).wireTap(Sink.foreach(string => listBuffer.+=(string)))
+        )
+        .toMat(TestSink.probe[(String, Long)])(Keep.right)
+        .run()
+        .request(4)
+        .expectNext(("a", 1L))
+        .expectNext(("b", 2L))
+        .expectNext(("d", 3L))
+        .expectNext(("c", 4L))
+        .expectComplete()
+        .within(10.seconds) {
+          listBuffer should contain atLeastOneElementOf List("a", "b", "d", 
"c")
+        }
+    }
+
+    "pass through all data when using wireTapContext" in {
+      val listBuffer = new ListBuffer[Long]()
+      Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), 
Message("C", 4L)))
+        .asSourceWithContext(_.offset)
+        .via(
+          FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), 
(String, Long)] { case (data, offset) =>
+            (data.data.toLowerCase, offset)
+          }).wireTapContext(Sink.foreach(offset => listBuffer.+=(offset)))
+        )
+        .toMat(TestSink.probe[(String, Long)])(Keep.right)
+        .run()
+        .request(4)
+        .expectNext(("a", 1L))
+        .expectNext(("b", 2L))
+        .expectNext(("d", 3L))
+        .expectNext(("c", 4L))
+        .expectComplete()
+        .within(10.seconds) {
+          listBuffer should contain atLeastOneElementOf List(1L, 2L, 3L, 4L)
+        }
+    }
+
     "keep the same order for data and context when using unsafeDataVia" in {
       val data = List(("1", 1), ("2", 2), ("3", 3), ("4", 4))
 
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
index 0c832f2beb..06650a7b77 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
@@ -115,6 +115,44 @@ class SourceWithContextSpec extends StreamSpec {
         }
     }
 
+    "pass through all data when using wireTap" in {
+      val listBuffer = new ListBuffer[Message]()
+      val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 
3L), Message("C", 4L))
+      Source(messages)
+        .asSourceWithContext(_.offset)
+        .wireTap(Sink.foreach(message => listBuffer.+=(message)))
+        .toMat(TestSink.probe[(Message, Long)])(Keep.right)
+        .run()
+        .request(4)
+        .expectNext((Message("A", 1L), 1L))
+        .expectNext((Message("B", 2L), 2L))
+        .expectNext((Message("D", 3L), 3L))
+        .expectNext((Message("C", 4L), 4L))
+        .expectComplete()
+        .within(10.seconds) {
+          listBuffer.toVector should contain atLeastOneElementOf messages
+        }
+    }
+
+    "pass through all data when using wireTapContext" in {
+      val listBuffer = new ListBuffer[Long]()
+      val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 
3L), Message("C", 4L))
+      Source(messages)
+        .asSourceWithContext(_.offset)
+        .wireTapContext(Sink.foreach(offset => listBuffer.+=(offset)))
+        .toMat(TestSink.probe[(Message, Long)])(Keep.right)
+        .run()
+        .request(4)
+        .expectNext((Message("A", 1L), 1L))
+        .expectNext((Message("B", 2L), 2L))
+        .expectNext((Message("D", 3L), 3L))
+        .expectNext((Message("C", 4L), 4L))
+        .expectComplete()
+        .within(10.seconds) {
+          listBuffer.toVector should contain atLeastOneElementOf 
(messages.map(_.offset))
+        }
+    }
+
     "pass through contexts via a FlowWithContext" in {
 
       def flowWithContext[T] = FlowWithContext[T, Long]
diff --git 
a/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1446-wireTap-wireTapContext-source-flow-with-context.backwards.excludes
 
b/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1446-wireTap-wireTapContext-source-flow-with-context.backwards.excludes
new file mode 100644
index 0000000000..0eb85a68d4
--- /dev/null
+++ 
b/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1446-wireTap-wireTapContext-source-flow-with-context.backwards.excludes
@@ -0,0 +1,2 @@
+    
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.wireTap")
+    
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.wireTapContext")
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala
index 13df874395..87315e91ec 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala
@@ -144,6 +144,12 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, 
+Mat](delegate: Flow[(In
   override def alsoToContext(that: Graph[SinkShape[CtxOut], _]): Repr[Out, 
CtxOut] =
     FlowWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: 
(Out, CtxOut)) => in._2)))
 
+  override def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out, CtxOut] =
+    FlowWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that, (in: 
(Out, CtxOut)) => in._1)))
+
+  override def wireTapContext(that: Graph[SinkShape[CtxOut], _]): Repr[Out, 
CtxOut] =
+    FlowWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that, (in: 
(Out, CtxOut)) => in._2)))
+
   /**
    * Context-preserving variant of 
[[pekko.stream.scaladsl.Flow.withAttributes]].
    *
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
index fe06cf0142..88848d2fca 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
@@ -104,6 +104,22 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
    */
   def alsoToContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx]
 
+  /**
+   * Data variant of [[pekko.stream.scaladsl.FlowOps.wireTap]]
+   *
+   * @see [[pekko.stream.scaladsl.FlowOps.wireTap]]
+   * @since 1.1.0
+   */
+  def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out, Ctx]
+
+  /**
+   * Context variant of [[pekko.stream.scaladsl.FlowOps.wireTap]]
+   *
+   * @see [[pekko.stream.scaladsl.FlowOps.wireTap]]
+   * @since 1.1.0
+   */
+  def wireTapContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx]
+
   /**
    * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.map]].
    *
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
index be9c09f7e1..f98c445bf9 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala
@@ -161,6 +161,12 @@ final class SourceWithContext[+Out, +Ctx, +Mat] 
private[stream] (delegate: Sourc
   override def alsoToContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx] =
     SourceWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: 
(Out, Ctx)) => in._2)))
 
+  override def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out, Ctx] =
+    SourceWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that, 
(in: (Out, Ctx)) => in._1)))
+
+  override def wireTapContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx] =
+    SourceWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that, 
(in: (Out, Ctx)) => in._2)))
+
   /**
    * Connect this [[pekko.stream.scaladsl.SourceWithContext]] to a 
[[pekko.stream.scaladsl.Sink]] and run it.
    * The returned value is the materialized value of the `Sink`.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to