Cross-posted at SO: https://stackoverflow.com/questions/47712933/kstream-error-reading-and-writing-avro-records
I put an answer there.

-Matthias

On 12/7/17 10:29 PM, Somasundaram Sekar wrote:
> I'm trying to write an Avro record that I read from one topic into another
> topic, intending to augment it with a transformation once this routing is
> working. I used the KStream-with-Avro code from one of the examples, with
> some modifications to connect to the Schema Registry for retrieving the
> Avro schema.
>
> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>         "mysql-stream-processing");
> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>         bootstrapServers);
> streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>         schemaRegistryUrl);
> final Serde<GenericRecord> keySerde = new GenericAvroSerde(
>         new CachedSchemaRegistryClient(schemaRegistryUrl, 100),
>         Collections.singletonMap(
>                 AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>                 schemaRegistryUrl));
> final Serde<GenericRecord> valueSerde = new GenericAvroSerde(
>         new CachedSchemaRegistryClient(schemaRegistryUrl, 100),
>         Collections.singletonMap(
>                 AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>                 schemaRegistryUrl));
> streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
>
> final KStreamBuilder builder = new KStreamBuilder();
>
> final KStream<GenericRecord, GenericRecord> record =
>         builder.stream("dbserver1.employees.employees");
>
> record.print(keySerde, valueSerde);
>
> record.to(keySerde, valueSerde, "newtopic");
>
> record.foreach((key, val) ->
>         System.out.println(key.toString() + " " + val.toString()));
>
> final KafkaStreams streams =
>         new KafkaStreams(builder, streamsConfiguration);
>
> streams.cleanUp();
> streams.start();
>
> When run, print() works (I can see the records in the console), but I am
> unable to get the records written to "newtopic"; it fails with the error
> below:
>
> Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=dbserver1.employees.employees, partition=0, offset=0
>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:217)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: io.confluent.examples.streams.utils.GenericAvroSerializer / value: io.confluent.examples.streams.utils.GenericAvroSerializer) is not compatible to the actual key or value type (key type: [B / value type: [B). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
>         at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:81)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>         at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:198)
>         ... 2 more
> Caused by: java.lang.ClassCastException: [B cannot be cast to org.apache.avro.generic.GenericRecord
>         at io.confluent.examples.streams.utils.GenericAvroSerializer.serialize(GenericAvroSerializer.java:25)
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:77)
>         at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
>         ... 5 more
>
> Regards,
>
> Somasundaram S
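For readers finding this thread in the archive: the full answer is at the Stack Overflow link above, but the exception message itself names the two options: change the default Serdes in the StreamsConfig, or provide the correct Serdes via method parameters. Because builder.stream() was called without Serdes, the source falls back to the default byte-array Serdes, so the sink receives byte[] (the [B in the trace) rather than GenericRecord. Below is a minimal sketch of the second option, reusing the Serde construction from the post and the same old KStreamBuilder API; the class name and the broker/registry addresses are placeholders, not from the thread.

import java.util.Collections;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import io.confluent.examples.streams.utils.GenericAvroSerde;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;

public class AvroRoutingSketch {

    public static void main(final String[] args) {
        // Placeholder endpoints; substitute your environment's values.
        final String bootstrapServers = "localhost:9092";
        final String schemaRegistryUrl = "http://localhost:8081";

        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "mysql-stream-processing");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Same Serde construction as in the post.
        final Serde<GenericRecord> keySerde = new GenericAvroSerde(
                new CachedSchemaRegistryClient(schemaRegistryUrl, 100),
                Collections.singletonMap(
                        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl));
        final Serde<GenericRecord> valueSerde = new GenericAvroSerde(
                new CachedSchemaRegistryClient(schemaRegistryUrl, 100),
                Collections.singletonMap(
                        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl));

        final KStreamBuilder builder = new KStreamBuilder();

        // The one-line fix: hand the Avro Serdes to stream() so the source
        // deserializes to GenericRecord instead of the byte[] default.
        final KStream<GenericRecord, GenericRecord> record =
                builder.stream(keySerde, valueSerde, "dbserver1.employees.employees");

        // Now both ends of the topology agree on GenericRecord.
        record.to(keySerde, valueSerde, "newtopic");

        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        streams.cleanUp();
        streams.start();
    }
}

The first option, setting default Serdes in the config, would work as well, but Streams then instantiates the Serde class via reflection, so it needs a public zero-argument constructor and must pick up the schema registry URL through configure().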