Cross-posted at SO:
https://stackoverflow.com/questions/47712933/kstream-error-reading-and-writing-avro-records

I put an answer there.
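
For readers of the archive: the exception message itself points at the likely cause. builder.stream("dbserver1.employees.employees") is called without Serdes, so the source node falls back to the default Serdes from the config, and (going by the "[B" in the error) the records reach to() as byte[] rather than GenericRecord. Passing the Serdes on the read side as well, e.g. builder.stream(keySerde, valueSerde, "dbserver1.employees.employees"), or setting the default Serdes in the config, should resolve it, as the message suggests. The sketch below is not Kafka code. It is a plain-Java illustration of why the mismatch compiles fine and only surfaces at runtime as a ClassCastException; Serializer, Record, RecordSerializer and send are hypothetical stand-ins, not the real classes.

```java
import java.nio.charset.StandardCharsets;

final class SerdeMismatchDemo {

    /** Hypothetical stand-in for a generic serializer interface. */
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    /** Hypothetical stand-in for an Avro GenericRecord. */
    static final class Record {
        final String payload;

        Record(String payload) {
            this.payload = payload;
        }
    }

    /** Plays the role of the Avro serializer: it only understands Record. */
    static final class RecordSerializer implements Serializer<Record> {
        @Override
        public byte[] serialize(Record data) {
            return data.payload.getBytes(StandardCharsets.UTF_8);
        }
    }

    /**
     * Plays the role of the sink: generics are erased at runtime, so the
     * cast to T is unchecked and any object can be handed to the serializer.
     */
    @SuppressWarnings("unchecked")
    static <T> byte[] send(Serializer<T> serializer, Object value) {
        return serializer.serialize((T) value);
    }

    /** Sends one value through the sink and reports what happened. */
    static String describe(Object valueFromSource) {
        try {
            byte[] out = send(new RecordSerializer(), valueFromSource);
            return "ok: " + out.length + " bytes";
        } catch (ClassCastException e) {
            // Thrown inside the serializer, where the value is finally cast to Record.
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // Source read without matching Serdes: the value is still raw bytes.
        System.out.println(describe(new byte[] {1, 2, 3}));
        // Source read with matching Serdes: the value is already a Record.
        System.out.println(describe(new Record("abc")));
    }
}
```

The first call fails because the value is only cast to Record inside the serializer, which mirrors the "[B cannot be cast to org.apache.avro.generic.GenericRecord" line in the trace below. The second call succeeds because the types on the read and write side agree.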

-Matthias

On 12/7/17 10:29 PM, Somasundaram Sekar wrote:
> I’m trying to write an Avro record that I read from one topic into another
> topic; the intention is to augment it with a transformation once I get this
> routing working. I have used the KStream-with-Avro code from one of the
> examples, with some modifications to connect to the Schema Registry for
> retrieving the Avro schema.
> 
> 
> 
> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "mysql-stream-processing");
> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
> streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
> final Serde<GenericRecord> keySerde = new GenericAvroSerde(
>         new CachedSchemaRegistryClient(schemaRegistryUrl, 100),
>         Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>                 schemaRegistryUrl));
> final Serde<GenericRecord> valueSerde = new GenericAvroSerde(
>         new CachedSchemaRegistryClient(schemaRegistryUrl, 100),
>         Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>                 schemaRegistryUrl));
> streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
> 
> final KStreamBuilder builder = new KStreamBuilder();
> 
> final KStream<GenericRecord, GenericRecord> record =
>         builder.stream("dbserver1.employees.employees");
> 
> record.print(keySerde, valueSerde);
> 
> record.to(keySerde, valueSerde, "newtopic");
> 
> record.foreach((key, val) -> System.out.println(key.toString() + "  " + val.toString()));
> final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
> 
> streams.cleanUp();
> streams.start();
> 
> 
> 
> When run, print() works, as I can see the records in the console, but I’m
> unable to get the records written to “newtopic”; it fails with the error
> below:
> 
> 
> 
> Exception in thread "StreamThread-1"
> org.apache.kafka.streams.errors.StreamsException: Exception caught in
> process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000,
> topic=dbserver1.employees.employees, partition=0, offset=0
>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:217)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer
> (key: io.confluent.examples.streams.utils.GenericAvroSerializer / value:
> io.confluent.examples.streams.utils.GenericAvroSerializer) is not
> compatible to the actual key or value type (key type: [B / value type: [B).
> Change the default Serdes in StreamConfig or provide correct Serdes via
> method parameters.
>         at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:81)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>         at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:198)
>         ... 2 more
> Caused by: java.lang.ClassCastException: [B cannot be cast to
> org.apache.avro.generic.GenericRecord
>         at io.confluent.examples.streams.utils.GenericAvroSerializer.serialize(GenericAvroSerializer.java:25)
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:77)
>         at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
>         ... 5 more
> 
> 
> 
> Regards,
> 
> Somasundaram S
> 
