This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8ef89ffca3 [Feature][Connector-V2][Kafka] Add support for Kafka message header (#10335)
8ef89ffca3 is described below
commit 8ef89ffca34cde5437d6498bd9a702aaefa46485
Author: thehkkim <[email protected]>
AuthorDate: Wed Feb 25 15:50:50 2026 +0900
[Feature][Connector-V2][Kafka] Add support for Kafka message header (#10335)
---
docs/en/connectors/sink/Kafka.md | 64 +++++
docs/zh/connectors/sink/Kafka.md | 20 ++
.../seatunnel/kafka/config/KafkaSinkOptions.java | 8 +
.../serialize/DefaultSeaTunnelRowSerializer.java | 126 +++++++++-
.../seatunnel/kafka/sink/KafkaSinkWriter.java | 54 ++++-
.../DefaultSeaTunnelRowSerializerTest.java | 270 +++++++++++++++++++++
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 40 +++
.../test/resources/kafka_sink_with_headers.conf | 52 ++++
8 files changed, 626 insertions(+), 8 deletions(-)
diff --git a/docs/en/connectors/sink/Kafka.md b/docs/en/connectors/sink/Kafka.md
index d1f8a99d0e..ce3b8bebc4 100644
--- a/docs/en/connectors/sink/Kafka.md
+++ b/docs/en/connectors/sink/Kafka.md
@@ -39,6 +39,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repository
| kafka.config | Map | No | - | In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs).
[...]
| semantics | String | No | NON | Semantics that can be chosen: EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.
[...]
| partition_key_fields | Array | No | - | Configure which fields are used as the key of the kafka message.
[...]
+| kafka_headers_fields | Array | No | - | Configure which fields are used as the headers of the kafka message. The field value will be converted to a string and used as the header value.
[...]
| partition | Int | No | - | We can specify the partition; all messages will be sent to this partition.
[...]
| assign_partitions | Array | No | - | We can decide which partition to send to based on the content of the message. The function of this parameter is to distribute messages.
[...]
| transaction_prefix | String | No | - | If semantics is set to EXACTLY_ONCE, the producer will write all messages in a Kafka transaction. Kafka distinguishes different transactions by different transactionIds. This parameter is the prefix of the Kafka transactionId; make sure different jobs use different prefixes.
[...]
@@ -90,6 +91,25 @@ If partition key fields are not set, a null message key will be sent.
The format of the message key is JSON. If name is set as the key, it looks like '{"name":"Jack"}'.
The selected field must be an existing field in the upstream.
+### Kafka Headers Fields
+
+For example, if you want to use the values of fields from the upstream data as kafka message headers, you can assign those field names to this property.
+
+Upstream data is the following:
+
+| name | age | data | source | traceId |
+|------|-----|---------------|--------|-----------|
+| Jack | 16 | data-example1 | web | trace-123 |
+| Mary | 23 | data-example2 | mobile | trace-456 |
+
+If source and traceId are set as the kafka headers fields, their values will be added as headers to the kafka message.
+For example, the first row will have the headers `source=web` and `traceId=trace-123`.
+The field values will be converted to strings and used as header values.
+The selected fields must be existing fields in the upstream.
+
+Note:
+Fields configured as Kafka headers will be excluded from the message value (payload) and will only be present in the Kafka message headers.
+
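+The conversion rule described above can be sketched in plain Java. This is a simplified illustration, not the connector's API: `toHeaderValue` is a hypothetical helper standing in for the connector's `RecordHeader` handling.
+
+```java
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class HeaderConversionSketch {
+    // Documented rule: a non-null field value is converted to a string and
+    // encoded as UTF-8 bytes; a null field value becomes a null header value.
+    static byte[] toHeaderValue(Object fieldValue) {
+        return fieldValue == null
+                ? null
+                : fieldValue.toString().getBytes(StandardCharsets.UTF_8);
+    }
+
+    public static void main(String[] args) {
+        Map<String, Object> row = new LinkedHashMap<>();
+        row.put("source", "web");
+        row.put("traceId", "trace-123");
+        for (Map.Entry<String, Object> e : row.entrySet()) {
+            byte[] value = toHeaderValue(e.getValue());
+            System.out.println(e.getKey() + "=" + new String(value, StandardCharsets.UTF_8));
+        }
+    }
+}
+```
+
+Note that non-string fields (e.g. an int `age`) go through the same `toString()` path, so consumers should expect textual header values.
+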
### Assign Partitions
For example, there are five partitions in total, and the assign_partitions
field in config is as follows:
@@ -140,6 +160,50 @@ sink {
}
```
+### Using Kafka Headers
+
+This example shows how to use kafka_headers_fields to set Kafka message headers:
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ parallelism = 1
+ plugin_output = "fake"
+ row.num = 16
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ source = "string"
+ traceId = "string"
+ }
+ }
+ }
+}
+
+sink {
+ kafka {
+ topic = "test_topic"
+ bootstrap.servers = "localhost:9092"
+ format = json
+ partition_key_fields = ["name"]
+ kafka_headers_fields = ["source", "traceId"]
+ kafka.request.timeout.ms = 60000
+ semantics = EXACTLY_ONCE
+ kafka.config = {
+ acks = "all"
+ request.timeout.ms = 60000
+ buffer.memory = 33554432
+ }
+ }
+}
+```
+
### AWS MSK SASL/SCRAM
Replace the following `${username}` and `${password}` with the configuration values in AWS MSK.
diff --git a/docs/zh/connectors/sink/Kafka.md b/docs/zh/connectors/sink/Kafka.md
index 682660d392..0addb1f3ee 100644
--- a/docs/zh/connectors/sink/Kafka.md
+++ b/docs/zh/connectors/sink/Kafka.md
@@ -39,6 +39,7 @@ import ChangeLog from '../changelog/connector-kafka.md';
| kafka.config | Map | No | - | In addition to the parameters above that must be specified by the Kafka Producer client, the user can also specify multiple non-mandatory parameters for the Producer client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs) |
| semantics | String | No | NON | Semantics that can be chosen: EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON. |
| partition_key_fields | Array | No | - | Configure which fields are used as the key of the kafka message |
+| kafka_headers_fields | Array | No | - | Configure which fields are used as the headers of the kafka message. The field values will be converted to strings and used as header values |
| partition | Int | No | - | The partition can be specified; all messages will be sent to this partition |
| assign_partitions | Array | No | - | The target partition can be decided based on the content of the message; the function of this parameter is to distribute messages |
| transaction_prefix | String | No | - | If semantics is set to EXACTLY_ONCE, the producer will write all messages in one Kafka transaction; Kafka distinguishes different transactions by different transactionIds. This parameter is the prefix of the Kafka transactionId; make sure different jobs use different prefixes |
@@ -89,6 +90,25 @@ NON provides no guarantee: if there is a problem with the Kafka broker, messages may be lost
The format of the message key is JSON. If name is set as the key, it looks like `{"name":"Jack"}`.
The selected field must be an existing field in the upstream data.
+### Kafka Headers Fields
+
+For example, if you want to use the values of fields from the upstream data as the kafka message headers, you can assign those field names to this property.
+
+The upstream data is as follows:
+
+| name | age | data          | source | traceId   |
+|------|-----|---------------|--------|-----------|
+| Jack | 16  | data-example1 | web    | trace-123 |
+| Mary | 23  | data-example2 | mobile | trace-456 |
+
+If source and traceId are set as the kafka headers fields, their values will be added to the kafka message as headers.
+For example, the first row will have the headers `source=web` and `traceId=trace-123`.
+The field values will be converted to strings and used as header values.
+The selected fields must be existing fields in the upstream data.
+
+Note:
+Fields configured as Kafka headers will not be included in the message value (payload); they will only be present in the Kafka message headers.
+
### Assign Partitions
Assume there are five partitions in total, and the assign_partitions field in the config is set to:
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSinkOptions.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSinkOptions.java
index ca25d2144b..05fabad51c 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSinkOptions.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSinkOptions.java
@@ -46,6 +46,14 @@ public class KafkaSinkOptions extends KafkaBaseOptions {
                .withDescription(
                        "Configure which fields are used as the key of the kafka message.");
+    public static final Option<List<String>> KAFKA_HEADERS_FIELDS =
+            Options.key("kafka_headers_fields")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Configure which fields are used as the headers of the kafka message. "
+                                    + "The field value will be converted to a string and used as the header value.");
+
public static final Option<KafkaSemantics> SEMANTICS =
Options.key("semantics")
.enumType(KafkaSemantics.class)
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 9ae2e554d8..ec57ef4ce0 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -119,7 +119,7 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
timestampExtractor(),
keyExtractor(null, rowType, format, delimiter, pluginConfig),
valueExtractor(rowType, format, delimiter, pluginConfig),
- headersExtractor());
+ headersExtractor(null, rowType));
}
public static DefaultSeaTunnelRowSerializer create(
@@ -129,13 +129,24 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
MessageFormat format,
String delimiter,
ReadonlyConfig pluginConfig) {
+        return create(topic, partition, null, rowType, format, delimiter, pluginConfig);
+ }
+
+ public static DefaultSeaTunnelRowSerializer create(
+ String topic,
+ Integer partition,
+ List<String> headerFields,
+ SeaTunnelRowType rowType,
+ MessageFormat format,
+ String delimiter,
+ ReadonlyConfig pluginConfig) {
return new DefaultSeaTunnelRowSerializer(
topicExtractor(topic, rowType, format),
partitionExtractor(partition),
timestampExtractor(),
keyExtractor(null, rowType, format, delimiter, pluginConfig),
- valueExtractor(rowType, format, delimiter, pluginConfig),
- headersExtractor());
+                valueExtractor(headerFields, rowType, format, delimiter, pluginConfig),
+                headersExtractor(headerFields, rowType));
}
public static DefaultSeaTunnelRowSerializer create(
@@ -145,13 +156,24 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
MessageFormat format,
String delimiter,
ReadonlyConfig pluginConfig) {
+        return create(topic, keyFields, null, rowType, format, delimiter, pluginConfig);
+ }
+
+ public static DefaultSeaTunnelRowSerializer create(
+ String topic,
+ List<String> keyFields,
+ List<String> headerFields,
+ SeaTunnelRowType rowType,
+ MessageFormat format,
+ String delimiter,
+ ReadonlyConfig pluginConfig) {
return new DefaultSeaTunnelRowSerializer(
topicExtractor(topic, rowType, format),
partitionExtractor(null),
timestampExtractor(),
                keyExtractor(keyFields, rowType, format, delimiter, pluginConfig),
-                valueExtractor(rowType, format, delimiter, pluginConfig),
-                headersExtractor());
+                valueExtractor(headerFields, rowType, format, delimiter, pluginConfig),
+                headersExtractor(headerFields, rowType));
}
private static Function<SeaTunnelRow, Integer> partitionNativeExtractor(
@@ -182,6 +204,36 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
                convertToKafkaHeaders((Map<String, String>) row.getField(rowType.indexOf(HEADERS)));
}
+ private static Function<SeaTunnelRow, Iterable<Header>> headersExtractor(
+ List<String> headerFields, SeaTunnelRowType rowType) {
+ if (headerFields == null || headerFields.isEmpty()) {
+ return row -> null;
+ }
+
+ int[] headerFieldIndexes = new int[headerFields.size()];
+ for (int i = 0; i < headerFields.size(); i++) {
+ headerFieldIndexes[i] = rowType.indexOf(headerFields.get(i));
+ }
+
+ return row -> {
+ RecordHeaders kafkaHeaders = new RecordHeaders();
+ for (int i = 0; i < headerFields.size(); i++) {
+ String headerName = headerFields.get(i);
+ Object headerValue = row.getField(headerFieldIndexes[i]);
+
+ if (headerValue == null) {
+ kafkaHeaders.add(new RecordHeader(headerName, null));
+ } else {
+                kafkaHeaders.add(
+                        new RecordHeader(
+                                headerName,
+                                headerValue.toString().getBytes(StandardCharsets.UTF_8)));
+ }
+ return kafkaHeaders.iterator().hasNext() ? kafkaHeaders : null;
+ };
+ }
+
private static Function<SeaTunnelRow, String> topicExtractor(
String topic, SeaTunnelRowType rowType, MessageFormat format) {
if ((MessageFormat.COMPATIBLE_DEBEZIUM_JSON.equals(format)
@@ -256,6 +308,25 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
return row -> serializationSchema.serialize(row);
}
+ private static Function<SeaTunnelRow, byte[]> valueExtractor(
+ List<String> headerFields,
+ SeaTunnelRowType rowType,
+ MessageFormat format,
+ String delimiter,
+ ReadonlyConfig pluginConfig) {
+ if (headerFields == null || headerFields.isEmpty()) {
+ return valueExtractor(rowType, format, delimiter, pluginConfig);
+ }
+
+ // Create a new row type excluding header fields
+        SeaTunnelRowType valueRowType = createValueRowType(headerFields, rowType);
+        Function<SeaTunnelRow, SeaTunnelRow> valueRowExtractor =
+                createValueRowExtractor(valueRowType, headerFields, rowType);
+        SerializationSchema serializationSchema =
+                createSerializationSchema(valueRowType, format, delimiter, false, pluginConfig);
+        return row -> serializationSchema.serialize(valueRowExtractor.apply(row));
+ }
+
    private static Function<SeaTunnelRow, byte[]> valueExtractor(SeaTunnelRowType rowType) {
return row -> (byte[]) row.getField(rowType.indexOf(VALUE));
}
@@ -273,6 +344,25 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
        return new SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
}
+ private static SeaTunnelRowType createValueRowType(
+ List<String> headerFieldNames, SeaTunnelRowType rowType) {
+ // Create a row type excluding header fields
+ List<String> valueFieldNames = new java.util.ArrayList<>();
+ List<SeaTunnelDataType> valueFieldTypes = new java.util.ArrayList<>();
+
+ for (int i = 0; i < rowType.getTotalFields(); i++) {
+ String fieldName = rowType.getFieldName(i);
+ if (!headerFieldNames.contains(fieldName)) {
+ valueFieldNames.add(fieldName);
+ valueFieldTypes.add(rowType.getFieldType(i));
+ }
+ }
+
+ return new SeaTunnelRowType(
+ valueFieldNames.toArray(new String[0]),
+ valueFieldTypes.toArray(new SeaTunnelDataType[0]));
+ }
+
private static Function<SeaTunnelRow, SeaTunnelRow> createKeyRowExtractor(
SeaTunnelRowType keyType, SeaTunnelRowType rowType) {
int[] keyIndex = new int[keyType.getTotalFields()];
@@ -284,7 +374,31 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
for (int i = 0; i < keyIndex.length; i++) {
fields[i] = row.getField(keyIndex[i]);
}
- return new SeaTunnelRow(fields);
+
+ SeaTunnelRow newKeyRow = new SeaTunnelRow(fields);
+ newKeyRow.setRowKind(row.getRowKind());
+ newKeyRow.setTableId(row.getTableId());
+ return newKeyRow;
+ };
+ }
+
+    private static Function<SeaTunnelRow, SeaTunnelRow> createValueRowExtractor(
+            SeaTunnelRowType valueType, List<String> headerFieldNames, SeaTunnelRowType rowType) {
+ int[] valueIndex = new int[valueType.getTotalFields()];
+ for (int i = 0; i < valueType.getTotalFields(); i++) {
+ valueIndex[i] = rowType.indexOf(valueType.getFieldName(i));
+ }
+ return row -> {
+ Object[] fields = new Object[valueType.getTotalFields()];
+ for (int i = 0; i < valueIndex.length; i++) {
+ fields[i] = row.getField(valueIndex[i]);
+ }
+
+ SeaTunnelRow newRow = new SeaTunnelRow(fields);
+ newRow.setRowKind(row.getRowKind());
+ newRow.setTableId(row.getTableId());
+
+ return newRow;
};
}
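The schema-exclusion step in the diff above (createValueRowType) can be sketched independently of SeaTunnel's type classes. The List-based schema here is a simplified stand-in for SeaTunnelRowType, used only for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ValueRowSketch {
    // Simplified model of createValueRowType: the configured header fields
    // are removed from the value schema, so their values end up only in the
    // Kafka message headers, not in the serialized payload.
    static List<String> valueFieldNames(List<String> allFields, List<String> headerFields) {
        List<String> kept = new ArrayList<>();
        for (String field : allFields) {
            if (!headerFields.contains(field)) {
                kept.add(field);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("id", "name", "source", "traceId");
        List<String> headers = Arrays.asList("source", "traceId");
        System.out.println(valueFieldNames(fields, headers)); // [id, name]
    }
}
```

The connector additionally builds a row extractor over the surviving field indexes (createValueRowExtractor) so each outgoing row is projected onto this reduced schema before serialization.
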
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 848b74004d..7096fde5dc 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -61,6 +61,7 @@ import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOp
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.FIELD_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.KAFKA_CONFIG;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.KAFKA_HEADERS_FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.PARTITION;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.PARTITION_KEY_FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.SEMANTICS;
@@ -184,6 +185,12 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
        MessageFormat messageFormat = pluginConfig.get(FORMAT);
        String topic = pluginConfig.get(TOPIC);
        if (MessageFormat.NATIVE.equals(messageFormat)) {
+            // Validate that kafka_headers_fields is not configured for NATIVE format
+            if (pluginConfig.get(KAFKA_HEADERS_FIELDS) != null) {
+                throw new KafkaConnectorException(
+                        CommonErrorCode.OPERATION_NOT_SUPPORTED,
+                        "kafka_headers_fields is not supported with NATIVE format. Please use JSON, TEXT, or other formats.");
+            }
            checkNativeSeaTunnelType(seaTunnelRowType);
            return DefaultSeaTunnelRowSerializer.create(topic, messageFormat, seaTunnelRowType);
}
@@ -199,10 +206,26 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
                    "Cannot select both `partiton` and `partition_key_fields`. You can configure only one of them");
        }
+        // Validate that partition_key_fields and kafka_headers_fields don't overlap
+        List<String> partitionKeyFields = getPartitionKeyFields(pluginConfig, seaTunnelRowType);
+        List<String> headerFields = getHeaderFields(pluginConfig, seaTunnelRowType);
+        if (!partitionKeyFields.isEmpty() && !headerFields.isEmpty()) {
+            for (String headerField : headerFields) {
+                if (partitionKeyFields.contains(headerField)) {
+                    throw new KafkaConnectorException(
+                            CommonErrorCode.ILLEGAL_ARGUMENT,
+                            String.format(
+                                    "Field '%s' cannot be in both partition_key_fields and kafka_headers_fields",
+                                    headerField));
+                }
+            }
+        }
+
if (pluginConfig.get(PARTITION_KEY_FIELDS) != null) {
return DefaultSeaTunnelRowSerializer.create(
topic,
- getPartitionKeyFields(pluginConfig, seaTunnelRowType),
+ partitionKeyFields,
+ headerFields,
seaTunnelRowType,
messageFormat,
delimiter,
@@ -212,6 +235,7 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
return DefaultSeaTunnelRowSerializer.create(
topic,
pluginConfig.get(PARTITION),
+ headerFields,
seaTunnelRowType,
messageFormat,
delimiter,
@@ -219,7 +243,13 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
}
// By default, all partitions are sent randomly
return DefaultSeaTunnelRowSerializer.create(
-                topic, Arrays.asList(), seaTunnelRowType, messageFormat, delimiter, pluginConfig);
+ topic,
+ Collections.<String>emptyList(),
+ headerFields,
+ seaTunnelRowType,
+ messageFormat,
+ delimiter,
+ pluginConfig);
}
private KafkaSemantics getKafkaSemantics(ReadonlyConfig pluginConfig) {
@@ -260,6 +290,26 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
return Collections.emptyList();
}
+ private List<String> getHeaderFields(
+ ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+
+ if (pluginConfig.get(KAFKA_HEADERS_FIELDS) != null) {
+ List<String> headerFields = pluginConfig.get(KAFKA_HEADERS_FIELDS);
+            List<String> rowTypeFieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
+ for (String headerField : headerFields) {
+ if (!rowTypeFieldNames.contains(headerField)) {
+ throw new KafkaConnectorException(
+ CommonErrorCode.ILLEGAL_ARGUMENT,
+ String.format(
+ "Header field not found: %s, rowType: %s",
+ headerField, rowTypeFieldNames));
+ }
+ }
+ return headerFields;
+ }
+ return Collections.emptyList();
+ }
+
    private void checkNativeSeaTunnelType(SeaTunnelRowType seaTunnelRowType) {
        SeaTunnelRowType exceptRowType = nativeTableSchema().toPhysicalRowDataType();
for (int i = 0; i < exceptRowType.getFieldTypes().length; i++) {
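The writer-side validation added in this file can be sketched in isolation. Plain collections stand in for ReadonlyConfig and SeaTunnelRowType, and IllegalArgumentException stands in for KafkaConnectorException; this is an illustrative model, not the connector's code:

```java
import java.util.Arrays;
import java.util.List;

public class HeaderFieldValidationSketch {
    // Simplified model of the checks in KafkaSinkWriter: every header field
    // must exist in the row schema, and none may also be a partition key field.
    static void validate(List<String> rowFields, List<String> keyFields, List<String> headerFields) {
        for (String headerField : headerFields) {
            if (!rowFields.contains(headerField)) {
                throw new IllegalArgumentException(
                        "Header field not found: " + headerField + ", rowType: " + rowFields);
            }
            if (keyFields.contains(headerField)) {
                throw new IllegalArgumentException(
                        "Field '" + headerField
                                + "' cannot be in both partition_key_fields and kafka_headers_fields");
            }
        }
    }

    public static void main(String[] args) {
        List<String> rowFields = Arrays.asList("id", "name", "source", "traceId");
        validate(rowFields, Arrays.asList("id"), Arrays.asList("source", "traceId")); // passes
        try {
            validate(rowFields, Arrays.asList("id"), Arrays.asList("id")); // overlap: rejected
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```

Failing fast here keeps misconfiguration out of the serializer: a field routed to headers is dropped from the payload, so letting it double as a message key would silently change the key's contents.
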
diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializerTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializerTest.java
index a60a320281..ec3575139b 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializerTest.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializerTest.java
@@ -19,17 +19,23 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
public class DefaultSeaTunnelRowSerializerTest {
@@ -65,4 +71,268 @@ public class DefaultSeaTunnelRowSerializerTest {
Assertions.assertEquals("key1", new String(record.key()));
Assertions.assertEquals("value1", new String(record.value()));
}
+
+ @Test
+ public void testKafkaHeaders() {
+ String topic = "test_topic";
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"id", "name", "source", "traceId"},
+                        new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+ BasicType.INT_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE
+ });
+ MessageFormat format = MessageFormat.JSON;
+ String delimiter = ",";
+ Map<String, Object> configMap = new HashMap<>();
+ ReadonlyConfig pluginConfig = ReadonlyConfig.fromMap(configMap);
+
+ // Test with header fields
+ DefaultSeaTunnelRowSerializer serializer =
+ DefaultSeaTunnelRowSerializer.create(
+ topic,
+ Arrays.asList("id"),
+ Arrays.asList("source", "traceId"),
+ rowType,
+ format,
+ delimiter,
+ pluginConfig);
+
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "test", "web", "trace-123"});
+ ProducerRecord<byte[], byte[]> record = serializer.serializeRow(row);
+
+ Assertions.assertEquals("test_topic", record.topic());
+ Assertions.assertNotNull(record.headers());
+
+ Header sourceHeader = record.headers().lastHeader("source");
+ Assertions.assertNotNull(sourceHeader);
+        Assertions.assertEquals("web", new String(sourceHeader.value(), StandardCharsets.UTF_8));
+
+        Header traceIdHeader = record.headers().lastHeader("traceId");
+        Assertions.assertNotNull(traceIdHeader);
+        Assertions.assertEquals(
+                "trace-123", new String(traceIdHeader.value(), StandardCharsets.UTF_8));
+ }
+
+ @Test
+ public void testKafkaHeadersWithNullValue() {
+ String topic = "test_topic";
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"id", "name", "source", "traceId"},
+                        new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+ BasicType.INT_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE
+ });
+ MessageFormat format = MessageFormat.JSON;
+ String delimiter = ",";
+ Map<String, Object> configMap = new HashMap<>();
+ ReadonlyConfig pluginConfig = ReadonlyConfig.fromMap(configMap);
+
+ DefaultSeaTunnelRowSerializer serializer =
+ DefaultSeaTunnelRowSerializer.create(
+ topic,
+ Arrays.asList("id"),
+ Arrays.asList("source", "traceId"),
+ rowType,
+ format,
+ delimiter,
+ pluginConfig);
+
+ // Test with null header value
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "test", "web", null});
+ ProducerRecord<byte[], byte[]> record = serializer.serializeRow(row);
+
+ Assertions.assertEquals("test_topic", record.topic());
+ Assertions.assertNotNull(record.headers());
+
+ Header sourceHeader = record.headers().lastHeader("source");
+ Assertions.assertNotNull(sourceHeader);
+        Assertions.assertEquals("web", new String(sourceHeader.value(), StandardCharsets.UTF_8));
+
+ // Null value should be written as null in headers
+ Header traceIdHeader = record.headers().lastHeader("traceId");
+ Assertions.assertNotNull(traceIdHeader);
+ Assertions.assertNull(traceIdHeader.value());
+ }
+
+ @Test
+ public void testBackwardCompatibilityWithKeyFields() {
+        // Test that the 6-parameter create method (without headerFields) still works
+ String topic = "test_topic";
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"id", "name", "age"},
+                        new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+                            BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.INT_TYPE
+ });
+ MessageFormat format = MessageFormat.JSON;
+ String delimiter = ",";
+ Map<String, Object> configMap = new HashMap<>();
+ ReadonlyConfig pluginConfig = ReadonlyConfig.fromMap(configMap);
+
+ // Test with keyFields but no headerFields (backward compatibility)
+ DefaultSeaTunnelRowSerializer serializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        topic, Arrays.asList("id"), rowType, format, delimiter, pluginConfig);
+
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "John", 25});
+ ProducerRecord<byte[], byte[]> record = serializer.serializeRow(row);
+
+ Assertions.assertEquals("test_topic", record.topic());
+ Assertions.assertNotNull(record.value());
+
+ // Value should contain all fields
+        String valueString = new String(record.value(), StandardCharsets.UTF_8);
+ Assertions.assertTrue(valueString.contains("\"id\""));
+ Assertions.assertTrue(valueString.contains("\"name\""));
+ Assertions.assertTrue(valueString.contains("\"age\""));
+ }
+
+ @Test
+ public void testBackwardCompatibilityWithPartition() {
+        // Test that the 6-parameter create method with partition (without headerFields) still works
+ String topic = "test_topic";
+ Integer partition = 0;
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"id", "name", "age"},
+                        new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+                            BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.INT_TYPE
+ });
+ MessageFormat format = MessageFormat.JSON;
+ String delimiter = ",";
+ Map<String, Object> configMap = new HashMap<>();
+ ReadonlyConfig pluginConfig = ReadonlyConfig.fromMap(configMap);
+
+ // Test with partition but no headerFields (backward compatibility)
+ DefaultSeaTunnelRowSerializer serializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        topic, partition, rowType, format, delimiter, pluginConfig);
+
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "John", 25});
+ ProducerRecord<byte[], byte[]> record = serializer.serializeRow(row);
+
+ Assertions.assertEquals("test_topic", record.topic());
+ Assertions.assertEquals(partition, record.partition());
+ Assertions.assertNotNull(record.value());
+
+ // Value should contain all fields
+        String valueString = new String(record.value(), StandardCharsets.UTF_8);
+ Assertions.assertTrue(valueString.contains("\"id\""));
+ Assertions.assertTrue(valueString.contains("\"name\""));
+ Assertions.assertTrue(valueString.contains("\"age\""));
+ }
+
+ @Test
+ public void testHeaderFieldsExcludedFromValue() {
+ String topic = "test_topic";
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"id", "name", "source", "traceId"},
+                        new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+ BasicType.INT_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE
+ });
+ MessageFormat format = MessageFormat.JSON;
+ String delimiter = ",";
+ Map<String, Object> configMap = new HashMap<>();
+ ReadonlyConfig pluginConfig = ReadonlyConfig.fromMap(configMap);
+
+ // Test with header fields
+ DefaultSeaTunnelRowSerializer serializer =
+ DefaultSeaTunnelRowSerializer.create(
+ topic,
+ Arrays.asList("id"),
+ Arrays.asList("source", "traceId"),
+ rowType,
+ format,
+ delimiter,
+ pluginConfig);
+
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "test", "web", "trace-123"});
+ ProducerRecord<byte[], byte[]> record = serializer.serializeRow(row);
+
+ Assertions.assertEquals("test_topic", record.topic());
+
+ // Verify headers contain the expected fields
+ Header sourceHeader = record.headers().lastHeader("source");
+ Assertions.assertNotNull(sourceHeader);
+        Assertions.assertEquals("web", new String(sourceHeader.value(), StandardCharsets.UTF_8));
+
+        Header traceIdHeader = record.headers().lastHeader("traceId");
+        Assertions.assertNotNull(traceIdHeader);
+        Assertions.assertEquals(
+                "trace-123", new String(traceIdHeader.value(), StandardCharsets.UTF_8));
+
+ // Verify value does NOT contain header fields (source and traceId)
+ // Header fields are only in Kafka headers, not in the message value
+        String valueString = new String(record.value(), StandardCharsets.UTF_8);
+ // The value should only contain id and name fields
+ Assertions.assertTrue(valueString.contains("\"id\""));
+ Assertions.assertTrue(valueString.contains("\"name\""));
+ // Header fields should NOT be in the value
+ Assertions.assertFalse(valueString.contains("\"source\""));
+ Assertions.assertFalse(valueString.contains("\"traceId\""));
+ }
+
+ @Test
+ public void testKafkaHeadersWithNullValueExcludedFromValue() {
+        // Test that null header values are written as null (not the string "null") in headers
+        // and that header fields are excluded from the message value
+ String topic = "test_topic";
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"id", "name", "source", "traceId"},
+                        new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+ BasicType.INT_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE
+ });
+ MessageFormat format = MessageFormat.JSON;
+ String delimiter = ",";
+ Map<String, Object> configMap = new HashMap<>();
+ ReadonlyConfig pluginConfig = ReadonlyConfig.fromMap(configMap);
+
+ DefaultSeaTunnelRowSerializer serializer =
+ DefaultSeaTunnelRowSerializer.create(
+ topic,
+ Arrays.asList("id"),
+ Arrays.asList("source", "traceId"),
+ rowType,
+ format,
+ delimiter,
+ pluginConfig);
+
+ // Test with null header value
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "test", "web", null});
+ ProducerRecord<byte[], byte[]> record = serializer.serializeRow(row);
+
+ Assertions.assertEquals("test_topic", record.topic());
+ Assertions.assertNotNull(record.headers());
+
+ Header sourceHeader = record.headers().lastHeader("source");
+ Assertions.assertNotNull(sourceHeader);
+        Assertions.assertEquals("web", new String(sourceHeader.value(), StandardCharsets.UTF_8));
+
+ // Null value should be written as null in headers
+ Header traceIdHeader = record.headers().lastHeader("traceId");
+ Assertions.assertNotNull(traceIdHeader);
+ Assertions.assertNull(traceIdHeader.value());
+
+ // Header fields should NOT be in the message value
+        String valueString = new String(record.value(), StandardCharsets.UTF_8);
+ Assertions.assertTrue(valueString.contains("\"id\""));
+ Assertions.assertTrue(valueString.contains("\"name\""));
+ Assertions.assertFalse(valueString.contains("\"source\""));
+ Assertions.assertFalse(valueString.contains("\"traceId\""));
+ }
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index ae167e31fb..7a5a188810 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -296,6 +296,46 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
Assertions.assertEquals(10, data.size());
}
+ @TestTemplate
+ public void testSinkKafkaWithHeaders(TestContainer container)
+ throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/kafka_sink_with_headers.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+ String topicName = "test_topic_headers";
+        List<ConsumerRecord<String, String>> records = getKafkaRecordData(topicName);
+
+ Assertions.assertEquals(10, records.size());
+
+ // Verify that headers contain the expected fields (id, name)
+ for (ConsumerRecord<String, String> record : records) {
+            Map<String, String> headers = convertHeadersToMap(record.headers());
+
+ // Verify headers contain id and name
+            Assertions.assertTrue(headers.containsKey("id"), "Header should contain 'id' field");
+            Assertions.assertTrue(
+                    headers.containsKey("name"), "Header should contain 'name' field");
+
+ // Verify the value (payload) is a JSON object
+ ObjectMapper objectMapper = new ObjectMapper();
+            ObjectNode payloadNode = objectMapper.readValue(record.value(), ObjectNode.class);
+
+ // Verify payload does NOT contain the header fields (id, name)
+ Assertions.assertFalse(
+ payloadNode.has("id"),
+ "Payload should NOT contain 'id' field (it's in headers)");
+            Assertions.assertFalse(
+                    payloadNode.has("name"),
+                    "Payload should NOT contain 'name' field (it's in headers)");
+
+            // Verify payload contains the non-header fields (age, email, description)
+            Assertions.assertTrue(payloadNode.has("age"), "Payload should contain 'age' field");
+            Assertions.assertTrue(payloadNode.has("email"), "Payload should contain 'email' field");
+            Assertions.assertTrue(
+                    payloadNode.has("description"), "Payload should contain 'description' field");
+ }
+ }
+
@TestTemplate
public void testDefaultRandomSinkKafka(TestContainer container)
throws IOException, InterruptedException {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_sink_with_headers.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_sink_with_headers.conf
new file mode 100644
index 0000000000..7a65ff3b7d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_sink_with_headers.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ row.num = 10
+ schema = {
+ fields {
+ id = int
+ name = string
+ age = int
+ email = string
+ description = string
+ }
+ }
+ }
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_headers"
+ format = json
+ kafka_headers_fields = ["id", "name"]
+ }
+}