[ 
https://issues.apache.org/jira/browse/KAFKA-4828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15893141#comment-15893141
 ] 

ASF GitHub Bot commented on KAFKA-4828:
---------------------------------------

GitHub user hrafzali opened a pull request:

    https://github.com/apache/kafka/pull/2629

    KAFKA-4828: ProcessorTopologyTestDriver does not work when using through

    This resolves the following issues in the ProcessorTopologyTestDriver:
    
    - It should not create an internal changelog topic when using `through()`
    - It should forward the produced record back into the topology if it is to 
a source topic
    
    Jira ticket: https://issues.apache.org/jira/browse/KAFKA-4828
    
    The contribution is my original work and I license the work to the project 
under the project's open source license.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hrafzali/kafka 
KAFKA-4828_ProcessorTopologyTestDriver_through

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/2629.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2629
    
----
commit 93b6e2f185866fd4ae50624085ded17f9cb4cac2
Author: Hamidreza Afzali <hrafz...@gmail.com>
Date:   2017-03-01T19:01:47Z

    KAFKA-4828: ProcessorTopologyTestDriver gets names of stores and changelog 
topics from the topology

commit 884c8d40ef1c3a235fd89f863fd84392d9abe8ae
Author: Hamidreza Afzali <hrafz...@gmail.com>
Date:   2017-03-02T21:50:58Z

    KAFKA-4828: Added support to ProcessorTopologyTestDriver for processing 
records produced to source topics

----


> ProcessorTopologyTestDriver does not work when using .through()
> ---------------------------------------------------------------
>
>                 Key: KAFKA-4828
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4828
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Hamidreza Afzali
>            Assignee: Hamidreza Afzali
>              Labels: unit-test
>
> *Problem:*
> ProcessorTopologyTestDriver does not work when testing a topology that uses 
> through().
> {code}
> org.apache.kafka.streams.errors.StreamsException: Store count2's change log 
> (count2-topic) does not contain partition 1
>       at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:81)
> {code}
> *Example:*
> {code}
> object Topology1 {
>   def main(args: Array[String]): Unit = {
>     val inputTopic = "input"
>     val stateStore = "count"
>     val stateStore2 = "count2"
>     val outputTopic2 = "count2-topic"
>     val inputs = Seq[(String, Integer)](("A", 1), ("A", 2))
>     val props = new Properties
>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
>     val builder = new KStreamBuilder
>     builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>       .groupByKey(Serdes.String, Serdes.Integer)
>       .count(stateStore)
>       .through(Serdes.String, Serdes.Long, outputTopic2, stateStore2)
>     val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
> builder, stateStore, stateStore2)
>     inputs.foreach {
>       case (key, value) => {
>         driver.process(inputTopic, key, value, Serdes.String.serializer, 
> Serdes.Integer.serializer)
>         val record = driver.readOutput(outputTopic2, 
> Serdes.String.deserializer, Serdes.Long.deserializer)
>         println(record)
>       }
>     }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to