I’m trying to write an Avro record that I read from one topic into another
topic. The intention is to augment it with a transformation once I get this
routing working. I have used the KStream-with-Avro code from one of the
examples, with some modifications to connect to Schema Registry for
retrieving the Avro schema.



streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "mysql-stream-processing");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
final Serde<GenericRecord> keySerde = new GenericAvroSerde(
        new CachedSchemaRegistryClient(schemaRegistryUrl, 100),
        Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                schemaRegistryUrl));
final Serde<GenericRecord> valueSerde = new GenericAvroSerde(
        new CachedSchemaRegistryClient(schemaRegistryUrl, 100),
        Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                schemaRegistryUrl));
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);

final KStreamBuilder builder = new KStreamBuilder();

final KStream<GenericRecord, GenericRecord> record =
        builder.stream("dbserver1.employees.employees");

record.print(keySerde, valueSerde);

record.to(keySerde, valueSerde, "newtopic");

record.foreach((key, val) -> System.out.println(key.toString() + "  " + val.toString()));

final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);

streams.cleanUp();
streams.start();



When I run it, print() works, as I can see the records in the console, but I'm
unable to get the records written to “newtopic”. It fails with the error
below:



Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=dbserver1.employees.employees, partition=0, offset=0
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:217)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: io.confluent.examples.streams.utils.GenericAvroSerializer / value: io.confluent.examples.streams.utils.GenericAvroSerializer) is not compatible to the actual key or value type (key type: [B / value type: [B). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:81)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:198)
        ... 2 more
Caused by: java.lang.ClassCastException: [B cannot be cast to org.apache.avro.generic.GenericRecord
        at io.confluent.examples.streams.utils.GenericAvroSerializer.serialize(GenericAvroSerializer.java:25)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:77)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
        ... 5 more
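
The innermost cause in the trace ("[B cannot be cast to ... GenericRecord") can be reproduced without Kafka, which may help in reading it. The following is only a minimal sketch under stated assumptions: `Serializer`, `Record`, `RecordSerializer` and `send` are hypothetical stand-ins invented for illustration, not the Kafka Streams or Confluent classes. It shows how a serializer typed for a record can still be handed a raw byte[] once generics are erased, so the failure only appears at runtime. Whether the byte[] values in the trace come from the source topic being read with the default serdes is an interpretation of the error message, not something confirmed here.

```java
import java.nio.charset.StandardCharsets;

public class SerdeMismatchSketch {

    /** Hypothetical stand-in for a serializer interface; NOT the Kafka API. */
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    /** Hypothetical stand-in for an Avro generic record. */
    static final class Record {
        final String body;

        Record(String body) {
            this.body = body;
        }
    }

    /** A serializer that only understands Record values. */
    static final class RecordSerializer implements Serializer<Record> {
        @Override
        public byte[] serialize(Record data) {
            return data.body.getBytes(StandardCharsets.UTF_8);
        }
    }

    /**
     * Hypothetical sink: it holds the serializer as a raw type, so the
     * compiler cannot check that the value matches the serializer's type.
     * The cast happens inside the serializer's bridge method at runtime.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String send(Serializer serializer, Object value) {
        try {
            byte[] out = serializer.serialize(value);
            return "ok:" + out.length;
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        Serializer<Record> serializer = new RecordSerializer();
        // Value was already turned into a Record upstream: serialization works.
        System.out.println(send(serializer, new Record("abc")));     // prints "ok:3"
        // Value is still raw bytes: the record serializer cannot handle it.
        System.out.println(send(serializer, new byte[] {1, 2, 3}));  // prints "ClassCastException"
    }
}
```

In this sketch the second call fails for the same kind of reason the trace reports: the value reaching the serializer is a byte[] rather than the record type the serializer was written for.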



Regards,

Somasundaram S
