shangxinli opened a new pull request, #18127:
URL: https://github.com/apache/hudi/pull/18127

   ### Describe the issue this Pull Request addresses
   
   This PR adds Kafka offset tracking to Flink Hudi commits. During each Flink 
checkpoint commit, Kafka partition offsets are fetched from an Athena 
checkpoint service and stored as metadata in the Hudi commit. This enables 
downstream consumers to correlate Hudi commits with exact Kafka offsets for 
auditing, replay, and data lineage purposes.
   
   ### Summary and Changelog
   
   **Summary:**
   Integrates Kafka offset collection into the Flink 
`StreamWriteOperatorCoordinator` commit path. At each checkpoint, the system 
queries an Athena checkpoint service for the Kafka partition offsets associated 
with that checkpoint, then embeds them as URL-encoded metadata in the Hudi 
commit. The implementation follows a "fail-open" pattern: if offset 
collection fails for any reason, the commit proceeds without offsets.
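
   The fail-open behavior can be sketched roughly as follows. This is an 
illustrative, self-contained example, not the PR's actual code; the method 
and class names here are assumptions:

   ```java
   import java.util.Collections;
   import java.util.Map;
   import java.util.function.Supplier;

   // Hypothetical sketch of the fail-open pattern described above: any
   // failure during offset collection is logged and swallowed so the
   // Hudi commit proceeds without offsets rather than being blocked.
   public class FailOpenSketch {
     static Map<String, Long> collectOffsetsFailOpen(Supplier<Map<String, Long>> fetcher) {
       try {
         return fetcher.get();
       } catch (Exception e) {
         // fail open: warn and commit without offset metadata
         System.err.println("WARN: offset collection failed, committing without offsets: "
             + e.getMessage());
         return Collections.emptyMap();
       }
     }
   }
   ```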
   
   **Changelog:**
   - Added 12 new `@AdvancedConfig` options to `FlinkOptions.java` for 
Athena/Kafka checkpoint integration (service endpoints, job metadata, 
topic/cluster identifiers)
   - Added 4 new constants and 4 new methods to `StreamerUtil.java`: 
`addKafkaOffsetMetaData()`, `extractKafkaOffsets()`, 
`collectKafkaOffsetCheckpoint()`, `stringFy()`
   - Added 2 lines to `StreamWriteOperatorCoordinator.doCommit()` to invoke 
offset collection at commit time
   - Added `FlinkCheckpointClient` -- client for fetching checkpoint info from 
Athena Ingestion Gateway
   - Added `AthenaIngestionGateway` -- Muttley RPC client for the Athena 
checkpoint service
   - Added `FlinkHudiMuttleyClient` -- abstract HTTP client with retry logic 
for Muttley RPC communication
   - Added 3 exception classes for Muttley client/server errors
   - Added Jackson (`jackson-databind`, `jackson-annotations`) and OkHttp 
(`4.9.3`) dependencies to `hudi-flink/pom.xml`
   - Added comprehensive unit tests: `TestFlinkCheckpointClient`, 
`TestFlinkCheckpointClientMock`, and extended 
`TestStreamWriteOperatorCoordinator` with Kafka offset test cases
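
   The retry logic in an abstract HTTP client like `FlinkHudiMuttleyClient` 
could take a shape like the sketch below. The names, signature, and backoff 
policy are assumptions for illustration; the PR's actual client may differ:

   ```java
   import java.util.concurrent.Callable;

   // Minimal retry-with-exponential-backoff sketch for an RPC client.
   // Hypothetical: maxAttempts and baseBackoffMs are illustrative knobs.
   public class RetrySketch {
     static <T> T callWithRetry(Callable<T> call, int maxAttempts, long baseBackoffMs)
         throws Exception {
       Exception last = null;
       for (int attempt = 1; attempt <= maxAttempts; attempt++) {
         try {
           return call.call();
         } catch (Exception e) {
           last = e;
           if (attempt < maxAttempts) {
             // back off between attempts: base, 2*base, 4*base, ...
             Thread.sleep(baseBackoffMs << (attempt - 1));
           }
         }
       }
       throw last;
     }
   }
   ```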
   
   **Configuration properties introduced:**
   - `athena.caller.service.name` (default: `ingestion-rt`)
   - `athena.service` (default: `athena-job-manager`)
   - `athena.dc` -- data center for checkpoint lookup
   - `athena.env` -- environment for checkpoint lookup
   - `athena.job.name` -- Flink job name
   - `athena.hadoop.user` -- Hadoop user
   - `athena.source.kafka.cluster` -- source Kafka cluster name
   - `athena.target.kafka.cluster` -- target Kafka cluster name
   - `athena.topic.id` -- complete topic ID for checkpoint requests
   - `athena.service.tier` (default: `DEFAULT`)
   - `athena.service.name` (default: `ingestion-rt`)
   - `kafka.topic.name` -- Kafka topic name for offset metadata
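
   For illustration, a minimal configuration enabling the feature might look 
like the following. All values are placeholders; per the User-Facing Changes 
section, `athena.dc`, `athena.env`, `athena.job.name`, and `kafka.topic.name` 
must all be set for the feature to activate:

   ```properties
   # placeholder values -- the feature is silently skipped unless all four
   # of these properties are set
   athena.dc = dca1
   athena.env = production
   athena.job.name = orders-ingestion-job
   kafka.topic.name = orders
   ```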
   
   ### Impact
   
   **Public API Changes:**
   - New public classes:
     - `org.apache.hudi.sink.FlinkCheckpointClient`
     - `org.apache.hudi.sink.muttley.AthenaIngestionGateway`
     - `org.apache.hudi.sink.muttley.FlinkHudiMuttleyClient`
     - 3 exception classes in `org.apache.hudi.sink.muttley`
   - New public methods in `org.apache.hudi.util.StreamerUtil`:
     - `addKafkaOffsetMetaData()`
     - `collectKafkaOffsetCheckpoint()`
     - `stringFy()`
   - 12 new configuration options in `FlinkOptions`
   
   **User-Facing Changes:**
   When the Athena checkpoint configurations are set (`athena.dc`, 
`athena.env`, `athena.job.name`, `kafka.topic.name`), Kafka partition offsets 
will be automatically embedded in Hudi commit metadata under the 
`HoodieMetadataKey` field. If these configurations are not set, the feature is 
silently skipped.
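
   Since the offsets are stored URL-encoded, a downstream consumer would 
decode the metadata value before use. The sketch below assumes a 
`partition:offset` pair format joined by commas; the actual encoding scheme 
and key name in the PR may differ:

   ```java
   import java.net.URLDecoder;
   import java.nio.charset.StandardCharsets;
   import java.util.LinkedHashMap;
   import java.util.Map;

   // Hypothetical decoder for a URL-encoded "partition:offset,..." string
   // read from Hudi commit metadata. Format is an assumption for illustration.
   public class OffsetDecodeSketch {
     static Map<Integer, Long> decodeOffsets(String encoded) {
       String raw = URLDecoder.decode(encoded, StandardCharsets.UTF_8);
       Map<Integer, Long> offsets = new LinkedHashMap<>();
       for (String pair : raw.split(",")) {
         String[] kv = pair.split(":");
         offsets.put(Integer.parseInt(kv[0]), Long.parseLong(kv[1]));
       }
       return offsets;
     }
   }
   ```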
   
   **Performance Impact:**
   Minimal. One HTTP call to the Athena checkpoint service per Flink checkpoint 
commit. The call is synchronous but follows a fail-open pattern -- any failure 
results in a logged warning and the commit proceeds normally.
   
   ### Risk Level
   
   **Risk Level: low**
   
   **Justification:**
   - Fail-open design: offset collection failures never block commits
   - Feature is inactive unless all required Athena configurations are 
explicitly set
   - Only 2 lines added to the existing 
`StreamWriteOperatorCoordinator.doCommit()` code path
   - All new code is additive -- no existing logic is modified
   - Gated by `WRITE_EXTRA_METADATA_ENABLED` configuration flag
   
   **Verification:**
   - Built successfully: `mvn clean compile -pl 
hudi-flink-datasource/hudi-flink -am -DskipTests`
   - Checkstyle: 0 violations
   - No breaking changes to existing functionality
   
   ### Documentation Update
   
   Configuration documentation for the 12 new Athena/Kafka properties should be 
added to the Flink configuration reference. Each property has inline 
`withDescription()` documentation in `FlinkOptions.java`.
   
   ### Contributor's checklist
   
   - [x] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [x] Enough context is provided in the sections above
   - [x] Adequate tests were added if applicable
   - [x] Commits are signed and follow 
[conventions](https://www.conventionalcommits.org/)
   - [x] All existing tests pass (checkstyle: 0 violations, build: success)

