[ https://issues.apache.org/jira/browse/KAFKA-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang resolved KAFKA-4789.
----------------------------------
Resolution: Fixed
Fix Version/s: 0.10.3.0
Issue resolved by pull request 2590
[https://github.com/apache/kafka/pull/2590]
> ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics
> ------------------------------------------------------------------------------------
>
> Key: KAFKA-4789
> URL: https://issues.apache.org/jira/browse/KAFKA-4789
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Hamidreza Afzali
> Labels: unit-test
> Fix For: 0.10.3.0
>
>
> *Problem:*
> When using ProcessorTopologyTestDriver, the extracted timestamp is not
> forwarded with the produced record to the internal topics.
> *Example:*
> {code}
> object Topology1 {
> def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val outputTopic = "output"
> val stateStore = "count"
> val inputs = Seq[(String, Integer)](("A@1450000000", 1), ("B@1450000000",
> 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> classOf[MyTimestampExtractor].getName)
> val windowedStringSerde = Serdes.serdeFrom(new
> WindowedSerializer(Serdes.String.serializer),
> new WindowedDeserializer(Serdes.String.deserializer))
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
> .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
> .groupByKey(Serdes.String, Serdes.Integer)
> .count(TimeWindows.of(1000L), stateStore)
> .to(windowedStringSerde, Serdes.Long, outputTopic)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props),
> builder, stateStore)
> inputs.foreach {
> case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer,
> Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic,
> Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
> }
> }
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)