dybyte commented on code in PR #10335:
URL: https://github.com/apache/seatunnel/pull/10335#discussion_r2772923108


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -260,6 +290,26 @@ private List<String> getPartitionKeyFields(
         return Collections.emptyList();
     }
 
+    private List<String> getHeaderFields(
+            ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+
+        if (pluginConfig.get(KAFKA_HEADERS_FIELDS) != null) {
+            List<String> headerFields = pluginConfig.get(KAFKA_HEADERS_FIELDS);
+            List<String> rowTypeFieldNames = 
Arrays.asList(seaTunnelRowType.getFieldNames());
+            for (String headerField : headerFields) {
+                if (!rowTypeFieldNames.contains(headerField)) {
+                    throw new KafkaConnectorException(
+                            CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                            String.format(
+                                    "Header field not found: %s, rowType: %s",
+                                    headerField, rowTypeFieldNames));

Review Comment:
   Ditto — the same suggestion applies to this `KafkaConnectorException` as well.



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java:
##########
@@ -182,6 +204,32 @@ private static Function<SeaTunnelRow, Iterable<Header>> 
headersExtractor(
                 convertToKafkaHeaders((Map<String, String>) 
row.getField(rowType.indexOf(HEADERS)));
     }
 
+    private static Function<SeaTunnelRow, Iterable<Header>> headersExtractor(
+            List<String> headerFields, SeaTunnelRowType rowType) {
+        if (headerFields == null || headerFields.isEmpty()) {
+            return row -> null;
+        }
+
+        int[] headerFieldIndexes = new int[headerFields.size()];
+        for (int i = 0; i < headerFields.size(); i++) {
+            headerFieldIndexes[i] = rowType.indexOf(headerFields.get(i));
+        }
+
+        return row -> {
+            RecordHeaders kafkaHeaders = new RecordHeaders();
+            for (int i = 0; i < headerFields.size(); i++) {
+                String headerName = headerFields.get(i);
+                Object headerValue = row.getField(headerFieldIndexes[i]);
+                // Write "null" string for null values to keep fields in 
headers
+                // (consistent with partition_key_fields behavior)
+                String valueStr = headerValue != null ? headerValue.toString() 
: "null";

Review Comment:
   Converting `null` to a literal `"null"` string distorts the actual data and 
makes it indistinguishable from a real string value `"null"`. Please consider adding 
the header with a `null` value instead (Kafka's `RecordHeader` accepts a null value).



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java:
##########
@@ -182,6 +204,32 @@ private static Function<SeaTunnelRow, Iterable<Header>> 
headersExtractor(
                 convertToKafkaHeaders((Map<String, String>) 
row.getField(rowType.indexOf(HEADERS)));
     }
 
+    private static Function<SeaTunnelRow, Iterable<Header>> headersExtractor(
+            List<String> headerFields, SeaTunnelRowType rowType) {
+        if (headerFields == null || headerFields.isEmpty()) {
+            return row -> null;
+        }
+
+        int[] headerFieldIndexes = new int[headerFields.size()];
+        for (int i = 0; i < headerFields.size(); i++) {
+            headerFieldIndexes[i] = rowType.indexOf(headerFields.get(i));
+        }
+
+        return row -> {
+            RecordHeaders kafkaHeaders = new RecordHeaders();
+            for (int i = 0; i < headerFields.size(); i++) {
+                String headerName = headerFields.get(i);
+                Object headerValue = row.getField(headerFieldIndexes[i]);
+                // Write "null" string for null values to keep fields in 
headers
+                // (consistent with partition_key_fields behavior)
+                String valueStr = headerValue != null ? headerValue.toString() 
: "null";
+                kafkaHeaders.add(
+                        new RecordHeader(headerName, 
valueStr.getBytes(StandardCharsets.UTF_8)));
+            }
+            return kafkaHeaders.toArray().length > 0 ? kafkaHeaders : null;

Review Comment:
   ```suggestion
               return kafkaHeaders.iterator().hasNext() ? kafkaHeaders : null;
   ```



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -199,10 +206,26 @@ private SeaTunnelRowSerializer<byte[], byte[]> 
getSerializer(
                     "Cannot select both `partiton` and `partition_key_fields`. 
You can configure only one of them");
         }
 
+        // Validate that partition_key_fields and kafka_headers_fields don't 
overlap
+        List<String> partitionKeyFields = getPartitionKeyFields(pluginConfig, 
seaTunnelRowType);
+        List<String> headerFields = getHeaderFields(pluginConfig, 
seaTunnelRowType);
+        if (!partitionKeyFields.isEmpty() && !headerFields.isEmpty()) {
+            for (String headerField : headerFields) {
+                if (partitionKeyFields.contains(headerField)) {
+                    throw new KafkaConnectorException(
+                            CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                            String.format(
+                                    "Field '%s' cannot be in both 
partition_key_fields and kafka_headers_fields",
+                                    headerField));

Review Comment:
   Let's use the existing helper in `CommonError` instead of constructing the 
exception by hand: 
https://github.com/apache/seatunnel/blob/4d12727ad9325f2fa2245cc0d97f13e39e348393/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java#L300-L305



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to