> On 8 Feb 2022, at 14:16, Matt Casters <matt.cast...@neo4j.com> wrote:
>
> For KafkaIO.read() we made a specific provision in the form of class
> ConfluentSchemaRegistryDeserializer but it doesn't look like we covered the
> producer side of Avro values yet.
Talking about reading from Kafka with Avro and Confluent Schema Registry, have you tried the API that KafkaIO provides [1] using DeserializerProvider? It would be something like this (a code snippet from the KafkaIO Javadoc):

    KafkaIO.<Long, GenericRecord>read()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(LongDeserializer.class)
        // Use Confluent Schema Registry, specify schema registry URL and value subject
        .withValueDeserializer(
            ConfluentSchemaRegistryDeserializerProvider.of(
                "http://localhost:8081", "my_topic-value"))

I think we could add a similar API for the write side as well to make it easier to use (in the meantime, see the workaround sketched at the end of this message).

> I'd be happy to dive into the code to add proper support for a Confluent
> schema registry in KafkaIO.write() but I was just wondering if there was
> something I might have overlooked. It's hard to find samples or
> documentation on producing Avro messages with Beam.

I agree that we have room for improvement here but, to be honest, the KafkaIO Javadoc contains a dedicated section on that [2] (see “Use Avro schema with Confluent Schema Registry”).

—
Alexey

[1] https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withValueDeserializer-org.apache.beam.sdk.io.kafka.DeserializerProvider-
[2] https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/kafka/KafkaIO.html
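P.S. Until KafkaIO.write() gets a counterpart to ConfluentSchemaRegistryDeserializerProvider, I believe one can already produce Avro records through the registry by plugging Confluent's KafkaAvroSerializer into KafkaIO.write() and passing the registry URL via the producer config. A minimal, untested sketch (the input PCollection and the unchecked cast are my assumptions, not an official Beam API):

    import java.util.Collections;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.LongSerializer;
    import org.apache.kafka.common.serialization.Serializer;

    // KafkaAvroSerializer implements Serializer<Object>, so an unchecked
    // cast is needed to satisfy withValueSerializer()'s type bound.
    @SuppressWarnings("unchecked")
    Class<? extends Serializer<GenericRecord>> valueSerializer =
        (Class<? extends Serializer<GenericRecord>>) (Class<?>) KafkaAvroSerializer.class;

    PCollection<KV<Long, GenericRecord>> records = ...; // records to write

    records.apply(
        KafkaIO.<Long, GenericRecord>write()
            .withBootstrapServers("broker_1:9092,broker_2:9092")
            .withTopic("my_topic")
            .withKeySerializer(LongSerializer.class)
            .withValueSerializer(valueSerializer)
            // KafkaAvroSerializer reads "schema.registry.url" from the
            // producer config, so pass it via withProducerConfigUpdates().
            .withProducerConfigUpdates(
                Collections.<String, Object>singletonMap(
                    "schema.registry.url", "http://localhost:8081")));

If I'm not mistaken, with Confluent's default subject naming strategy this would register/look up the value schema under the "my_topic-value" subject, i.e. the same one used by the read snippet above.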