Dear Flink Community!
We have noticed a recent request for Hortonworks schema registry support (
FLINK-14577 <https://issues.apache.org/jira/browse/FLINK-14577>). We have
an implementation for it already, and we would be happy to contribute it to
Apache Flink.
You can find the documentation below[1]. Let us know your thoughts!
Best Regards,
Matyas
[1] Flink Avro Cloudera Registry User Guide
-----------------------------------------------------------
Add the following dependency to use the schema registry integration:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro-cloudera-registry</artifactId>
    <version>${flink.version}</version>
</dependency>
The schema registry can be plugged directly into the FlinkKafkaConsumer and
FlinkKafkaProducer using the appropriate schema:
- org.apache.flink.formats.avro.registry.cloudera.SchemaRegistryDeserializationSchema
- org.apache.flink.formats.avro.registry.cloudera.SchemaRegistrySerializationSchema
Supported types
----------------------
- Avro Specific Record types
- Avro Generic Records
- Basic Java data types: byte[], Byte, Integer, Short, Double, Float, Long,
String, Boolean
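For the basic types, no generated Avro classes are needed on the user side. As a
sketch of what this looks like with the builder described below (the topic name
and registry address here are hypothetical placeholders):

```java
// Sketch: serializing a plain String stream through the schema registry.
// Builder usage mirrors the SchemaRegistrySerializationSchema example below;
// String support is taken from the basic-types list above.
KafkaSerializationSchema<String> stringSchema =
    SchemaRegistrySerializationSchema
        .<String>builder("string-topic")                         // hypothetical static topic
        .setRegistryAddress("http://registry-host:7788/api/v1")  // hypothetical address
        .build();
```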
SchemaRegistrySerializationSchema
--------------------------------------------------
The serialization schema can be constructed using the included builder
object SchemaRegistrySerializationSchema.builder(..).
Required settings:
- Topic configuration when creating the builder. Can be static or dynamic
(extracted from the data)
- RegistryAddress parameter on the builder to establish the connection
Optional settings:
- Arbitrary SchemaRegistry client configuration using the setConfig method
- Key configuration for the produced Kafka messages
  - By specifying a KeySelector function that extracts the key from each record
  - Using a Tuple2 stream for (key, value) pairs directly
- Security configuration
Example:
KafkaSerializationSchema<ItemTransaction> schema =
    SchemaRegistrySerializationSchema
        .<ItemTransaction>builder(topic)
        .setRegistryAddress(registryAddress)
        .setKey(ItemTransaction::getItemId)
        .build();

FlinkKafkaProducer<ItemTransaction> sink = new FlinkKafkaProducer<>(
    "dummy", schema, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
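For the Tuple2 variant of the key configuration, the sketch below shows the
intended shape; the exact generic signature of the builder for keyed tuple
streams is an assumption here:

```java
// Sketch: producing (key, value) pairs from a Tuple2 stream directly.
// Assumption: the builder can be parameterized with the Tuple2 type and
// treats f0 as the Kafka message key and f1 as the value.
KafkaSerializationSchema<Tuple2<String, ItemTransaction>> keyedSchema =
    SchemaRegistrySerializationSchema
        .<Tuple2<String, ItemTransaction>>builder(topic)
        .setRegistryAddress(registryAddress)
        .build();

FlinkKafkaProducer<Tuple2<String, ItemTransaction>> keyedSink = new FlinkKafkaProducer<>(
    "dummy", keyedSchema, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
```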
SchemaRegistryDeserializationSchema
-----------------------------------------------------
The deserialization schema can be constructed using the included builder
object SchemaRegistryDeserializationSchema.builder(..).
When reading messages (and keys), we always have to specify the expected
Class<T> or record Schema of the input records so that Flink can perform any
necessary conversion between the data on Kafka and the expected type.
Required settings:
- Class or Schema of the input messages depending on the data type
- RegistryAddress parameter on the builder to establish the connection
Optional settings:
- Arbitrary SchemaRegistry client configuration using the setConfig method
- Key configuration for the consumed Kafka messages
  - Should only be specified when we want to read the keys as well into a
(key, value) stream
- Security configuration
Example:
KafkaDeserializationSchema<ItemTransaction> schema =
    SchemaRegistryDeserializationSchema
        .builder(ItemTransaction.class)
        .setRegistryAddress(registryAddress)
        .build();

// The consumer group id is supplied via kafkaProps ("group.id"),
// as FlinkKafkaConsumer takes no separate groupId argument.
FlinkKafkaConsumer<ItemTransaction> source =
    new FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps);
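Generic Records can similarly be read by passing the record Schema instead of a
Class. This is a sketch: the builder(Schema) overload is inferred from the
"Class or Schema of the input messages" requirement above, and the field layout
of the schema string is hypothetical:

```java
// Sketch: deserializing GenericRecords by supplying the Avro Schema.
// The record fields below are hypothetical examples.
Schema itemSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"ItemTransaction\",\"fields\":["
    + "{\"name\":\"itemId\",\"type\":\"string\"},"
    + "{\"name\":\"quantity\",\"type\":\"long\"}]}");

KafkaDeserializationSchema<GenericRecord> genericSchema =
    SchemaRegistryDeserializationSchema
        .builder(itemSchema)
        .setRegistryAddress(registryAddress)
        .build();

FlinkKafkaConsumer<GenericRecord> genericSource =
    new FlinkKafkaConsumer<>(inputTopic, genericSchema, kafkaProps);
```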