I'm trying to write Avro records that I read from one topic into another topic; the intention is to augment them with a transformation once I get this routing working. I have used the KStream-with-Avro code from one of the examples, with some modifications to connect to the Schema Registry for retrieving the Avro schema.

    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "mysql-stream-processing");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);

    // Avro serdes for key and value, both backed by the Schema Registry
    final Serde<GenericRecord> keySerde = new GenericAvroSerde(
        new CachedSchemaRegistryClient(schemaRegistryUrl, 100),
        Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl));
    final Serde<GenericRecord> valueSerde = new GenericAvroSerde(
        new CachedSchemaRegistryClient(schemaRegistryUrl, 100),
        Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl));

    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);

    final KStreamBuilder builder = new KStreamBuilder();
    // Source topic is read without explicit serdes
    final KStream<GenericRecord, GenericRecord> record = builder.stream("dbserver1.employees.employees");

    record.print(keySerde, valueSerde);
    record.to(keySerde, valueSerde, "newtopic");
    record.foreach((key, val) -> System.out.println(key.toString() + " " + val.toString()));

    final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
    streams.cleanUp();
    streams.start();

When I run this, print() works, as I can see the records in the console, but I'm unable to get the records written to "newtopic". It fails with the error below:

    Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=dbserver1.employees.employees, partition=0, offset=0
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:217)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
    Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: io.confluent.examples.streams.utils.GenericAvroSerializer / value: io.confluent.examples.streams.utils.GenericAvroSerializer) is not compatible to the actual key or value type (key type: [B / value type: [B). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:81)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:198)
        ... 2 more
    Caused by: java.lang.ClassCastException: [B cannot be cast to org.apache.avro.generic.GenericRecord
        at io.confluent.examples.streams.utils.GenericAvroSerializer.serialize(GenericAvroSerializer.java:25)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:77)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
        ... 5 more

Regards,
Somasundaram S
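
P.S. A note on the trace: the innermost "Caused by" says the sink was handed raw byte[] values ("[B") where the Avro serializer expected GenericRecord. One plausible reading is that the source stream was created with the default serdes, so the records were never deserialized before reaching to(). The following is only a minimal, Kafka-free sketch of that mechanism under this assumption. Serializer, RecordSerializer and trySend are hypothetical stand-ins (not the Kafka or Confluent classes), and a Map plays the role of GenericRecord.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

public class SerdeMismatchSketch {

    // Hypothetical stand-in for a Kafka-style Serializer<T>; not the real interface.
    public interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Stand-in for an Avro serializer: it only accepts structured records
    // (a Map here plays the role of GenericRecord).
    public static class RecordSerializer implements Serializer<Map<String, Object>> {
        @Override
        public byte[] serialize(String topic, Map<String, Object> data) {
            return data.toString().getBytes(StandardCharsets.UTF_8);
        }
    }

    // Mimics a sink that receives whatever object the source produced.
    // Generics are erased at runtime, so a wrong value type is only
    // detected by the cast performed when serialize() is invoked.
    @SuppressWarnings("unchecked")
    public static String trySend(Serializer<?> serializer, Object value) {
        try {
            ((Serializer<Object>) serializer).serialize("newtopic", value);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        Serializer<?> avroLike = new RecordSerializer();
        // Value that was read as raw bytes: the serializer cannot cast it.
        System.out.println(trySend(avroLike, new byte[] {1, 2, 3}));
        // Value that was already deserialized into a record: accepted.
        System.out.println(trySend(avroLike, Collections.singletonMap("emp_no", 10001)));
    }
}
```

If the same mechanism applies here, passing keySerde and valueSerde to builder.stream(...) as well (KStreamBuilder has an overload that takes serdes before the topic names) may be what the "provide correct Serdes via method parameters" hint in the error message refers to. I have not verified this against the setup above.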