loserwang1024 commented on code in PR #258:
URL: 
https://github.com/apache/flink-connector-kafka/pull/258#discussion_r3321845504


##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java:
##########
@@ -623,6 +625,42 @@ public Object read(ConsumerRecord<?, ?> record) {
                     }
                 }),
 
+        /**
+         * Wire-faithful alternative to {@code headers}: preserves duplicate 
keys and insertion
+         * order using {@code ARRAY<ROW<key STRING, value BYTES>>} instead of 
a lossy MAP.
+         */
+        MULTI_HEADERS(
+                "multi-headers",
+                DataTypes.ARRAY(
+                                DataTypes.ROW(
+                                                DataTypes.FIELD(
+                                                        "key",
+                                                        
DataTypes.STRING().nullable()),
+                                                DataTypes.FIELD(
+                                                        "value",
+                                                        
DataTypes.BYTES().nullable()))
+                                        .notNull())
+                        .notNull(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        final List<GenericRowData> rows = new ArrayList<>();
+                        for (Header header : record.headers()) {
+                            final GenericRowData row = new GenericRowData(2);
+                            row.setField(
+                                    0,
+                                    header.key() == null

Review Comment:
   <img width="559" height="552" alt="Image" 
src="https://github.com/user-attachments/assets/f8c76ae4-a7c8-4cdd-b799-ddeaed359436";
 />
   header key is never be null



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java:
##########
@@ -623,6 +625,42 @@ public Object read(ConsumerRecord<?, ?> record) {
                     }
                 }),
 
+        /**
+         * Wire-faithful alternative to {@code headers}: preserves duplicate 
keys and insertion
+         * order using {@code ARRAY<ROW<key STRING, value BYTES>>} instead of 
a lossy MAP.
+         */
+        MULTI_HEADERS(
+                "multi-headers",

Review Comment:
   multl-headers is slightly ambiguous: does "multi-headers" mean "multiple 
headers" (just more than one)? Or "headers with multi-valued keys"? 
   
   What about `headers-list` or `ordered-headers` @leonardBang , WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to