thehkkim opened a new pull request, #10335:
URL: https://github.com/apache/seatunnel/pull/10335
### Purpose of this pull request
This PR adds support for Kafka message headers in the Kafka sink connector
through a new `kafka_headers_fields` configuration option.
Users can now specify which fields should be set as Kafka message headers,
enabling better message routing, filtering, and metadata management in Kafka
consumers without parsing the message body.
### Does this PR introduce _any_ user-facing change?
Yes. This PR introduces a new configuration option `kafka_headers_fields`
for the Kafka sink connector.
**New Feature:**
- Added `kafka_headers_fields` configuration option to specify which fields
should be added as Kafka message headers
- Fields specified as headers are automatically excluded from the message
value to avoid data duplication
**Usage Example:**
Configuration:
```hocon
sink {
  kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    format = json
    partition_key_fields = ["id"]
    kafka_headers_fields = ["source", "traceId"] # New option
  }
}
```
Input data:
```json
{"id": 1, "name": "test", "source": "web", "traceId": "trace-123"}
```
Output to Kafka:
- **Headers**: `source=web`, `traceId=trace-123`
- **Key**: `{"id": 1}`
- **Value**: `{"id": 1, "name": "test"}` (header fields excluded)
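
The split shown above can be sketched in plain Java. This is a minimal illustration, not the connector's actual code: the class `HeaderSplitSketch`, its `Result` type, and the `split` method are hypothetical names, the row is modeled as a simple map, and UTF-8 encoding of header values is an assumption. It also assumes that a header field whose value is null is dropped from both the headers and the value.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HeaderSplitSketch {

    /** Holds the two parts a row is split into: headers and the remaining value fields. */
    public static final class Result {
        public final Map<String, byte[]> headers = new LinkedHashMap<>();
        public final Map<String, Object> value = new LinkedHashMap<>();
    }

    /** Moves the configured fields into headers and keeps every other field in the value. */
    public static Result split(Map<String, Object> row, List<String> headerFields) {
        Result result = new Result();
        for (Map.Entry<String, Object> entry : row.entrySet()) {
            if (headerFields.contains(entry.getKey())) {
                // Assumption: null values are skipped rather than written as headers.
                if (entry.getValue() != null) {
                    // Assumption: values are stringified and encoded as UTF-8 bytes.
                    byte[] bytes = String.valueOf(entry.getValue())
                            .getBytes(StandardCharsets.UTF_8);
                    result.headers.put(entry.getKey(), bytes);
                }
            } else {
                result.value.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("name", "test");
        row.put("source", "web");
        row.put("traceId", "trace-123");

        Result result = split(row, Arrays.asList("source", "traceId"));
        result.headers.forEach((key, bytes) ->
                System.out.println(key + "=" + new String(bytes, StandardCharsets.UTF_8)));
        // prints "source=web" then "traceId=trace-123"
        System.out.println(result.value);
        // prints "{id=1, name=test}"
    }
}
```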
**Benefits:**
- Enables header-based routing and filtering in Kafka consumers
- Supports distributed tracing by passing trace IDs via headers
- Allows metadata tagging without parsing message body
- Follows standard Kafka pattern where headers contain metadata
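
As a rough consumer-side illustration of header-based routing, the sketch below picks a destination from the `source` header alone and never decodes the message body. Headers are modeled as a plain map so the example stays self-contained; with the Kafka client, the same lookup would go through `ConsumerRecord.headers()`. The class name, the `route` method, and the handler names are hypothetical, and UTF-8 header encoding is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HeaderRoutingSketch {

    /** Chooses a handler from the "source" header; the payload bytes are never inspected. */
    public static String route(Map<String, byte[]> headers) {
        byte[] raw = headers.get("source");
        if (raw == null) {
            // No header present: fall back to a default handler.
            return "default-handler";
        }
        String source = new String(raw, StandardCharsets.UTF_8);
        return "web".equals(source) ? "web-handler" : "default-handler";
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        headers.put("source", "web".getBytes(StandardCharsets.UTF_8));
        System.out.println(route(headers)); // prints "web-handler"
    }
}
```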
**Backward Compatibility:**
- Fully backward compatible: `kafka_headers_fields` is optional
- Existing configurations continue to work without any changes
### How was this patch tested?
**1. Unit Tests:**
Added comprehensive test coverage in `DefaultSeaTunnelRowSerializerTest`:
- `testKafkaHeaders()` - Verifies fields are correctly added to Kafka
message headers
- `testKafkaHeadersWithNullValue()` - Ensures fields with null values are
skipped instead of being added as headers
- `testHeaderFieldsExcludedFromValue()` - Confirms header fields are
excluded from message value
**2. Test Results:**
```
[INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0
```
All tests pass, verifying:
- ✅ Headers are correctly populated with specified field values
- ✅ Header fields are automatically excluded from the serialized value
- ✅ Null values are handled properly (skipped in headers)
- ✅ Multiple header fields work correctly
- ✅ Field values are converted to strings as header values
**3. Test Coverage:**
- Tested with JSON format (primary use case)
- Verified serialization logic for both headers and values
- Confirmed backward compatibility when `kafka_headers_fields` is not
configured
**4. Code Quality:**
- ✅ Passed Apache SeaTunnel Spotless formatting checks
- ✅ Follows existing code patterns in the Kafka connector
- ✅ Consistent with `partition_key_fields` implementation pattern
### Check list
* [x] Code changes are covered with comprehensive unit tests
* [x] Documentation updated to describe the new feature
  - Updated `docs/en/connectors/sink/Kafka.md` with a feature description and
    usage example
  - Updated `docs/zh/connectors/sink/Kafka.md` with the Chinese translation
* [x] Added configuration examples in documentation
* [ ] No new Jar binary packages added
* [ ] Not a new connector (enhancement to existing Kafka connector)
* [x] Code follows Apache SeaTunnel coding standards and passes all checks