> On 8 Feb 2022, at 14:16, Matt Casters <matt.cast...@neo4j.com> wrote:
> 
> For KafkaIO.read() we made a specific provision in the form of class 
> ConfluentSchemaRegistryDeserializer but it doesn't look like we covered the 
> producer side of Avro values yet.

Talking about reading from Kafka with Avro and the Confluent Schema Registry, have 
you tried the API that KafkaIO provides [1] via DeserializerProvider?

It would be something like this (a code snippet from the KafkaIO Javadoc):

KafkaIO.<Long, GenericRecord>read()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic")
      .withKeyDeserializer(LongDeserializer.class)
      // Use Confluent Schema Registry, specify schema registry URL and value subject
      .withValueDeserializer(
          ConfluentSchemaRegistryDeserializerProvider.of(
              "http://localhost:8081", "my_topic-value"))

I think we could add a similar API on the write side as well to make it easier to 
use.
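
Until then, one possible workaround on the write side is to plug the stock Confluent 
KafkaAvroSerializer into KafkaIO.write() directly. A rough, untested sketch (the raw 
cast is needed because KafkaAvroSerializer implements Serializer<Object> rather than 
Serializer<GenericRecord>; the broker and registry URLs are placeholders, and 
"records" is assumed to be a PCollection<KV<Long, GenericRecord>>):

records.apply(KafkaIO.<Long, GenericRecord>write()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic")
      .withKeySerializer(LongSerializer.class)
      // KafkaAvroSerializer looks up / registers the schema in the registry;
      // the raw cast is unchecked but compiles
      .withValueSerializer((Class) KafkaAvroSerializer.class)
      // tell the Confluent serializer where the schema registry lives
      .withProducerConfigUpdates(
          Collections.singletonMap("schema.registry.url", "http://localhost:8081")));

(LongSerializer comes from org.apache.kafka.common.serialization, KafkaAvroSerializer 
from io.confluent.kafka.serializers, and Collections from java.util.)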


>   I'd be happy to dive into the code to add proper support for a Confluent 
> schema registry in KafkaIO.write() but I was just wondering if there was 
> something I might have overlooked.  It's hard to find samples or 
> documentation on producing Avro messages with Beam.

I agree that there is room for improvement here but, to be honest, the KafkaIO 
Javadoc already contains a dedicated section on this [2] (see “Use Avro schema with 
Confluent Schema Registry”).

—
Alexey


[1] https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withValueDeserializer-org.apache.beam.sdk.io.kafka.DeserializerProvider-
[2] https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/kafka/KafkaIO.html



