This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 34d9ae6  MINOR: Fix streams Scala peek recursive call (#5566)
34d9ae6 is described below

commit 34d9ae66281602dad3a70ecd07ca0f2b6237ae8c
Author: tedyu <[email protected]>
AuthorDate: Wed Aug 29 09:34:01 2018 -0700

    MINOR: Fix streams Scala peek recursive call (#5566)
    
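    This PR fixes the Streams Scala KStream#peek wrapper, which previously
    ended up calling itself recursively; it now passes the action to the
    underlying Java KStream as a ForeachAction.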
    
    Reviewers: Joan Goyeau <[email protected]>, Guozhang Wang <[email protected]>, John Roesler <[email protected]>
---
 .../kafka/streams/scala/kstream/KStream.scala      |  2 +-
 .../apache/kafka/streams/scala/KStreamTest.scala   | 25 +++++++++++++++++++++-
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 2bcbf04..8e4c9aa 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -573,5 +573,5 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KStream#peek`
    */
   def peek(action: (K, V) => Unit): KStream[K, V] =
-    inner.peek((k: K, v: V) => action(k, v))
+    inner.peek(action.asForeachAction)
 }
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
index e70d900..3fdfee6 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
@@ -75,7 +75,7 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
     testDriver.close()
   }
 
-  "foreach a KStream" should "side effect records" in {
+  "foreach a KStream" should "run foreach actions on records" in {
     val builder = new StreamsBuilder()
     val sourceTopic = "source"
 
@@ -85,8 +85,31 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
     val testDriver = createTestDriver(builder)
 
     testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+    acc shouldBe "value1"
+
+    testDriver.pipeRecord(sourceTopic, ("2", "value2"))
+    acc shouldBe "value1value2"
+
+    testDriver.close()
+  }
+
+  "peek a KStream" should "run peek actions on records" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    var acc = ""
+    builder.stream[String, String](sourceTopic).peek((k, v) => acc += v).to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+    acc shouldBe "value1"
+    testDriver.readRecord[String, String](sinkTopic).value shouldBe "value1"
+
     testDriver.pipeRecord(sourceTopic, ("2", "value2"))
     acc shouldBe "value1value2"
+    testDriver.readRecord[String, String](sinkTopic).value shouldBe "value2"
 
     testDriver.close()
   }

Reply via email to