Dénes Bartha created BEAM-12871:
-----------------------------------
Summary: Support Schema Registry
Key: BEAM-12871
URL: https://issues.apache.org/jira/browse/BEAM-12871
Project: Beam
Issue Type: Improvement
Components: io-py-kafka
Reporter: Dénes Bartha
I would like to use the Python components
`{{apache_beam.io.kafka.}}{{WriteToKafka}}` and
`{{apache_beam.io.kafka.ReadFromKafka}}` while accessing and updating [Schema
Registry|https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-protobuf.html]
for sending and reading `protobuf` messages.
In Java it is possible to achieve
[this|https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/io/kafka/KafkaIO.html]:
{code:java}
PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
.apply(KafkaIO.<Long, GenericRecord>read()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("my_topic")
.withKeyDeserializer(LongDeserializer.class)
// Use Confluent Schema Registry, specify schema registry URL and value
subject
.withValueDeserializer(
ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081",
"my_topic-value"))
...
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)