azagrebin commented on a change in pull request #6615: [FLINK-8354]
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r254226128
##########
File path:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
##########
@@ -55,4 +57,13 @@
* @return null or the target topic
*/
String getTargetTopic(T element);
+
+ /**
+ *
+ * @param element The incoming element to be serialized
+ * @return collection of headers (maybe empty)
+ */
+ default Iterable<Map.Entry<String, byte[]>> headers(T element) {
Review comment:
Let's say the new serialization schema looks like this:
```
interface KafkaSerializationSchema<T> {
    ProducerRecord<byte[], byte[]> serialize(
        T element, @Nullable Long timestamp, PartitionInfo partitionInfo);
}

class PartitionInfo {
    private final int parallelInstanceId;
    private final int parallelInstances;
    private final int[] partitions;
    // ....
}
```
This way it should preserve previous Partitioner API capabilities.
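
To illustrate the point about partitioner capabilities, here is a minimal sketch of a fixed-partition choice made purely from `PartitionInfo`. The constructor, the getters, and the `FixedPartitionChooser` name are assumptions for illustration, not part of the PR; the modulo rule is the usual "each subtask sticks to one partition" behaviour.

```java
import java.util.Arrays;

// Hypothetical holder for the proposal; constructor and getters are assumptions.
final class PartitionInfo {
    private final int parallelInstanceId;
    private final int parallelInstances;
    private final int[] partitions;

    PartitionInfo(int parallelInstanceId, int parallelInstances, int[] partitions) {
        this.parallelInstanceId = parallelInstanceId;
        this.parallelInstances = parallelInstances;
        // Defensive copy so callers cannot mutate the cached partition list.
        this.partitions = Arrays.copyOf(partitions, partitions.length);
    }

    int getParallelInstanceId() { return parallelInstanceId; }

    int getParallelInstances() { return parallelInstances; }

    int[] getPartitions() { return Arrays.copyOf(partitions, partitions.length); }
}

// Hypothetical helper: picks one partition per parallel subtask.
final class FixedPartitionChooser {
    static int choose(PartitionInfo info) {
        int[] partitions = info.getPartitions();
        if (partitions.length == 0) {
            throw new IllegalArgumentException("No partitions available for the target topic");
        }
        // Every record from the same subtask goes to the same partition.
        return partitions[info.getParallelInstanceId() % partitions.length];
    }
}
```

A schema implementation could pass the chosen partition to the `ProducerRecord` constructor, so no separate partitioner callback would be needed for this case.
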
Regarding the multiple adaptors, from what I see, the difference is that the
older connectors do not support a timestamp in `ProducerRecord`. Could we have
one adaptor in the base module that calls the `ProducerRecord` constructor
without a timestamp when `KafkaSerializationSchema.serialize` is called with
`timestamp = null`? Producers that do not support timestamps, including the
base one, could then call the new schema with `timestamp = null` (although
`ProducerRecord` does have `timestamp` in base).
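
A rough sketch of what such a single adaptor might look like. `SketchRecord` and `LegacySchema` are simplified stand-ins (assumptions) for Kafka's `ProducerRecord` and the existing keyed schema, so that the example compiles without the Kafka client; `LegacySchemaAdapter`, `Utf8ValueSchema`, and the default-topic fallback are likewise hypothetical, and `PartitionInfo` is left out for brevity.

```java
import java.nio.charset.StandardCharsets;

// Stand-in for Kafka's ProducerRecord (assumption, not the real class).
final class SketchRecord {
    final String topic;
    final Long timestamp; // stays null when the no-timestamp constructor is used
    final byte[] key;
    final byte[] value;

    // Constructor shape available to every connector version.
    SketchRecord(String topic, byte[] key, byte[] value) {
        this.topic = topic;
        this.timestamp = null;
        this.key = key;
        this.value = value;
    }

    // Constructor shape that only timestamp-aware connectors can use.
    SketchRecord(String topic, Long timestamp, byte[] key, byte[] value) {
        this.topic = topic;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }
}

// Stand-in for the existing keyed schema (same three methods).
interface LegacySchema<T> {
    byte[] serializeKey(T element);

    byte[] serializeValue(T element);

    String getTargetTopic(T element); // may return null
}

// Hypothetical single adaptor living in the base module.
final class LegacySchemaAdapter<T> {
    private final LegacySchema<T> legacy;
    private final String defaultTopic;

    LegacySchemaAdapter(LegacySchema<T> legacy, String defaultTopic) {
        this.legacy = legacy;
        this.defaultTopic = defaultTopic;
    }

    SketchRecord serialize(T element, Long timestamp) {
        String target = legacy.getTargetTopic(element);
        String topic = target != null ? target : defaultTopic;
        byte[] key = legacy.serializeKey(element);
        byte[] value = legacy.serializeValue(element);
        if (timestamp == null) {
            // Producers without timestamp support always take this branch.
            return new SketchRecord(topic, key, value);
        }
        return new SketchRecord(topic, timestamp, key, value);
    }
}

// Tiny legacy schema used only to exercise the adaptor.
final class Utf8ValueSchema implements LegacySchema<String> {
    @Override
    public byte[] serializeKey(String element) { return null; }

    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) { return null; }
}
```

With this shape, the older producers would differ only in always passing `null` for the timestamp, which is why one adaptor might be enough.
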
Regarding the open method, could the adaptor or partitioner still be opened
the same way the current partitioner is opened now? I agree that the new
schema would look weird with an open method.
`PartitionInfo` could be populated in a similar way to how `int[] partitions`
is cached now in `FlinkKafkaProducer.topicPartitionsMap` in
`FlinkKafkaProducer.invoke`; only `parallelInstanceId` and `parallelInstances`
would be added.
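
As a sketch of that idea (not the actual producer code): a per-topic cache that looks up the partitions once and wraps them into a `PartitionInfo` on each call. `PartitionInfoCache` and the `partitionLookup` function are assumptions standing in for however the producer queries Kafka for a topic's partitions, and the `PartitionInfo` fields are left package-private for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical holder for the proposal.
final class PartitionInfo {
    final int parallelInstanceId;
    final int parallelInstances;
    final int[] partitions;

    PartitionInfo(int parallelInstanceId, int parallelInstances, int[] partitions) {
        this.parallelInstanceId = parallelInstanceId;
        this.parallelInstances = parallelInstances;
        this.partitions = partitions;
    }
}

// Hypothetical cache mirroring the role of topicPartitionsMap.
final class PartitionInfoCache {
    private final Map<String, int[]> topicPartitionsMap = new HashMap<>();
    private final int parallelInstanceId;
    private final int parallelInstances;
    // Stand-in for asking the Kafka producer which partitions a topic has.
    private final Function<String, int[]> partitionLookup;

    PartitionInfoCache(
            int parallelInstanceId,
            int parallelInstances,
            Function<String, int[]> partitionLookup) {
        this.parallelInstanceId = parallelInstanceId;
        this.parallelInstances = parallelInstances;
        this.partitionLookup = partitionLookup;
    }

    PartitionInfo forTopic(String topic) {
        // The lookup runs only the first time a topic is seen.
        int[] partitions = topicPartitionsMap.computeIfAbsent(topic, partitionLookup);
        return new PartitionInfo(parallelInstanceId, parallelInstances, partitions);
    }
}
```

The two subtask fields are known once the sink is opened, so only the partition array needs to be resolved per target topic.
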