[ 
https://issues.apache.org/jira/browse/KAFKA-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430231#comment-16430231
 ] 

Valentino Proietti commented on KAFKA-6742:
-------------------------------------------

Thank you [~mjsax] !

> TopologyTestDriver error when dealing with stores from GlobalKTable
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6742
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6742
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Valentino Proietti
>            Assignee: Valentino Proietti
>            Priority: Minor
>
> {color:#ff0000}This JUnit test simply fails:{color}
> @Test
> *public* *void* globalTable() {
> StreamsBuilder builder = *new* StreamsBuilder();
> @SuppressWarnings("unused")
> *final* KTable<String,String> localTable = builder
> .table("local", 
> Consumed._with_(Serdes._String_(), Serdes._String_()),
> Materialized._as_("localStore"))
> ;
> @SuppressWarnings("unused")
> *final* GlobalKTable<String,String> globalTable = builder
> .globalTable("global", 
> Consumed._with_(Serdes._String_(), Serdes._String_()),
>         Materialized._as_("globalStore"))
> ;
> //
> Properties props = *new* Properties();
> props.setProperty(StreamsConfig.*_APPLICATION_ID_CONFIG_*, "test");
> props.setProperty(StreamsConfig.*_BOOTSTRAP_SERVERS_CONFIG_*, "localhost");
> TopologyTestDriver testDriver = *new* TopologyTestDriver(builder.build(), 
> props);
> //
> *final* KeyValueStore<String,String> localStore = 
> testDriver.getKeyValueStore("localStore");
> Assert._assertNotNull_(localStore);
> Assert._assertNotNull_(testDriver.getAllStateStores().get("localStore"));
> //
> *final* KeyValueStore<String,String> globalStore = 
> testDriver.getKeyValueStore("globalStore");
> Assert._assertNotNull_(globalStore);
> Assert._assertNotNull_(testDriver.getAllStateStores().get("globalStore"));
> //
>     *final* ConsumerRecordFactory<String,String> crf = *new* 
> ConsumerRecordFactory<>(*new* StringSerializer(), *new* StringSerializer());
> testDriver.pipeInput(crf.create("local", "one", "TheOne"));
> testDriver.pipeInput(crf.create("global", "one", "TheOne"));
> //
> Assert._assertEquals_("TheOne", localStore.get("one"));
> Assert._assertEquals_("TheOne", globalStore.get("one"));
>  
>  
> {color:#ff0000}To make it work, I had to modify the TopologyTestDriver class 
> as follows:{color}
> ...
>     *public* Map<String, StateStore> getAllStateStores() {
> //        final Map<String, StateStore> allStores = new HashMap<>();
> //        for (final String storeName : 
> internalTopologyBuilder.allStateStoreName())
> { //            allStores.put(storeName, ((ProcessorContextImpl) 
> task.context()).getStateMgr().getStore(storeName)); //        }
> //        return allStores;
>     {color:#ff0000}// *FIXME*{color}
>     *final* ProcessorStateManager psm = ((ProcessorContextImpl) 
> task.context()).getStateMgr();
>         *final* Map<String, StateStore> allStores = *new* HashMap<>();
>         *for* (*final* String storeName : 
> internalTopologyBuilder.allStateStoreName()) {            
> StateStore res = psm.getStore(storeName);            
> if (res == null)            
>   res = psm.getGlobalStore(storeName);            
> allStores.put(storeName, res);        
> }
>         *return* allStores;
>     }
> ...
>     *public* StateStore getStateStore(*final* String name) {
> //        return ((ProcessorContextImpl) 
> task.context()).getStateMgr().getStore(name);
>         {color:#ff0000}// *FIXME*{color}
>     *final* ProcessorStateManager psm = ((ProcessorContextImpl) 
> task.context()).getStateMgr();
>         StateStore res = psm.getStore(name);
>         *if* (res == *null*)
>         res = psm.getGlobalStore(name);
>         *return* res;
>     }
>  
> {color:#ff0000}Moreover, I think it would be very useful to make the internal 
> MockProducer public, for testing cases where a producer is used alongside 
> the "normal" stream processing, by adding the method:{color}
>     /**
>      * *@return* records sent with this producer are automatically streamed 
> to the topology.
>      */
>     *public* *final* Producer<*byte*[], *byte*[]> getProducer() {     
> return producer;    
> }
>  
> {color:#ff0000}Unfortunately, this introduces another problem that can be 
> reproduced by adding the following lines to the previous JUnit test:{color}
> ...
> **
> //
> ConsumerRecord<*byte*[],*byte*[]> cr = crf.create("dummy", "two", "Second"); 
> // just to serialize keys and values
> testDriver.getProducer().send(*new* ProducerRecord<>("local", *null*, 
> cr.timestamp(), cr.key(), cr.value()));
> testDriver.getProducer().send(*new* ProducerRecord<>("global", *null*, 
> cr.timestamp(), cr.key(), cr.value()));
> testDriver.advanceWallClockTime(0);
> Assert._assertEquals_("TheOne", localStore.get("one"));
> Assert._assertEquals_("Second", localStore.get("two"));
> Assert._assertEquals_("TheOne", globalStore.get("one"));
> Assert._assertEquals_("Second", globalStore.get("two"));
> }
>  
> {color:#ff0000}That could be fixed with:{color}
>  
>     *private* *void* captureOutputRecords() {
>         // Capture all the records sent to the producer ...
>         *final* List<ProducerRecord<*byte*[], *byte*[]>> output = 
> producer.history();
>         producer.clear();
>         *for* (*final* ProducerRecord<*byte*[], *byte*[]> record : output) {
>             Queue<ProducerRecord<*byte*[], *byte*[]>> outputRecords = 
> outputRecordsByTopic.get(record.topic());
>             *if* (outputRecords == *null*)
> {                 outputRecords = *new* LinkedList<>();                 
> outputRecordsByTopic.put(record.topic(), outputRecords);             }
>             outputRecords.add(record);
>  
>             // Forward back into the topology if the produced record is to an 
> internal or a source topic ...
>             *final* String outputTopicName = record.topic();
>             *if* (internalTopics.contains(outputTopicName) || 
> processorTopology.sourceTopics().contains(outputTopicName)
>             || globalPartitionsByTopic.containsKey(outputTopicName)) {  
> {color:#ff0000}// *FIXME*{color}
>                 *final* *byte*[] serializedKey = record.key();
>                 *final* *byte*[] serializedValue = record.value();
>  
>                 pipeInput(*new* ConsumerRecord<>(
>                     outputTopicName,
>                     -1,
>                     -1L,
>                     record.timestamp(),
>                     TimestampType.*_CREATE_TIME_*,
>                     0L,
>                     serializedKey == *null* ? 0 : serializedKey.length,
>                     serializedValue == *null* ? 0 : serializedValue.length,
>                     serializedKey,
>                     serializedValue));
>             }
>         }
>     }
>  
>  
>  
> *Thank you*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to