[ 
https://issues.apache.org/jira/browse/KAFKA-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16425309#comment-16425309
 ] 

ASF GitHub Bot commented on KAFKA-6742:
---------------------------------------

Vale68 opened a new pull request #4823: KAFKA-6742: TopologyTestDriver error 
when dealing with stores from GlobalKTable
URL: https://github.com/apache/kafka/pull/4823
 
 
   @guozhangwang
    
   While TopologyTestDriver works well with stores created from KTable, it does 
not work with stores created from GlobalKTable.
   Moreover, for my own testing purposes (and I think it would be useful to 
others too), I need access to the MockProducer inside TopologyTestDriver.
   
   I have added 4 new tests to TopologyTestDriverTest, two for stores from 
KTable and two for stores from GlobalKTable.
   
   While I was changing the TopologyTestDriver, I also made it implement 
java.io.Closeable.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> TopologyTestDriver error when dealing with stores from GlobalKTable
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6742
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6742
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Valentino Proietti
>            Priority: Minor
>
> {color:#ff0000}This junit test simply fails:{color}
> @Test
> *public* *void* globalTable() {
> StreamsBuilder builder = *new* StreamsBuilder();
> @SuppressWarnings("unused")
> *final* KTable<String,String> localTable = builder
> .table("local", 
> Consumed._with_(Serdes._String_(), Serdes._String_()),
> Materialized._as_("localStore"))
> ;
> @SuppressWarnings("unused")
> *final* GlobalKTable<String,String> globalTable = builder
> .globalTable("global", 
> Consumed._with_(Serdes._String_(), Serdes._String_()),
>         Materialized._as_("globalStore"))
> ;
> //
> Properties props = *new* Properties();
> props.setProperty(StreamsConfig.*_APPLICATION_ID_CONFIG_*, "test");
> props.setProperty(StreamsConfig.*_BOOTSTRAP_SERVERS_CONFIG_*, "localhost");
> TopologyTestDriver testDriver = *new* TopologyTestDriver(builder.build(), 
> props);
> //
> *final* KeyValueStore<String,String> localStore = 
> testDriver.getKeyValueStore("localStore");
> Assert._assertNotNull_(localStore);
> Assert._assertNotNull_(testDriver.getAllStateStores().get("localStore"));
> //
> *final* KeyValueStore<String,String> globalStore = 
> testDriver.getKeyValueStore("globalStore");
> Assert._assertNotNull_(globalStore);
> Assert._assertNotNull_(testDriver.getAllStateStores().get("globalStore"));
> //
>     *final* ConsumerRecordFactory<String,String> crf = *new* 
> ConsumerRecordFactory<>(*new* StringSerializer(), *new* StringSerializer());
> testDriver.pipeInput(crf.create("local", "one", "TheOne"));
> testDriver.pipeInput(crf.create("global", "one", "TheOne"));
> //
> Assert._assertEquals_("TheOne", localStore.get("one"));
> Assert._assertEquals_("TheOne", globalStore.get("one"));
>  
>  
> {color:#ff0000}to make it work I had to modify the TopologyTestDriver class 
> as follows:{color}
> ...
>     *public* Map<String, StateStore> getAllStateStores() {
> //        final Map<String, StateStore> allStores = new HashMap<>();
> //        for (final String storeName : 
> internalTopologyBuilder.allStateStoreName())
> { //            allStores.put(storeName, ((ProcessorContextImpl) 
> task.context()).getStateMgr().getStore(storeName)); //        }
> //        return allStores;
>     {color:#ff0000}// *FIXME*{color}
>     *final* ProcessorStateManager psm = ((ProcessorContextImpl) 
> task.context()).getStateMgr();
>         *final* Map<String, StateStore> allStores = *new* HashMap<>();
>         *for* (*final* String storeName : 
> internalTopologyBuilder.allStateStoreName()) {            
> StateStore res = psm.getStore(storeName);            
> if (res == null)            
>   res = psm.getGlobalStore(storeName);            
> allStores.put(storeName, res);        
> }
>         *return* allStores;
>     }
> ...
>     *public* StateStore getStateStore(*final* String name) {
> //        return ((ProcessorContextImpl) 
> task.context()).getStateMgr().getStore(name);
>         {color:#ff0000}// *FIXME*{color}
>     *final* ProcessorStateManager psm = ((ProcessorContextImpl) 
> task.context()).getStateMgr();
>         StateStore res = psm.getStore(name);
>         *if* (res == *null*)
>         res = psm.getGlobalStore(name);
>         *return* res;
>     }
>  
> {color:#ff0000}moreover, I think it would be very useful to make the internal 
> MockProducer public, for testing cases where a producer is used alongside 
> the "normal" stream processing, by adding the method:{color}
>     /**
>      * *@return* records sent with this producer are automatically streamed 
> to the topology.
>      */
>     *public* *final* Producer<*byte*[], *byte*[]> getProducer() {     
> return producer;    
> }
>  
> {color:#ff0000}unfortunately, this introduces another problem that can be 
> reproduced by adding the following lines to the previous junit test:{color}
> ...
> **
> //
> ConsumerRecord<*byte*[],*byte*[]> cr = crf.create("dummy", "two", "Second"); 
> // just to serialize keys and values
> testDriver.getProducer().send(*new* ProducerRecord<>("local", *null*, 
> cr.timestamp(), cr.key(), cr.value()));
> testDriver.getProducer().send(*new* ProducerRecord<>("global", *null*, 
> cr.timestamp(), cr.key(), cr.value()));
> testDriver.advanceWallClockTime(0);
> Assert._assertEquals_("TheOne", localStore.get("one"));
> Assert._assertEquals_("Second", localStore.get("two"));
> Assert._assertEquals_("TheOne", globalStore.get("one"));
> Assert._assertEquals_("Second", globalStore.get("two"));
> }
>  
> {color:#ff0000}that could be fixed with:{color}
>  
>     *private* *void* captureOutputRecords() {
>         // Capture all the records sent to the producer ...
>         *final* List<ProducerRecord<*byte*[], *byte*[]>> output = 
> producer.history();
>         producer.clear();
>         *for* (*final* ProducerRecord<*byte*[], *byte*[]> record : output) {
>             Queue<ProducerRecord<*byte*[], *byte*[]>> outputRecords = 
> outputRecordsByTopic.get(record.topic());
>             *if* (outputRecords == *null*)
> {                 outputRecords = *new* LinkedList<>();                 
> outputRecordsByTopic.put(record.topic(), outputRecords);             }
>             outputRecords.add(record);
>  
>             // Forward back into the topology if the produced record is to an 
> internal or a source topic ...
>             *final* String outputTopicName = record.topic();
>             *if* (internalTopics.contains(outputTopicName) || 
> processorTopology.sourceTopics().contains(outputTopicName)
>             || globalPartitionsByTopic.containsKey(outputTopicName)) {  
> {color:#ff0000}// *FIXME*{color}
>                 *final* *byte*[] serializedKey = record.key();
>                 *final* *byte*[] serializedValue = record.value();
>  
>                 pipeInput(*new* ConsumerRecord<>(
>                     outputTopicName,
>                     -1,
>                     -1L,
>                     record.timestamp(),
>                     TimestampType.*_CREATE_TIME_*,
>                     0L,
>                     serializedKey == *null* ? 0 : serializedKey.length,
>                     serializedValue == *null* ? 0 : serializedValue.length,
>                     serializedKey,
>                     serializedValue));
>             }
>         }
>     }
>  
>  
>  
> *Thank you*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to