[
https://issues.apache.org/jira/browse/BEAM-12871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17533444#comment-17533444
]
Sushant commented on BEAM-12871:
--------------------------------
This would be a very important feature to have, considering many folks would
love to use the Python SDK with the Kafka Confluent Schema Registry.
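
In the meantime, a possible workaround (just a sketch, not an existing Beam
API) is to read raw bytes with {{ReadFromKafka}} and decode the Confluent
Protobuf wire format in a Python {{DoFn}} using the confluent-kafka client.
The generated message class ({{MyMessage}} / {{my_protos_pb2}}) below is a
placeholder; the broker and topic names mirror the Java example in the issue,
and the cross-language read still needs the usual Java expansion service:
{code:python}
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from confluent_kafka.schema_registry.protobuf import ProtobufDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

from my_protos_pb2 import MyMessage  # placeholder: generated protobuf class


class DecodeConfluentProtobuf(beam.DoFn):
    """Strips the Confluent wire-format header and parses the protobuf payload."""

    def setup(self):
        # The Protobuf deserializer works from the compiled message class,
        # so no SchemaRegistryClient is needed on the read path.
        # 'use.deprecated.format': False selects the current wire format.
        self._deserializer = ProtobufDeserializer(
            MyMessage, {'use.deprecated.format': False})

    def process(self, kv):
        key, value = kv
        ctx = SerializationContext('my_topic', MessageField.VALUE)
        yield key, self._deserializer(value, ctx)


with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | ReadFromKafka(
            consumer_config={'bootstrap.servers': 'broker_1:9092,broker_2:9092'},
            topics=['my_topic'],
            # Keep the Java-side deserializers as plain byte arrays;
            # the schema-registry-aware decoding happens in Python below.
            key_deserializer='org.apache.kafka.common.serialization.ByteArrayDeserializer',
            value_deserializer='org.apache.kafka.common.serialization.ByteArrayDeserializer')
        | 'DecodeProtobuf' >> beam.ParDo(DecodeConfluentProtobuf())
        | beam.Map(print))
{code}
The difference from the Java path is that the Python Protobuf deserializer
relies on the compiled message class rather than looking schemas up by subject
in the registry, which is part of why native support mirroring
{{ConfluentSchemaRegistryDeserializerProvider}} would still be valuable.
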
> Support for Confluent Schema Registry
> -------------------------------------
>
> Key: BEAM-12871
> URL: https://issues.apache.org/jira/browse/BEAM-12871
> Project: Beam
> Issue Type: New Feature
> Components: cross-language, io-py-kafka
> Reporter: Dénes Bartha
> Priority: P3
>
> I would like to use the Python components
> [{{apache_beam.io.kafka.}}{{WriteToKafka}}|https://beam.apache.org/releases/pydoc/2.32.0/apache_beam.io.kafka.html#apache_beam.io.kafka.WriteToKafka]
> and
> [{{apache_beam.io.kafka.ReadFromKafka}}|https://beam.apache.org/releases/pydoc/2.32.0/apache_beam.io.kafka.html#apache_beam.io.kafka.ReadFromKafka]
> while accessing and updating [Schema
> Registry|https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-protobuf.html]
> for sending and reading {{protobuf}} messages.
> In Java it is possible to achieve
> [this|https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/io/kafka/KafkaIO.html]:
>
> {code:java}
> PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
>   .apply(KafkaIO.<Long, GenericRecord>read()
>     .withBootstrapServers("broker_1:9092,broker_2:9092")
>     .withTopic("my_topic")
>     .withKeyDeserializer(LongDeserializer.class)
>     // Use Confluent Schema Registry, specify schema registry URL and value subject
>     .withValueDeserializer(
>         ConfluentSchemaRegistryDeserializerProvider.of(
>             "http://localhost:8081", "my_topic-value"))
>     ...
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)