This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 0bd8138 MINOR: Fix streams Scala peek recursive call (#5566)
0bd8138 is described below
commit 0bd8138f76a1e74116d6542bad88aeb5bdc6f03d
Author: tedyu <[email protected]>
AuthorDate: Wed Aug 29 09:34:01 2018 -0700
MINOR: Fix streams Scala peek recursive call (#5566)
This PR fixes the Streams Scala `peek` method, which previously called itself recursively.
Reviewers: Joan Goyeau <[email protected]>, Guozhang Wang
<[email protected]>, John Roesler <[email protected]>
---
.../kafka/streams/scala/kstream/KStream.scala | 2 +-
.../apache/kafka/streams/scala/KStreamTest.scala | 25 +++++++++++++++++++++-
2 files changed, 25 insertions(+), 2 deletions(-)
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 2bcbf04..8e4c9aa 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -573,5 +573,5 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KStream#peek`
*/
def peek(action: (K, V) => Unit): KStream[K, V] =
- inner.peek((k: K, v: V) => action(k, v))
+ inner.peek(action.asForeachAction)
}
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
index e70d900..3fdfee6 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
@@ -75,7 +75,7 @@ class KStreamTest extends FlatSpec with Matchers with
TestDriver {
testDriver.close()
}
- "foreach a KStream" should "side effect records" in {
+ "foreach a KStream" should "run foreach actions on records" in {
val builder = new StreamsBuilder()
val sourceTopic = "source"
@@ -85,8 +85,31 @@ class KStreamTest extends FlatSpec with Matchers with
TestDriver {
val testDriver = createTestDriver(builder)
testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+ acc shouldBe "value1"
+
+ testDriver.pipeRecord(sourceTopic, ("2", "value2"))
+ acc shouldBe "value1value2"
+
+ testDriver.close()
+ }
+
+ "peek a KStream" should "run peek actions on records" in {
+ val builder = new StreamsBuilder()
+ val sourceTopic = "source"
+ val sinkTopic = "sink"
+
+ var acc = ""
+ builder.stream[String, String](sourceTopic).peek((k, v) => acc +=
v).to(sinkTopic)
+
+ val testDriver = createTestDriver(builder)
+
+ testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+ acc shouldBe "value1"
+ testDriver.readRecord[String, String](sinkTopic).value shouldBe "value1"
+
testDriver.pipeRecord(sourceTopic, ("2", "value2"))
acc shouldBe "value1value2"
+ testDriver.readRecord[String, String](sinkTopic).value shouldBe "value2"
testDriver.close()
}