This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8ef89ffca3 [Feature][Connector-V2][Kafka] Add support for Kafka message header (#10335)
8ef89ffca3 is described below

commit 8ef89ffca34cde5437d6498bd9a702aaefa46485
Author: thehkkim <[email protected]>
AuthorDate: Wed Feb 25 15:50:50 2026 +0900

    [Feature][Connector-V2][Kafka] Add support for Kafka message header (#10335)
---
 docs/en/connectors/sink/Kafka.md                   |  64 +++++
 docs/zh/connectors/sink/Kafka.md                   |  20 ++
 .../seatunnel/kafka/config/KafkaSinkOptions.java   |   8 +
 .../serialize/DefaultSeaTunnelRowSerializer.java   | 126 +++++++++-
 .../seatunnel/kafka/sink/KafkaSinkWriter.java      |  54 ++++-
 .../DefaultSeaTunnelRowSerializerTest.java         | 270 +++++++++++++++++++++
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     |  40 +++
 .../test/resources/kafka_sink_with_headers.conf    |  52 ++++
 8 files changed, 626 insertions(+), 8 deletions(-)

diff --git a/docs/en/connectors/sink/Kafka.md b/docs/en/connectors/sink/Kafka.md
index d1f8a99d0e..ce3b8bebc4 100644
--- a/docs/en/connectors/sink/Kafka.md
+++ b/docs/en/connectors/sink/Kafka.md
@@ -39,6 +39,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
 | kafka.config          | Map    | No       | -       | In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs). [...]
 | semantics             | String | No       | NON     | Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON. [...]
 | partition_key_fields  | Array  | No       | -       | Configure which fields are used as the key of the kafka message. [...]
+| kafka_headers_fields  | Array  | No       | -       | Configure which fields are used as the headers of the kafka message. The field value will be converted to a string and used as the header value. [...]
 | partition             | Int    | No       | -       | We can specify the partition, all messages will be sent to this partition. [...]
 | assign_partitions     | Array  | No       | -       | We can decide which partition to send based on the content of the message. The function of this parameter is to distribute information. [...]
 | transaction_prefix    | String | No       | -       | If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction; kafka distinguishes different transactions by different transactionIds. This parameter is the prefix of the kafka transactionId; make sure different jobs use different prefixes. [...]
@@ -90,6 +91,25 @@ If not set partition key fields, the null message key will be sent to.
 The format of the message key is json; if name is set as the key, for example '{"name":"Jack"}'.
 The selected field must be an existing field in the upstream.
 
+### Kafka Headers Fields
+
+For example, if you want to use the values of fields from upstream data as kafka message headers, you can assign the field names to this property.
+
+Upstream data is the following:
+
+| name | age |     data      | source | traceId   |
+|------|-----|---------------|--------|-----------|
+| Jack | 16  | data-example1 | web    | trace-123 |
+| Mary | 23  | data-example2 | mobile | trace-456 |
+
+If source and traceId are set as the kafka headers fields, then these field values will be added as headers to the kafka message.
+For example, the first row will have headers: `source=web` and `traceId=trace-123`.
+The field values will be converted to strings and used as header values.
+The selected fields must be existing fields in the upstream.
+
+Note:
+Fields configured as Kafka headers will be excluded from the message value (payload) and will only be present in the Kafka message headers.
+
 ### Assign Partitions
 
 For example, there are five partitions in total, and the assign_partitions field in config is as follows:
@@ -140,6 +160,50 @@ sink {
 }
 ```
 
+### Using Kafka Headers
+
+This example shows how to use kafka_headers_fields to set Kafka message headers:
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    parallelism = 1
+    plugin_output = "fake"
+    row.num = 16
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+        source = "string"
+        traceId = "string"
+      }
+    }
+  }
+}
+
+sink {
+  kafka {
+      topic = "test_topic"
+      bootstrap.servers = "localhost:9092"
+      format = json
+      partition_key_fields = ["name"]
+      kafka_headers_fields = ["source", "traceId"]
+      kafka.request.timeout.ms = 60000
+      semantics = EXACTLY_ONCE
+      kafka.config = {
+        acks = "all"
+        request.timeout.ms = 60000
+        buffer.memory = 33554432
+      }
+  }
+}
+```
+
 ### AWS MSK SASL/SCRAM
 
 Replace the following `${username}` and `${password}` with the configuration values in AWS MSK.
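As a quick illustration of the documented behavior (a hedged sketch only, not SeaTunnel code): with `kafka_headers_fields = ["source", "traceId"]`, the selected fields are stringified into UTF-8 header values and excluded from the message payload. The class and method names below are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HeaderSplitSketch {
    // Stringify each configured header field; a null field value becomes a null-valued header.
    public static Map<String, byte[]> headersOf(Map<String, Object> row, List<String> headerFields) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        for (String f : headerFields) {
            Object v = row.get(f);
            headers.put(f, v == null ? null : v.toString().getBytes(StandardCharsets.UTF_8));
        }
        return headers;
    }

    // Header fields are excluded from the message value (payload), per the docs above.
    public static Map<String, Object> payloadOf(Map<String, Object> row, List<String> headerFields) {
        Map<String, Object> payload = new LinkedHashMap<>(row);
        payload.keySet().removeAll(headerFields);
        return payload;
    }
}
```

For the first upstream row in the docs, this yields headers `source=web`, `traceId=trace-123` and a payload containing only `name` and `age`.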
diff --git a/docs/zh/connectors/sink/Kafka.md b/docs/zh/connectors/sink/Kafka.md
index 682660d392..0addb1f3ee 100644
--- a/docs/zh/connectors/sink/Kafka.md
+++ b/docs/zh/connectors/sink/Kafka.md
@@ -39,6 +39,7 @@ import ChangeLog from '../changelog/connector-kafka.md';
 | kafka.config         | Map    | 否    | -    | 除了上述 Kafka Producer 客户端必须指定的参数外,用户还可以为 Producer 客户端指定多个非强制参数,涵盖 [Kafka官方文档中指定的所有生产者参数](https://kafka.apache.org/documentation.html#producerconfigs) |
 | semantics            | String | 否    | NON  | 可以选择的语义是 EXACTLY_ONCE/AT_LEAST_ONCE/NON,默认 NON。 |
 | partition_key_fields | Array  | 否    | -    | 配置字段用作 kafka 消息的key |
+| kafka_headers_fields | Array  | 否    | -    | 配置字段用作 kafka 消息的headers。字段值将被转换为字符串并用作 header 值 |
 | partition            | Int    | 否    | -    | 可以指定分区,所有消息都会发送到此分区 |
 | assign_partitions    | Array  | 否    | -    | 可以根据消息的内容决定发送哪个分区,该参数的作用是分发信息 |
 | transaction_prefix   | String | 否    | -    | 如果语义指定为EXACTLY_ONCE,生产者将把所有消息写入一个 Kafka 事务中,kafka 通过不同的 transactionId 来区分不同的事务。该参数是kafka transactionId的前缀,确保不同的作业使用不同的前缀 |
@@ -89,6 +90,25 @@ NON 不提供任何保证:如果 Kafka 代理出现问题,消息可能会丢
 消息 key 的格式为 json,如果设置 name 为 key,例如 `{"name":"Jack"}`。
 所选的字段必须是上游数据中已存在的字段。
 
+### Kafka Headers 字段
+
+例如,如果你想使用上游数据中的字段值作为 kafka 消息的 headers,可以将这些字段名指定给此属性。
+
+上游数据如下所示:
+
+| name | age |     data      | source | traceId   |
+|------|-----|---------------|--------|-----------|
+| Jack | 16  | data-example1 | web    | trace-123 |
+| Mary | 23  | data-example2 | mobile | trace-456 |
+
+如果将 source 和 traceId 设置为 kafka headers 字段,那么这些字段值将作为 headers 添加到 kafka 消息中。
+例如,第一行将具有 headers:`source=web` 和 `traceId=trace-123`。
+字段值将被转换为字符串并用作 header 值。
+所选的字段必须是上游数据中已存在的字段。
+
+注意:
+配置为 Kafka headers 的字段将不会包含在消息的 value(payload)中,而只会存在于 Kafka 消息的 headers 中。
+
 ### 分区分配
 
 假设总有五个分区,配置中的 assign_partitions 字段设置为:
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSinkOptions.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSinkOptions.java
index ca25d2144b..05fabad51c 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSinkOptions.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSinkOptions.java
@@ -46,6 +46,14 @@ public class KafkaSinkOptions extends KafkaBaseOptions {
                     .withDescription(
                             "Configure which fields are used as the key of the kafka message.");
 
+    public static final Option<List<String>> KAFKA_HEADERS_FIELDS =
+            Options.key("kafka_headers_fields")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Configure which fields are used as the headers of the kafka message. "
+                                    + "The field value will be converted to a string and used as the header value.");
+
     public static final Option<KafkaSemantics> SEMANTICS =
             Options.key("semantics")
                     .enumType(KafkaSemantics.class)
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 9ae2e554d8..ec57ef4ce0 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -119,7 +119,7 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
                 timestampExtractor(),
                 keyExtractor(null, rowType, format, delimiter, pluginConfig),
                 valueExtractor(rowType, format, delimiter, pluginConfig),
-                headersExtractor());
+                headersExtractor(null, rowType));
     }
 
     public static DefaultSeaTunnelRowSerializer create(
@@ -129,13 +129,24 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
             MessageFormat format,
             String delimiter,
             ReadonlyConfig pluginConfig) {
+        return create(topic, partition, null, rowType, format, delimiter, pluginConfig);
+    }
+
+    public static DefaultSeaTunnelRowSerializer create(
+            String topic,
+            Integer partition,
+            List<String> headerFields,
+            SeaTunnelRowType rowType,
+            MessageFormat format,
+            String delimiter,
+            ReadonlyConfig pluginConfig) {
         return new DefaultSeaTunnelRowSerializer(
                 topicExtractor(topic, rowType, format),
                 partitionExtractor(partition),
                 timestampExtractor(),
                 keyExtractor(null, rowType, format, delimiter, pluginConfig),
-                valueExtractor(rowType, format, delimiter, pluginConfig),
-                headersExtractor());
+                valueExtractor(headerFields, rowType, format, delimiter, pluginConfig),
+                headersExtractor(headerFields, rowType));
     }
 
     public static DefaultSeaTunnelRowSerializer create(
@@ -145,13 +156,24 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
             MessageFormat format,
             String delimiter,
             ReadonlyConfig pluginConfig) {
+        return create(topic, keyFields, null, rowType, format, delimiter, pluginConfig);
+    }
+
+    public static DefaultSeaTunnelRowSerializer create(
+            String topic,
+            List<String> keyFields,
+            List<String> headerFields,
+            SeaTunnelRowType rowType,
+            MessageFormat format,
+            String delimiter,
+            ReadonlyConfig pluginConfig) {
         return new DefaultSeaTunnelRowSerializer(
                 topicExtractor(topic, rowType, format),
                 partitionExtractor(null),
                 timestampExtractor(),
                 keyExtractor(keyFields, rowType, format, delimiter, pluginConfig),
-                valueExtractor(rowType, format, delimiter, pluginConfig),
-                headersExtractor());
+                valueExtractor(headerFields, rowType, format, delimiter, pluginConfig),
+                headersExtractor(headerFields, rowType));
     }
 
     private static Function<SeaTunnelRow, Integer> partitionNativeExtractor(
@@ -182,6 +204,36 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
                 convertToKafkaHeaders((Map<String, String>) row.getField(rowType.indexOf(HEADERS)));
     }
 
+    private static Function<SeaTunnelRow, Iterable<Header>> headersExtractor(
+            List<String> headerFields, SeaTunnelRowType rowType) {
+        if (headerFields == null || headerFields.isEmpty()) {
+            return row -> null;
+        }
+
+        int[] headerFieldIndexes = new int[headerFields.size()];
+        for (int i = 0; i < headerFields.size(); i++) {
+            headerFieldIndexes[i] = rowType.indexOf(headerFields.get(i));
+        }
+
+        return row -> {
+            RecordHeaders kafkaHeaders = new RecordHeaders();
+            for (int i = 0; i < headerFields.size(); i++) {
+                String headerName = headerFields.get(i);
+                Object headerValue = row.getField(headerFieldIndexes[i]);
+
+                if (headerValue == null) {
+                    kafkaHeaders.add(new RecordHeader(headerName, null));
+                } else {
+                    kafkaHeaders.add(
+                            new RecordHeader(
+                                    headerName,
+                                    headerValue.toString().getBytes(StandardCharsets.UTF_8)));
+                }
+            }
+            return kafkaHeaders.iterator().hasNext() ? kafkaHeaders : null;
+        };
+    }
+
     private static Function<SeaTunnelRow, String> topicExtractor(
             String topic, SeaTunnelRowType rowType, MessageFormat format) {
         if ((MessageFormat.COMPATIBLE_DEBEZIUM_JSON.equals(format)
@@ -256,6 +308,25 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
         return row -> serializationSchema.serialize(row);
     }
 
+    private static Function<SeaTunnelRow, byte[]> valueExtractor(
+            List<String> headerFields,
+            SeaTunnelRowType rowType,
+            MessageFormat format,
+            String delimiter,
+            ReadonlyConfig pluginConfig) {
+        if (headerFields == null || headerFields.isEmpty()) {
+            return valueExtractor(rowType, format, delimiter, pluginConfig);
+        }
+
+        // Create a new row type excluding header fields
+        SeaTunnelRowType valueRowType = createValueRowType(headerFields, rowType);
+        Function<SeaTunnelRow, SeaTunnelRow> valueRowExtractor =
+                createValueRowExtractor(valueRowType, headerFields, rowType);
+        SerializationSchema serializationSchema =
+                createSerializationSchema(valueRowType, format, delimiter, false, pluginConfig);
+        return row -> serializationSchema.serialize(valueRowExtractor.apply(row));
+    }
+
     private static Function<SeaTunnelRow, byte[]> valueExtractor(SeaTunnelRowType rowType) {
         return row -> (byte[]) row.getField(rowType.indexOf(VALUE));
     }
@@ -273,6 +344,25 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
         return new SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
     }
 
+    private static SeaTunnelRowType createValueRowType(
+            List<String> headerFieldNames, SeaTunnelRowType rowType) {
+        // Create a row type excluding header fields
+        List<String> valueFieldNames = new java.util.ArrayList<>();
+        List<SeaTunnelDataType> valueFieldTypes = new java.util.ArrayList<>();
+
+        for (int i = 0; i < rowType.getTotalFields(); i++) {
+            String fieldName = rowType.getFieldName(i);
+            if (!headerFieldNames.contains(fieldName)) {
+                valueFieldNames.add(fieldName);
+                valueFieldTypes.add(rowType.getFieldType(i));
+            }
+        }
+
+        return new SeaTunnelRowType(
+                valueFieldNames.toArray(new String[0]),
+                valueFieldTypes.toArray(new SeaTunnelDataType[0]));
+    }
+
     private static Function<SeaTunnelRow, SeaTunnelRow> createKeyRowExtractor(
             SeaTunnelRowType keyType, SeaTunnelRowType rowType) {
         int[] keyIndex = new int[keyType.getTotalFields()];
@@ -284,7 +374,31 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
             for (int i = 0; i < keyIndex.length; i++) {
                 fields[i] = row.getField(keyIndex[i]);
             }
-            return new SeaTunnelRow(fields);
+
+            SeaTunnelRow newKeyRow = new SeaTunnelRow(fields);
+            newKeyRow.setRowKind(row.getRowKind());
+            newKeyRow.setTableId(row.getTableId());
+            return newKeyRow;
+        };
+    }
+
+    private static Function<SeaTunnelRow, SeaTunnelRow> createValueRowExtractor(
+            SeaTunnelRowType valueType, List<String> headerFieldNames, SeaTunnelRowType rowType) {
+        int[] valueIndex = new int[valueType.getTotalFields()];
+        for (int i = 0; i < valueType.getTotalFields(); i++) {
+            valueIndex[i] = rowType.indexOf(valueType.getFieldName(i));
+        }
+        return row -> {
+            Object[] fields = new Object[valueType.getTotalFields()];
+            for (int i = 0; i < valueIndex.length; i++) {
+                fields[i] = row.getField(valueIndex[i]);
+            }
+
+            SeaTunnelRow newRow = new SeaTunnelRow(fields);
+            newRow.setRowKind(row.getRowKind());
+            newRow.setTableId(row.getTableId());
+
+            return newRow;
         };
     }
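The serializer changes above follow one pattern: field positions are resolved once against the row type when the serializer is built (`headersExtractor`, `createValueRowExtractor`), and each row is then copied by precomputed index. A minimal self-contained sketch of that pattern, with illustrative names that are not SeaTunnel APIs:

```java
import java.util.List;
import java.util.function.Function;

public class ProjectionSketch {
    // Resolve each kept field's index once (not per row), mirroring how the
    // serializer precomputes headerFieldIndexes and valueIndex above.
    public static Function<Object[], Object[]> projector(List<String> allFields, List<String> keep) {
        int[] idx = new int[keep.size()];
        for (int i = 0; i < idx.length; i++) {
            idx[i] = allFields.indexOf(keep.get(i));
        }
        // Per-row work is just an index-based copy.
        return row -> {
            Object[] out = new Object[idx.length];
            for (int i = 0; i < idx.length; i++) {
                out[i] = row[idx[i]];
            }
            return out;
        };
    }
}
```

Doing the name-to-index lookups once at construction keeps the per-record path allocation-light, which matters in a sink writer's hot loop.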
 
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 848b74004d..7096fde5dc 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -61,6 +61,7 @@ import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOp
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.FIELD_DELIMITER;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.FORMAT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.KAFKA_CONFIG;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.KAFKA_HEADERS_FIELDS;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.PARTITION;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.PARTITION_KEY_FIELDS;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.SEMANTICS;
@@ -184,6 +185,12 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
         MessageFormat messageFormat = pluginConfig.get(FORMAT);
         String topic = pluginConfig.get(TOPIC);
         if (MessageFormat.NATIVE.equals(messageFormat)) {
+            // Validate that kafka_headers_fields is not configured for NATIVE format
+            if (pluginConfig.get(KAFKA_HEADERS_FIELDS) != null) {
+                throw new KafkaConnectorException(
+                        CommonErrorCode.OPERATION_NOT_SUPPORTED,
+                        "kafka_headers_fields is not supported with NATIVE format. Please use JSON, TEXT, or other formats.");
+            }
             checkNativeSeaTunnelType(seaTunnelRowType);
             return DefaultSeaTunnelRowSerializer.create(topic, messageFormat, seaTunnelRowType);
         }
@@ -199,10 +206,26 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
                     "Cannot select both `partiton` and `partition_key_fields`. You can configure only one of them");
         }
 
+        // Validate that partition_key_fields and kafka_headers_fields don't overlap
+        List<String> partitionKeyFields = getPartitionKeyFields(pluginConfig, seaTunnelRowType);
+        List<String> headerFields = getHeaderFields(pluginConfig, seaTunnelRowType);
+        if (!partitionKeyFields.isEmpty() && !headerFields.isEmpty()) {
+            for (String headerField : headerFields) {
+                if (partitionKeyFields.contains(headerField)) {
+                    throw new KafkaConnectorException(
+                            CommonErrorCode.ILLEGAL_ARGUMENT,
+                            String.format(
+                                    "Field '%s' cannot be in both partition_key_fields and kafka_headers_fields",
+                                    headerField));
+                }
+            }
+        }
+
         if (pluginConfig.get(PARTITION_KEY_FIELDS) != null) {
             return DefaultSeaTunnelRowSerializer.create(
                     topic,
-                    getPartitionKeyFields(pluginConfig, seaTunnelRowType),
+                    partitionKeyFields,
+                    headerFields,
                     seaTunnelRowType,
                     messageFormat,
                     delimiter,
@@ -212,6 +235,7 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
             return DefaultSeaTunnelRowSerializer.create(
                     topic,
                     pluginConfig.get(PARTITION),
+                    headerFields,
                     seaTunnelRowType,
                     messageFormat,
                     delimiter,
@@ -219,7 +243,13 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
         }
         // By default, all partitions are sent randomly
         return DefaultSeaTunnelRowSerializer.create(
-                topic, Arrays.asList(), seaTunnelRowType, messageFormat, delimiter, pluginConfig);
+                topic,
+                Collections.<String>emptyList(),
+                headerFields,
+                seaTunnelRowType,
+                messageFormat,
+                delimiter,
+                pluginConfig);
     }
 
     private KafkaSemantics getKafkaSemantics(ReadonlyConfig pluginConfig) {
@@ -260,6 +290,26 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
         return Collections.emptyList();
     }
 
+    private List<String> getHeaderFields(
+            ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+
+        if (pluginConfig.get(KAFKA_HEADERS_FIELDS) != null) {
+            List<String> headerFields = pluginConfig.get(KAFKA_HEADERS_FIELDS);
+            List<String> rowTypeFieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
+            for (String headerField : headerFields) {
+                if (!rowTypeFieldNames.contains(headerField)) {
+                    throw new KafkaConnectorException(
+                            CommonErrorCode.ILLEGAL_ARGUMENT,
+                            String.format(
+                                    "Header field not found: %s, rowType: %s",
+                                    headerField, rowTypeFieldNames));
+                }
+            }
+            return headerFields;
+        }
+        return Collections.emptyList();
+    }
+
     private void checkNativeSeaTunnelType(SeaTunnelRowType seaTunnelRowType) {
         SeaTunnelRowType exceptRowType = nativeTableSchema().toPhysicalRowDataType();
         for (int i = 0; i < exceptRowType.getFieldTypes().length; i++) {
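The sink writer above adds two configuration checks: every configured header field must exist in the upstream row type, and no field may appear in both `partition_key_fields` and `kafka_headers_fields`. A hedged, dependency-free sketch of that validation logic (the class name is hypothetical; the real code throws `KafkaConnectorException` with SeaTunnel error codes):

```java
import java.util.List;

public class HeaderFieldValidation {
    // Mirrors the two sink-side checks: header fields must exist in the row,
    // and must not overlap the partition key fields.
    public static void validate(List<String> rowFields, List<String> keyFields, List<String> headerFields) {
        for (String f : headerFields) {
            if (!rowFields.contains(f)) {
                throw new IllegalArgumentException("Header field not found: " + f);
            }
            if (keyFields.contains(f)) {
                throw new IllegalArgumentException(
                        "Field '" + f + "' cannot be in both partition_key_fields and kafka_headers_fields");
            }
        }
    }
}
```

Failing fast here makes sense because both mistakes would otherwise surface only at serialization time, after the job has started producing records.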
diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializerTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializerTest.java
index a60a320281..ec3575139b 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializerTest.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializerTest.java
@@ -19,17 +19,23 @@
 package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
 import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 public class DefaultSeaTunnelRowSerializerTest {
 
@@ -65,4 +71,268 @@ public class DefaultSeaTunnelRowSerializerTest {
         Assertions.assertEquals("key1", new String(record.key()));
         Assertions.assertEquals("value1", new String(record.value()));
     }
+
+    @Test
+    public void testKafkaHeaders() {
+        String topic = "test_topic";
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"id", "name", "source", "traceId"},
+                        new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+                            BasicType.INT_TYPE,
+                            BasicType.STRING_TYPE,
+                            BasicType.STRING_TYPE,
+                            BasicType.STRING_TYPE
+                        });
+        MessageFormat format = MessageFormat.JSON;
+        String delimiter = ",";
+        Map<String, Object> configMap = new HashMap<>();
+        ReadonlyConfig pluginConfig = ReadonlyConfig.fromMap(configMap);
+
+        // Test with header fields
+        DefaultSeaTunnelRowSerializer serializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        topic,
+                        Arrays.asList("id"),
+                        Arrays.asList("source", "traceId"),
+                        rowType,
+                        format,
+                        delimiter,
+                        pluginConfig);
+
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "test", "web", "trace-123"});
+        ProducerRecord<byte[], byte[]> record = serializer.serializeRow(row);
+
+        Assertions.assertEquals("test_topic", record.topic());
+        Assertions.assertNotNull(record.headers());
+
+        Header sourceHeader = record.headers().lastHeader("source");
+        Assertions.assertNotNull(sourceHeader);
+        Assertions.assertEquals("web", new String(sourceHeader.value(), StandardCharsets.UTF_8));
+
+        Header traceIdHeader = record.headers().lastHeader("traceId");
+        Assertions.assertNotNull(traceIdHeader);
+        Assertions.assertEquals(
+                "trace-123", new String(traceIdHeader.value(), StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testKafkaHeadersWithNullValue() {
+        String topic = "test_topic";
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"id", "name", "source", "traceId"},
+                        new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+                            BasicType.INT_TYPE,
+                            BasicType.STRING_TYPE,
+                            BasicType.STRING_TYPE,
+                            BasicType.STRING_TYPE
+                        });
+        MessageFormat format = MessageFormat.JSON;
+        String delimiter = ",";
+        Map<String, Object> configMap = new HashMap<>();
+        ReadonlyConfig pluginConfig = ReadonlyConfig.fromMap(configMap);
+
+        DefaultSeaTunnelRowSerializer serializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        topic,
+                        Arrays.asList("id"),
+                        Arrays.asList("source", "traceId"),
+                        rowType,
+                        format,
+                        delimiter,
+                        pluginConfig);
+
+        // Test with null header value
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "test", "web", null});
+        ProducerRecord<byte[], byte[]> record = serializer.serializeRow(row);
+
+        Assertions.assertEquals("test_topic", record.topic());
+        Assertions.assertNotNull(record.headers());
+
+        Header sourceHeader = record.headers().lastHeader("source");
+        Assertions.assertNotNull(sourceHeader);
+        Assertions.assertEquals("web", new String(sourceHeader.value(), StandardCharsets.UTF_8));
+
+        // Null value should be written as null in headers
+        Header traceIdHeader = record.headers().lastHeader("traceId");
+        Assertions.assertNotNull(traceIdHeader);
+        Assertions.assertNull(traceIdHeader.value());
+    }
+
+    @Test
+    public void testBackwardCompatibilityWithKeyFields() {
+        // Test that the 6-parameter create method (without headerFields) still works
+        String topic = "test_topic";
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"id", "name", "age"},
+                        new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+                            BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.INT_TYPE
+                        });
+        MessageFormat format = MessageFormat.JSON;
+        String delimiter = ",";
+        Map<String, Object> configMap = new HashMap<>();
+        ReadonlyConfig pluginConfig = ReadonlyConfig.fromMap(configMap);
+
+        // Test with keyFields but no headerFields (backward compatibility)
+        DefaultSeaTunnelRowSerializer serializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        topic, Arrays.asList("id"), rowType, format, delimiter, pluginConfig);
+
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "John", 25});
+        ProducerRecord<byte[], byte[]> record = serializer.serializeRow(row);
+
+        Assertions.assertEquals("test_topic", record.topic());
+        Assertions.assertNotNull(record.value());
+
+        // Value should contain all fields
+        String valueString = new String(record.value(), StandardCharsets.UTF_8);
+        Assertions.assertTrue(valueString.contains("\"id\""));
+        Assertions.assertTrue(valueString.contains("\"name\""));
+        Assertions.assertTrue(valueString.contains("\"age\""));
+    }
+
+    @Test
+    public void testBackwardCompatibilityWithPartition() {
+        // Test that the 6-parameter create method with partition (without headerFields) still works
+        String topic = "test_topic";
+        Integer partition = 0;
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"id", "name", "age"},
+                        new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+                            BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.INT_TYPE
+                        });
+        MessageFormat format = MessageFormat.JSON;
+        String delimiter = ",";
+        Map<String, Object> configMap = new HashMap<>();
+        ReadonlyConfig pluginConfig = ReadonlyConfig.fromMap(configMap);
+
+        // Test with partition but no headerFields (backward compatibility)
+        DefaultSeaTunnelRowSerializer serializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        topic, partition, rowType, format, delimiter, pluginConfig);
+
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "John", 25});
+        ProducerRecord<byte[], byte[]> record = serializer.serializeRow(row);
+
+        Assertions.assertEquals("test_topic", record.topic());
+        Assertions.assertEquals(partition, record.partition());
+        Assertions.assertNotNull(record.value());
+
+        // Value should contain all fields
+        String valueString = new String(record.value(), StandardCharsets.UTF_8);
+        Assertions.assertTrue(valueString.contains("\"id\""));
+        Assertions.assertTrue(valueString.contains("\"name\""));
+        Assertions.assertTrue(valueString.contains("\"age\""));
+    }
+
+    @Test
+    public void testHeaderFieldsExcludedFromValue() {
+        String topic = "test_topic";
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"id", "name", "source", "traceId"},
+                        new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+                            BasicType.INT_TYPE,
+                            BasicType.STRING_TYPE,
+                            BasicType.STRING_TYPE,
+                            BasicType.STRING_TYPE
+                        });
+        MessageFormat format = MessageFormat.JSON;
+        String delimiter = ",";
+        Map<String, Object> configMap = new HashMap<>();
+        ReadonlyConfig pluginConfig = ReadonlyConfig.fromMap(configMap);
+
+        // Test with header fields
+        DefaultSeaTunnelRowSerializer serializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        topic,
+                        Arrays.asList("id"),
+                        Arrays.asList("source", "traceId"),
+                        rowType,
+                        format,
+                        delimiter,
+                        pluginConfig);
+
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "test", "web", "trace-123"});
+        ProducerRecord<byte[], byte[]> record = serializer.serializeRow(row);
+
+        Assertions.assertEquals("test_topic", record.topic());
+
+        // Verify headers contain the expected fields
+        Header sourceHeader = record.headers().lastHeader("source");
+        Assertions.assertNotNull(sourceHeader);
+        Assertions.assertEquals("web", new String(sourceHeader.value(), StandardCharsets.UTF_8));
+
+        Header traceIdHeader = record.headers().lastHeader("traceId");
+        Assertions.assertNotNull(traceIdHeader);
+        Assertions.assertEquals(
+                "trace-123", new String(traceIdHeader.value(), StandardCharsets.UTF_8));
+
+        // Verify value does NOT contain header fields (source and traceId)
+        // Header fields are only in Kafka headers, not in the message value
+        String valueString = new String(record.value(), StandardCharsets.UTF_8);
+        // The value should only contain id and name fields
+        Assertions.assertTrue(valueString.contains("\"id\""));
+        Assertions.assertTrue(valueString.contains("\"name\""));
+        // Header fields should NOT be in the value
+        Assertions.assertFalse(valueString.contains("\"source\""));
+        Assertions.assertFalse(valueString.contains("\"traceId\""));
+    }
+
+    @Test
+    public void testKafkaHeadersWithNullValueExcludedFromValue() {
+        // Test that null header values are preserved as null in headers
+        // (consistent with partition_key_fields behavior)
+        // and header fields are excluded from the message value
+        String topic = "test_topic";
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"id", "name", "source", "traceId"},
+                        new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+                            BasicType.INT_TYPE,
+                            BasicType.STRING_TYPE,
+                            BasicType.STRING_TYPE,
+                            BasicType.STRING_TYPE
+                        });
+        MessageFormat format = MessageFormat.JSON;
+        String delimiter = ",";
+        Map<String, Object> configMap = new HashMap<>();
+        ReadonlyConfig pluginConfig = ReadonlyConfig.fromMap(configMap);
+
+        DefaultSeaTunnelRowSerializer serializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        topic,
+                        Arrays.asList("id"),
+                        Arrays.asList("source", "traceId"),
+                        rowType,
+                        format,
+                        delimiter,
+                        pluginConfig);
+
+        // Test with null header value
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "test", "web", null});
+        ProducerRecord<byte[], byte[]> record = serializer.serializeRow(row);
+
+        Assertions.assertEquals("test_topic", record.topic());
+        Assertions.assertNotNull(record.headers());
+
+        Header sourceHeader = record.headers().lastHeader("source");
+        Assertions.assertNotNull(sourceHeader);
+        Assertions.assertEquals("web", new String(sourceHeader.value(), StandardCharsets.UTF_8));
+
+        // Null value should be written as null in headers
+        Header traceIdHeader = record.headers().lastHeader("traceId");
+        Assertions.assertNotNull(traceIdHeader);
+        Assertions.assertNull(traceIdHeader.value());
+
+        // Header fields should NOT be in the message value
+        String valueString = new String(record.value(), StandardCharsets.UTF_8);
+        Assertions.assertTrue(valueString.contains("\"id\""));
+        Assertions.assertTrue(valueString.contains("\"name\""));
+        Assertions.assertFalse(valueString.contains("\"source\""));
+        Assertions.assertFalse(valueString.contains("\"traceId\""));
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index ae167e31fb..7a5a188810 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -296,6 +296,46 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(10, data.size());
     }
 
+    @TestTemplate
+    public void testSinkKafkaWithHeaders(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/kafka_sink_with_headers.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+        String topicName = "test_topic_headers";
+        List<ConsumerRecord<String, String>> records = getKafkaRecordData(topicName);
+
+        Assertions.assertEquals(10, records.size());
+
+        // Verify that headers contain the expected fields (id, name)
+        for (ConsumerRecord<String, String> record : records) {
+            Map<String, String> headers = convertHeadersToMap(record.headers());
+
+            // Verify headers contain id and name
+            Assertions.assertTrue(headers.containsKey("id"), "Header should contain 'id' field");
+            Assertions.assertTrue(
+                    headers.containsKey("name"), "Header should contain 'name' field");
+
+            // Verify the value (payload) is a JSON object
+            ObjectMapper objectMapper = new ObjectMapper();
+            ObjectNode payloadNode = objectMapper.readValue(record.value(), ObjectNode.class);
+
+            // Verify payload does NOT contain the header fields (id, name)
+            Assertions.assertFalse(
+                    payloadNode.has("id"),
+                    "Payload should NOT contain 'id' field (it's in headers)");
+            Assertions.assertFalse(
+                    payloadNode.has("name"),
+                    "Payload should NOT contain 'name' field (it's in headers)");
+
+            // Verify payload contains the non-header fields (age, email, 
description)
+            Assertions.assertTrue(payloadNode.has("age"), "Payload should contain 'age' field");
+            Assertions.assertTrue(payloadNode.has("email"), "Payload should contain 'email' field");
+            Assertions.assertTrue(
+                    payloadNode.has("description"), "Payload should contain 'description' field");
+        }
+    }
+
     @TestTemplate
     public void testDefaultRandomSinkKafka(TestContainer container)
             throws IOException, InterruptedException {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_sink_with_headers.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_sink_with_headers.conf
new file mode 100644
index 0000000000..7a65ff3b7d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_sink_with_headers.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    schema = {
+      fields {
+        id = int
+        name = string
+        age = int
+        email = string
+        description = string
+      }
+    }
+  }
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_headers"
+    format = json
+    kafka_headers_fields = ["id", "name"]
+  }
+}

