SHWETA SINHA created KAFKA-12542:
------------------------------------

             Summary: TopologyTestDriver
                 Key: KAFKA-12542
                 URL: https://issues.apache.org/jira/browse/KAFKA-12542
             Project: Kafka
          Issue Type: Bug
          Components: streams-test-utils
    Affects Versions: 2.6.0
            Reporter: SHWETA SINHA


I am using Kafka Streams DSL to create topology.

StreamsBuilder streamsBuilder=new StreamsBuilder();
valueSerde.configure(getConfigForSpecificAvro(appId),false);
KStream<String, AvroDTO> stream = streamsBuilder.stream(inputTopic, 
Consumed.with(new Serdes.StringSerde(), valueSerde));
KStream<String, AvroDTO> filtered = stream .filter((key, value) -> 
ServiceConsumer.filter(key,value));
filtered .map((KeyValueMapper<String, AvroDTO, KeyValue<String, 
SpecificRecordBase>>) (key, value) -> ServiceConsumer.process(key,value))
 .to((k,v,recordContext) -> v instanceof AvroDTO? 
dlqTopic:outputTopic,Produced.with(new Serdes.StringSerde(), valueSerde));
Topology topology=streamsBuilder.build();
KafkaStreams kafkaStreams=new 
KafkaStreams(topology,getKafkaStreamsConfig(appId));

 

To Test the Topology, I am using TopologyTestDriver.  

when(getKafkaStreamsConfig(any())).thenReturn(kafkaConfig);
 when(ServiceConsumer.filter(any(),any())).thenReturn(false);
// when(ServiceConsumer.process(any(),any())).thenReturn(new 
KeyValue<>(statusDto.getTaskId().toString(),statusDto));
 topologyTestDriver=new 
TopologyTestDriver(getTopologyAndStartKafkaStreams(),kafkaConfig);
StreamInput = topologyTestDriver.createInputTopic("INPUT_TOPIC", new 
StringSerializer(), new KafkaAvroSerializer(schemaRegistryClient));
UpdateOutput =topologyTestDriver.createOutputTopic("OUTPUT_TOPIC",new 
StringDeserializer(),new KafkaAvroDeserializer(schemaRegistryClient,config));

StreamInput.pipeInput("Hi);
assertThat(UpdateOutput .isEmpty()).isTrue();

 

I am checking if there are no filtered messages then my output topic is empty.

Getting Error while Unit Testing

java.lang.IllegalArgumentException: Unknown topic: OUTPUT_TOPIC

 

Changing when(ServiceConsumer.filter(any(),any())).thenReturn(false); to true 
doesnt gives any error.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to