Also, I forgot to attach the information on how I converted the Avro objects to bytes in the approach I took with the deprecated Kafka producer:
// Imports used by both helpers below.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

protected byte[] getValueBytes(Value value) {
    // Avro-encode the value record into a plain byte array with the binary encoder.
    DatumWriter<Value> valWriter = new SpecificDatumWriter<>(value.getSchema());
    ByteArrayOutputStream valOut = new ByteArrayOutputStream();
    BinaryEncoder valEncoder = EncoderFactory.get().binaryEncoder(valOut, null);
    try {
        valWriter.write(value, valEncoder);
        valEncoder.flush();
        valOut.close();
    } catch (IOException e) {
        throw new RuntimeException("Failed to serialize value to Avro bytes", e);
    }
    return valOut.toByteArray();
}
protected byte[] getKeyBytes(Key key) {
    // Same pattern for the key record.
    DatumWriter<Key> keyWriter = new SpecificDatumWriter<>(key.getSchema());
    ByteArrayOutputStream keyOut = new ByteArrayOutputStream();
    BinaryEncoder keyEncoder = EncoderFactory.get().binaryEncoder(keyOut, null);
    try {
        keyWriter.write(key, keyEncoder);
        keyEncoder.flush();
        keyOut.close();
    } catch (IOException e) {
        throw new RuntimeException("Failed to serialize key to Avro bytes", e);
    }
    return keyOut.toByteArray();
}
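For reference, these helpers were called from the KafkaSerializationSchema implementation roughly as below. This is a simplified sketch rather than the exact production class; the class name, constructor, and partition/header handling are illustrative:

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Simplified sketch of the (deprecated-API) serialization schema that uses the
// helpers above, which are assumed to be defined in this same class.
public class KeyValueKafkaSerializationSchema
        implements KafkaSerializationSchema<ProducerRecord<Key, Value>> {

    private final String topic;

    public KeyValueKafkaSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            ProducerRecord<Key, Value> element, Long timestamp) {
        // Avro-encode key and value with getKeyBytes/getValueBytes and pass the
        // incoming headers through unchanged.
        return new ProducerRecord<>(
                topic,
                null, // let Kafka choose the partition
                getKeyBytes(element.key()),
                getValueBytes(element.value()),
                element.headers());
    }
}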
From: Ghiya, Jay (GE Healthcare)
Sent: 18 May 2022 21:51
To: [email protected]
Cc: [email protected]; Pandiaraj, Satheesh kumar (GE Healthcare)
<[email protected]>; Kumar, Vipin (GE Healthcare)
<[email protected]>
Subject: Kafka Sink Key and Value Avro Schema Usage Issues
Hi Team,
This is regarding the Flink Kafka sink. We have a use case where records carry headers, and both the key and the value are Avro records. Below is what we would intuitively expect the API to look like for an Avro Kafka key and value:
KafkaSink.<ProducerRecord<Key, Value>>builder()
        .setBootstrapServers(cloudkafkaBrokerAPI)
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                        .setKeySerializationSchema(
                                ConfluentRegistryAvroSerializationSchema.forSpecific(
                                        Key.class, "Key", cloudSchemaRegistryURL))
                        .setValueSerializationSchema(
                                ConfluentRegistryAvroSerializationSchema.forSpecific(
                                        Value.class, "val", cloudSchemaRegistryURL))
                        .setTopic(outputTopic)
                        .build())
        .build();
From what I understand, the Kafka sink currently does not accept Avro schemas for both the key and the value; only the value can be supplied as an Avro serialization schema.
First, I tried to use the deprecated FlinkKafkaProducer by implementing KafkaSerializationSchema and supplying the Avro serializer properties via:
cloudKafkaProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class.getName());
cloudKafkaProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class.getName());
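For context, the deprecated producer was then wired up roughly like this. This is only a sketch: the "avroKeyValueSchema" and "stream" names and the delivery semantic are placeholders, not the exact code.

// avroKeyValueSchema stands for the KafkaSerializationSchema implementation
// mentioned above; stream is the upstream DataStream of ProducerRecord<Key, Value>.
FlinkKafkaProducer<ProducerRecord<Key, Value>> kafkaProducer =
        new FlinkKafkaProducer<>(
                outputTopic,
                avroKeyValueSchema,
                cloudKafkaProperties,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
stream.addSink(kafkaProducer);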
The problem is that I am able to run this example, but the schema that gets stored in the Confluent Schema Registry is:
{
"subject": "ddp_out-key",
"version": 1,
"id": 1,
"schema": "\"bytes\""
}
Instead of the full Avro schema, it has registered the whole payload as just "bytes". So for now I am looking for a workaround, even one that does not use the new KafkaSink, to make this work. Also, is there a feature request on the roadmap to add ProducerRecord support to the KafkaSink itself? That would be ideal: the previous operator could emit a ProducerRecord with key, value, and headers, and the sink would simply forward it.
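To make the ask concrete, below is roughly the kind of custom KafkaRecordSerializationSchema we would otherwise have to hand-write today to get key, value, and headers through the new KafkaSink. This is an untested sketch; the class name and the "<topic>-key"/"<topic>-value" subject naming are our assumptions (they follow the default topic-name subject strategy).

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Untested sketch of a hand-written record serializer for the new KafkaSink.
public class KeyValueRecordSerializationSchema
        implements KafkaRecordSerializationSchema<ProducerRecord<Key, Value>> {

    private final String topic;
    private final SerializationSchema<Key> keySchema;
    private final SerializationSchema<Value> valueSchema;

    public KeyValueRecordSerializationSchema(String topic, String registryUrl) {
        this.topic = topic;
        // Subject names assume the default "<topic>-key"/"<topic>-value" strategy.
        this.keySchema = ConfluentRegistryAvroSerializationSchema.forSpecific(
                Key.class, topic + "-key", registryUrl);
        this.valueSchema = ConfluentRegistryAvroSerializationSchema.forSpecific(
                Value.class, topic + "-value", registryUrl);
    }

    @Override
    public void open(SerializationSchema.InitializationContext context,
                     KafkaSinkContext sinkContext) throws Exception {
        keySchema.open(context);
        valueSchema.open(context);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            ProducerRecord<Key, Value> element,
            KafkaSinkContext context,
            Long timestamp) {
        // Avro-serialize key and value via the Confluent registry and pass the
        // incoming headers through unchanged.
        return new ProducerRecord<>(
                topic,
                null, // let Kafka choose the partition
                keySchema.serialize(element.key()),
                valueSchema.serialize(element.value()),
                element.headers());
    }
}

With ProducerRecord support in the sink itself, none of this boilerplate would be needed.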
-Jay
GEHC