shangxinli opened a new pull request, #18127:
URL: https://github.com/apache/hudi/pull/18127
### Describe the issue this Pull Request addresses
This PR adds Kafka offset tracking to Flink Hudi commits. During each Flink
checkpoint commit, Kafka partition offsets are fetched from an Athena
checkpoint service and stored as metadata in the Hudi commit. This enables
downstream consumers to correlate Hudi commits with exact Kafka offsets for
auditing, replay, and data lineage purposes.
### Summary and Changelog
**Summary:**
Integrates Kafka offset collection into the Flink
`StreamWriteOperatorCoordinator` commit path. At each checkpoint, the system
queries an Athena checkpoint service for the Kafka partition offsets associated
with that checkpoint, then embeds them as URL-encoded metadata in the Hudi
commit. The implementation follows a fail-open pattern: if offset
collection fails for any reason, the commit proceeds without offsets.
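The fail-open behavior described above can be sketched as follows. This is an illustrative sketch, not the PR's actual code: the method and class names (`collectOffsetsFailOpen`, `FailOpenOffsetCollection`) are hypothetical, and the real coordinator logs through its own logger rather than `System.err`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

public class FailOpenOffsetCollection {

  /**
   * Attempt to fetch Kafka offsets; on any error, log a warning and return
   * an empty map so the Hudi commit can proceed without offset metadata.
   */
  static Map<String, String> collectOffsetsFailOpen(Supplier<Map<String, String>> fetcher) {
    try {
      return fetcher.get();
    } catch (Exception e) {
      // Fail open: the failure is logged, never propagated to the commit path.
      System.err.println("Kafka offset collection failed, committing without offsets: "
          + e.getMessage());
      return Collections.emptyMap();
    }
  }

  public static void main(String[] args) {
    // Successful fetch: offsets flow into commit metadata.
    System.out.println(collectOffsetsFailOpen(() -> Map.of("topic-0", "42")));
    // Failing fetch (e.g. Athena unreachable): commit proceeds with no offsets.
    System.out.println(collectOffsetsFailOpen(() -> {
      throw new RuntimeException("Athena unreachable");
    }));
  }
}
```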
**Changelog:**
- Added 12 new `@AdvancedConfig` options to `FlinkOptions.java` for
Athena/Kafka checkpoint integration (service endpoints, job metadata,
topic/cluster identifiers)
- Added 4 new constants and 4 new methods to `StreamerUtil.java`:
`addKafkaOffsetMetaData()`, `extractKafkaOffsets()`,
`collectKafkaOffsetCheckpoint()`, `stringFy()`
- Added 2 lines to `StreamWriteOperatorCoordinator.doCommit()` to invoke
offset collection at commit time
- Added `FlinkCheckpointClient` -- client for fetching checkpoint info from
Athena Ingestion Gateway
- Added `AthenaIngestionGateway` -- Muttley RPC client for the Athena
checkpoint service
- Added `FlinkHudiMuttleyClient` -- abstract HTTP client with retry logic
for Muttley RPC communication
- Added 3 exception classes for Muttley client/server errors
- Added Jackson (`jackson-databind`, `jackson-annotations`) and OkHttp
(`4.9.3`) dependencies to `hudi-flink/pom.xml`
- Added comprehensive unit tests: `TestFlinkCheckpointClient`,
`TestFlinkCheckpointClientMock`, and extended
`TestStreamWriteOperatorCoordinator` with Kafka offset test cases
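To illustrate the URL-encoded metadata mentioned above, here is a minimal stdlib-only sketch of serializing a partition-to-offset map into a URL-encoded string suitable for storage as a commit metadata value. The `partition:offset` pair format is an assumption for illustration; the PR's `addKafkaOffsetMetaData()` / `extractKafkaOffsets()` define the actual wire format.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class OffsetMetadataEncoding {

  /** Serialize partition->offset pairs (sorted by partition) and URL-encode the result. */
  static String encodeOffsets(Map<Integer, Long> partitionOffsets) {
    String plain = new TreeMap<>(partitionOffsets).entrySet().stream()
        .map(e -> e.getKey() + ":" + e.getValue())
        .collect(Collectors.joining(","));
    return URLEncoder.encode(plain, StandardCharsets.UTF_8);
  }

  /** Reverse of encodeOffsets: URL-decode back to the plain pair list. */
  static String decodeOffsets(String encoded) {
    return URLDecoder.decode(encoded, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    String encoded = encodeOffsets(Map.of(0, 105L, 1, 230L));
    System.out.println(encoded);                 // 0%3A105%2C1%3A230
    System.out.println(decodeOffsets(encoded));  // 0:105,1:230
  }
}
```

URL-encoding keeps the value safe to embed as a flat string inside commit metadata regardless of which delimiter characters the serialized form uses.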
**Configuration properties introduced:**
- `athena.caller.service.name` (default: `ingestion-rt`)
- `athena.service` (default: `athena-job-manager`)
- `athena.dc` -- data center for checkpoint lookup
- `athena.env` -- environment for checkpoint lookup
- `athena.job.name` -- Flink job name
- `athena.hadoop.user` -- Hadoop user
- `athena.source.kafka.cluster` -- source Kafka cluster name
- `athena.target.kafka.cluster` -- target Kafka cluster name
- `athena.topic.id` -- complete topic ID for checkpoint requests
- `athena.service.tier` (default: `DEFAULT`)
- `athena.service.name` (default: `ingestion-rt`)
- `kafka.topic.name` -- Kafka topic name for offset metadata
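As a rough sketch of wiring the options above into a job, the snippet below collects them in a plain `Properties` object. All values are example placeholders, and in a real job these keys would be set through Flink's `Configuration` or table options rather than raw `Properties`.

```java
import java.util.Properties;

public class AthenaCheckpointConfig {

  /** Example values for the new Athena/Kafka checkpoint options. */
  static Properties athenaOptions() {
    Properties props = new Properties();
    // Required for the feature to activate (otherwise it is silently skipped):
    props.setProperty("athena.dc", "dca1");                  // example data center
    props.setProperty("athena.env", "production");           // example environment
    props.setProperty("athena.job.name", "orders-ingestion"); // example job name
    props.setProperty("kafka.topic.name", "orders");          // example topic
    // Optional overrides (defaults listed above):
    props.setProperty("athena.service.tier", "DEFAULT");
    return props;
  }

  public static void main(String[] args) {
    System.out.println(athenaOptions().getProperty("athena.job.name"));
  }
}
```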
### Impact
**Public API Changes:**
- New public classes:
- `org.apache.hudi.sink.FlinkCheckpointClient`
- `org.apache.hudi.sink.muttley.AthenaIngestionGateway`
- `org.apache.hudi.sink.muttley.FlinkHudiMuttleyClient`
- 3 exception classes in `org.apache.hudi.sink.muttley`
- New public methods in `org.apache.hudi.util.StreamerUtil`:
- `addKafkaOffsetMetaData()`
- `collectKafkaOffsetCheckpoint()`
- `stringFy()`
- 12 new configuration options in `FlinkOptions`
**User-Facing Changes:**
When the Athena checkpoint configurations are set (`athena.dc`,
`athena.env`, `athena.job.name`, `kafka.topic.name`), Kafka partition offsets
will be automatically embedded in Hudi commit metadata under the
`HoodieMetadataKey` field. If configurations are not set, the feature is
silently skipped.
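A downstream consumer correlating commits with offsets would read the value back out of the commit's extra-metadata map, roughly as sketched below. The `partition:offset` pair encoding is an assumption for illustration; consult the PR's `extractKafkaOffsets()` for the actual format.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class CommitOffsetReader {

  /** Parse URL-encoded partition:offset pairs out of a commit's extra-metadata map. */
  static Map<Integer, Long> readOffsets(Map<String, String> extraMetadata, String key) {
    Map<Integer, Long> offsets = new HashMap<>();
    String encoded = extraMetadata.get(key);
    if (encoded == null) {
      return offsets; // feature disabled, or collection failed (fail open)
    }
    String plain = URLDecoder.decode(encoded, StandardCharsets.UTF_8);
    for (String pair : plain.split(",")) {
      String[] kv = pair.split(":");
      offsets.put(Integer.parseInt(kv[0]), Long.parseLong(kv[1]));
    }
    return offsets;
  }

  public static void main(String[] args) {
    Map<String, String> meta = Map.of("HoodieMetadataKey", "0%3A105%2C1%3A230");
    System.out.println(readOffsets(meta, "HoodieMetadataKey"));
  }
}
```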
**Performance Impact:**
Minimal. One HTTP call to the Athena checkpoint service per Flink checkpoint
commit. The call is synchronous but follows a fail-open pattern -- any failure
results in a logged warning and the commit proceeds normally.
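The retry behavior attributed to `FlinkHudiMuttleyClient` above can be sketched generically. This is an illustration only: the actual client is built on OkHttp with its own Muttley exception hierarchy, and the attempt count and backoff values here are arbitrary.

```java
import java.util.concurrent.Callable;

public class RetryingCall {

  /**
   * Invoke the call, retrying on failure with exponential backoff.
   * Rethrows the last exception once attempts are exhausted.
   */
  static <T> T callWithRetry(Callable<T> call, int maxAttempts, long initialBackoffMs)
      throws Exception {
    long backoff = initialBackoffMs;
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return call.call();
      } catch (Exception e) {
        last = e;
        if (attempt < maxAttempts) {
          Thread.sleep(backoff);
          backoff *= 2; // exponential backoff between attempts
        }
      }
    }
    throw last;
  }

  public static void main(String[] args) throws Exception {
    int[] calls = {0};
    // Simulated HTTP call that fails twice, then succeeds on the third attempt.
    String result = callWithRetry(() -> {
      if (++calls[0] < 3) {
        throw new RuntimeException("transient error");
      }
      return "ok";
    }, 5, 1);
    System.out.println(result + " after " + calls[0] + " attempts"); // ok after 3 attempts
  }
}
```

Because the whole collection path is wrapped fail-open at the commit site, even exhausted retries surface only as a warning rather than a failed checkpoint.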
### Risk Level
**Risk Level: low**
**Justification:**
- Fail-open design: offset collection failures never block commits
- Feature is inactive unless all required Athena configurations are
explicitly set
- Only 2 lines added to the existing
`StreamWriteOperatorCoordinator.doCommit()` code path
- All new code is additive -- no existing logic is modified
- Gated by `WRITE_EXTRA_METADATA_ENABLED` configuration flag
**Verification:**
- Built successfully: `mvn clean compile -pl
hudi-flink-datasource/hudi-flink -am -DskipTests`
- Checkstyle: 0 violations
- No breaking changes to existing functionality
### Documentation Update
Configuration documentation for the 12 new Athena/Kafka properties should be
added to the Flink configuration reference. Each property has inline
`withDescription()` documentation in `FlinkOptions.java`.
### Contributor's checklist
- [x] Read through [contributor's
guide](https://hudi.apache.org/contribute/how-to-contribute)
- [x] Enough context is provided in the sections above
- [x] Adequate tests were added if applicable
- [x] Commits are signed and follow
[conventions](https://www.conventionalcommits.org/)
- [x] All existing tests pass (checkstyle: 0 violations, build: success)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]