I would do something like this:
```
  "peek a KStream" should "side effect records" in {
    val builder = new StreamsBuilder()
    val sourceTopic = "source"
    val sinkTopic = "sink"

    // Accumulates the values seen by peek so the side effect can be asserted on.
    var acc = ""
    builder.stream[String, String](sourceTopic).peek((_, v) => acc += v).to(sinkTopic)

    val testDriver = createTestDriver(builder)

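    // peek should run the side effect and forward the record unchanged to the sink.
    testDriver.pipeRecord(sourceTopic, ("1", "value1"))
    acc shouldBe "value1"
    testDriver.readRecord[String, String](sinkTopic).value shouldBe "value1"

    // A second record appends to the accumulator and is forwarded as well.
    testDriver.pipeRecord(sourceTopic, ("2", "value2"))
    acc shouldBe "value1value2"
    testDriver.readRecord[String, String](sinkTopic).value shouldBe "value2"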

    testDriver.close()
  }
```

[ Full content available at: https://github.com/apache/kafka/pull/5566 ]