I would do something like this:
```
"peek a KStream" should "side effect records" in {
  val builder = new StreamsBuilder()
  val sourceTopic = "source"
  val sinkTopic = "sink"

  var acc = ""
  builder.stream[String, String](sourceTopic).peek((_, v) => acc += v).to(sinkTopic)

  val testDriver = createTestDriver(builder)

  testDriver.pipeRecord(sourceTopic, ("1", "value1"))
  acc shouldBe "value1"
  testDriver.readRecord[String, String](sinkTopic).value shouldBe "value1"

  testDriver.pipeRecord(sourceTopic, ("2", "value2"))
  acc shouldBe "value1value2"
  testDriver.readRecord[String, String](sinkTopic).value shouldBe "value2"

  testDriver.close()
}
```
[ Full content available at: https://github.com/apache/kafka/pull/5566 ]