AHeise commented on code in PR #258:
URL: https://github.com/apache/flink-connector-kafka/pull/258#discussion_r3434802859


##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java:
##########
@@ -447,16 +455,61 @@ public Object read(RowData row, int pos) {
                         final ArrayData valueArray = map.valueArray();
                         final List<Header> headers = new ArrayList<>();
                         for (int i = 0; i < keyArray.size(); i++) {
-                            if (!keyArray.isNullAt(i) && !valueArray.isNullAt(i)) {
+                            if (!keyArray.isNullAt(i)) {
+                                // StringData.toString() decodes UTF-8; invalid bytes produce
+                                // replacement characters (see FLIP-568).
                                 final String key = keyArray.getString(i).toString();
-                                final byte[] value = valueArray.getBinary(i);
+                                final byte[] value =
+                                        valueArray.isNullAt(i) ? null : valueArray.getBinary(i);
                                 headers.add(new KafkaHeader(key, value));
                             }
                         }
                         return headers;
                     }
                 }),
 
+        /**
+         * Wire-faithful alternative to {@code headers}: preserves duplicate keys and insertion
+         * order using {@code ARRAY<ROW<key STRING, value BYTES>>} instead of a lossy MAP.
+         */
+        HEADER_LIST(
+                "header-list",
+                DataTypes.ARRAY(
+                                DataTypes.ROW(
+                                                DataTypes.FIELD(
+                                                        "key", DataTypes.STRING().nullable()),
+                                                DataTypes.FIELD(
+                                                        "value", DataTypes.BYTES().nullable()))
+                                        .notNull())

Review Comment:
   Took this further: the sink ROW is now nullable (matching the null-skip
   behavior already in the code). On the source side, the key is NOT NULL and
   null-key headers are silently skipped, so VIRTUAL users see a strict type and
   need no null checks on the key. Added testHeaderListMetadataTypes, which
   asserts both shapes, and extended testKafkaSourceSinkWithMultiHeaders with a
   null-ROW round-trip.
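
   For readers following along, here is a rough, dependency-free sketch of the
   null handling described above. It is not the connector code: `HeaderListSketch`,
   `Entry`, and `toHeaders` are hypothetical names, and treating a null key the
   same as a null row on the sink side is an assumption based on the existing
   `headers` map converter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HeaderListSketch {

    /** Hypothetical stand-in for one ROW<key STRING, value BYTES> element. */
    static final class Entry {
        final String key;   // nullable on the sink side
        final byte[] value; // nullable; a null value is still a valid header

        Entry(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Skips null rows and rows with a null key (assumed behavior), and keeps
     * everything else, including null values, duplicate keys, and input order.
     */
    static List<Entry> toHeaders(List<Entry> rows) {
        final List<Entry> headers = new ArrayList<>();
        if (rows == null) {
            return headers;
        }
        for (Entry row : rows) {
            if (row == null || row.key == null) {
                continue; // silently skipped, never an exception
            }
            headers.add(row);
        }
        return headers;
    }

    public static void main(String[] args) {
        final List<Entry> rows =
                Arrays.asList(
                        new Entry("a", new byte[] {1}),
                        null,                            // null ROW
                        new Entry(null, new byte[] {9}), // null key
                        new Entry("a", null),            // duplicate key, null value
                        new Entry("b", new byte[] {2}));

        final StringBuilder out = new StringBuilder();
        for (Entry h : toHeaders(rows)) {
            out.append(h.key).append(h.value == null ? "(null) " : " ");
        }
        System.out.println(out.toString().trim()); // prints "a a(null) b"
    }
}
```

   The duplicate `a` key and the null value both survive, which is the property
   a MAP-typed column cannot offer.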


