Hamidreza Afzali created KAFKA-4789:
---------------------------------------

             Summary: ProcessorTopologyTestDriver does not forward extracted 
timestamps to internal topics
                 Key: KAFKA-4789
                 URL: https://issues.apache.org/jira/browse/KAFKA-4789
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.2.0
            Reporter: Hamidreza Afzali


*Problem:*

When using ProcessorTopologyTestDriver, the extracted timestamp is not 
forwarded with the produced record to the internal topics.

*Example:*

{code}
/**
 * Minimal reproduction for KAFKA-4789.
 *
 * Builds a windowed-count topology, pushes two records through a
 * ProcessorTopologyTestDriver and prints whatever the driver reports on the
 * output topic after each input record.
 */
object Topology1 {

  /**
   * Runs the reproduction.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val inputTopic = "input"
    val outputTopic = "output"
    val stateStore = "count"
    // Input keys carry an "@<number>" suffix; the map step below strips it.
    // NOTE(review): presumably MyTimestampExtractor derives the record
    // timestamp from that suffix -- its definition is not shown here.
    val inputs = Seq[(String, Integer)](("A@1450000000", 1), ("B@1450000000", 2))

    val props = new Properties
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(
      StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
      classOf[MyTimestampExtractor].getName)

    // Serde for the windowed keys produced by the windowed count.
    val windowedStringSerde = Serdes.serdeFrom(
      new WindowedSerializer(Serdes.String.serializer),
      new WindowedDeserializer(Serdes.String.deserializer))

    val builder = new KStreamBuilder
    builder.stream(Serdes.String, Serdes.Integer, inputTopic)
      .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
      .groupByKey(Serdes.String, Serdes.Integer)
      .count(TimeWindows.of(1000L), stateStore)
      .to(windowedStringSerde, Serdes.Long, outputTopic)

    val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), builder, stateStore)
    inputs.foreach { case (key, value) =>
      driver.process(inputTopic, key, value, Serdes.String.serializer, Serdes.Integer.serializer)
      // The output topic is written with windowedStringSerde, so the key must
      // be read back with the matching windowed deserializer. A plain String
      // deserializer would not match the serde the topic was written with.
      val record = driver.readOutput(
        outputTopic,
        windowedStringSerde.deserializer,
        Serdes.Long.deserializer)
      println(record)
    }

  }
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to