[ 
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5253:
-----------------------------------
    Description: 
*Context*
 -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to 
unit test topologies while developing KStreams apps.

One such topology uses a Pattern to consume from multiple topics at once.

*Problem*
 The unit test of the topology fails because -KStreamTestDriver- 
TopologyTestDriver fails to deal with Patterns properly.

*Example*
 Underneath is a unit test explaining what I understand should happen, but is 
failing.

**Note: the example below is outdate as it used the old KStreamTestDriver. The 
overall test layout can be adopted for TopologyTestDriver though, thus, we just 
leave it in the description.**

Explicitly adding a source topic matching the topic pattern, generates an 
exception as the topology builder explicitly checks overlapping topic names and 
patterns, in any order of adding pattern and topic. So, it is intended 
behaviour.
{code:java}
    @Test
    public void shouldProcessFromSourcesThatDoMatchThePattern() {
        // -- setup stream pattern
        final KStream<String, String> source = 
builder.stream(Pattern.compile("topic-source-\\d"));
        source.to("topic-sink");

        // -- setup processor to capture results
        final MockProcessorSupplier<String, String> processorSupplier = new 
MockProcessorSupplier<>();
        source.process(processorSupplier);

        // -- add source to stream data from
        //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), 
"topic-source-3");

        // -- build test driver
        driver = new KStreamTestDriver(builder); // this should be 
TopologyTestDriver
        driver.setTime(0L);

        // -- test
        driver.process("topic-source-3", "A", "aa");

        // -- validate
        // no exception was thrown
        assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
    }
{code}
*Solution*
 If anybody can help in defining the solution, I can create a pull request for 
this change.-

  was:
*Context*
 -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to 
unit test topologies while developing KStreams apps.

One such topology uses a Pattern to consume from multiple topics at once.

*Problem*
 The unit test of the topology fails because -KStreamTestDriver- 
TopologyTestDriver fails to deal with Patterns properly.

*Example*
 Underneath is a unit test explaining what I understand should happen, but is 
failing.

Explicitly adding a source topic matching the topic pattern, generates an 
exception as the topology builder explicitly checks overlapping topic names and 
patterns, in any order of adding pattern and topic. So, it is intended 
behaviour.
{code:java}
    @Test
    public void shouldProcessFromSourcesThatDoMatchThePattern() {
        // -- setup stream pattern
        final KStream<String, String> source = 
builder.stream(Pattern.compile("topic-source-\\d"));
        source.to("topic-sink");

        // -- setup processor to capture results
        final MockProcessorSupplier<String, String> processorSupplier = new 
MockProcessorSupplier<>();
        source.process(processorSupplier);

        // -- add source to stream data from
        //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), 
"topic-source-3");

        // -- build test driver
        driver = new KStreamTestDriver(builder);
        driver.setTime(0L);

        // -- test
        driver.process("topic-source-3", "A", "aa");

        // -- validate
        // no exception was thrown
        assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
    }
{code}
*Solution*
 If anybody can help in defining the solution, I can create a pull request for 
this change.


> TopologyTestDriver must handle streams created with patterns
> ------------------------------------------------------------
>
>                 Key: KAFKA-5253
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5253
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>    Affects Versions: 1.1.0
>            Reporter: Wim Van Leuven
>            Assignee: Jagadesh Adireddi
>            Priority: Major
>              Labels: beginner, needs-kip, newbie
>
> *Context*
>  -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to 
> unit test topologies while developing KStreams apps.
> One such topology uses a Pattern to consume from multiple topics at once.
> *Problem*
>  The unit test of the topology fails because -KStreamTestDriver- 
> TopologyTestDriver fails to deal with Patterns properly.
> *Example*
>  Underneath is a unit test explaining what I understand should happen, but is 
> failing.
> **Note: the example below is outdate as it used the old KStreamTestDriver. 
> The overall test layout can be adopted for TopologyTestDriver though, thus, 
> we just leave it in the description.**
> Explicitly adding a source topic matching the topic pattern, generates an 
> exception as the topology builder explicitly checks overlapping topic names 
> and patterns, in any order of adding pattern and topic. So, it is intended 
> behaviour.
> {code:java}
>     @Test
>     public void shouldProcessFromSourcesThatDoMatchThePattern() {
>         // -- setup stream pattern
>         final KStream<String, String> source = 
> builder.stream(Pattern.compile("topic-source-\\d"));
>         source.to("topic-sink");
>         // -- setup processor to capture results
>         final MockProcessorSupplier<String, String> processorSupplier = new 
> MockProcessorSupplier<>();
>         source.process(processorSupplier);
>         // -- add source to stream data from
>         //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), 
> "topic-source-3");
>         // -- build test driver
>         driver = new KStreamTestDriver(builder); // this should be 
> TopologyTestDriver
>         driver.setTime(0L);
>         // -- test
>         driver.process("topic-source-3", "A", "aa");
>         // -- validate
>         // no exception was thrown
>         assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
>     }
> {code}
> *Solution*
>  If anybody can help in defining the solution, I can create a pull request 
> for this change.-



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to