shangxinli commented on code in PR #18127:
URL: https://github.com/apache/hudi/pull/18127#discussion_r2881003864
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java:
##########
@@ -729,4 +740,225 @@ public static void validateWriteStatus(
});
}
}
+
+ /**
+ * Add Kafka offset metadata to the checkpoint metadata.
+ * Uses a Flink-specific checkpoint key but the same format as Spark for compatibility.
+ *
+ * @param conf Flink configuration
+ * @param checkpointCommitMetadata commit metadata map
+ * @param kafkaOffsetCheckpoint Kafka offset checkpoint string in Spark format: "HoodieMetadataKey" : "kafka_metadata%3Ahp-event-web%3A101:7583675434;kafka_metadata%3Ahp-event-web%3A222:7190059945;"
+ */
+ public static void addKafkaOffsetMetaData(
+ Configuration conf,
+ HashMap<String, String> checkpointCommitMetadata,
+ String kafkaOffsetCheckpoint) {
+ if (conf.get(FlinkOptions.WRITE_EXTRA_METADATA_ENABLED) && kafkaOffsetCheckpoint != null) {
Review Comment:
Good point — restructured `addKafkaOffsetMetaData` to accept `checkpointId`
and `checkpointClient` directly. It now checks `WRITE_EXTRA_METADATA_ENABLED`
first and only calls `collectKafkaOffsetCheckpoint` when enabled, so the
checkpoint service access is fully lazy.
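The laziness being described can be sketched as follows. This is a hedged illustration, not the PR's actual code: the real method takes a Flink `Configuration` and a checkpoint client, which are modeled here by a plain boolean flag and a `Supplier` so that the short-circuit behavior is easy to see. The constant value for `HOODIE_METADATA_KEY` is assumed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Sketch of the restructured addKafkaOffsetMetaData: the (possibly expensive)
// checkpoint-service call is wrapped in a Supplier and only invoked after the
// enablement flag has been checked, so service access stays fully lazy.
public class LazyOffsetMetadataSketch {
  // Assumed key name; the real constant lives in the PR's StreamerUtil.
  static final String HOODIE_METADATA_KEY = "deltastreamer.checkpoint.key";

  public static void addKafkaOffsetMetaData(
      boolean extraMetadataEnabled,                  // stands in for WRITE_EXTRA_METADATA_ENABLED
      Map<String, String> checkpointCommitMetadata,
      Supplier<String> kafkaOffsetCheckpointSupplier // stands in for collectKafkaOffsetCheckpoint(...)
  ) {
    if (!extraMetadataEnabled) {
      return; // supplier never invoked: no checkpoint-service access when disabled
    }
    String kafkaOffsetCheckpoint = kafkaOffsetCheckpointSupplier.get();
    if (kafkaOffsetCheckpoint != null) {
      checkpointCommitMetadata.put(HOODIE_METADATA_KEY, kafkaOffsetCheckpoint);
    }
  }
}
```

With this shape, a caller that has the flag disabled never touches the checkpoint service at all, which is the point of the restructuring.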
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -1144,6 +1144,94 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(30)
.withDescription("Maximum number of groups to create as part of
ClusteringPlan. Increasing groups will increase parallelism, default is 30");
+ // ------------------------------------------------------------------------
+ // Kafka/Athena Checkpoint Options
+ // ------------------------------------------------------------------------
+
+ @AdvancedConfig
+ public static final ConfigOption<String> CALLER_SERVICE_NAME = ConfigOptions
+ .key("athena.caller.service.name")
Review Comment:
Good suggestion — renamed all config keys from `athena.*` to
`kafka.offset.trace.*` prefix. This is vendor-neutral and clearly groups all
offset tracking configs together. Updated descriptions to remove
Athena-specific references as well.
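The rename pattern described here can be captured in a small migration helper. This is purely illustrative: only `athena.caller.service.name` appears in the quoted diff, so every other key name below is hypothetical, and the helper itself is not part of the PR.

```java
// Hypothetical helper showing the vendor-neutral rename: any legacy athena.*
// key is mapped onto the kafka.offset.trace.* prefix; all other keys pass
// through unchanged.
public class OffsetTraceKeyRename {
  private static final String LEGACY_PREFIX = "athena.";
  private static final String NEW_PREFIX = "kafka.offset.trace.";

  public static String rename(String key) {
    if (!key.startsWith(LEGACY_PREFIX)) {
      return key; // not a legacy key, leave untouched
    }
    return NEW_PREFIX + key.substring(LEGACY_PREFIX.length());
  }
}
```

So `athena.caller.service.name` would become `kafka.offset.trace.caller.service.name`, which groups all offset-tracking configs under one neutral prefix.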
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java:
##########
@@ -729,4 +740,225 @@ public static void validateWriteStatus(
});
}
}
+
+ /**
+ * Add Kafka offset metadata to the checkpoint metadata.
+ * Uses a Flink-specific checkpoint key but the same format as Spark for compatibility.
+ *
+ * @param conf Flink configuration
+ * @param checkpointCommitMetadata commit metadata map
+ * @param kafkaOffsetCheckpoint Kafka offset checkpoint string in Spark format: "HoodieMetadataKey" : "kafka_metadata%3Ahp-event-web%3A101:7583675434;kafka_metadata%3Ahp-event-web%3A222:7190059945;"
+ */
+ public static void addKafkaOffsetMetaData(
+ Configuration conf,
+ HashMap<String, String> checkpointCommitMetadata,
+ String kafkaOffsetCheckpoint) {
+ if (conf.get(FlinkOptions.WRITE_EXTRA_METADATA_ENABLED) && kafkaOffsetCheckpoint != null) {
+ checkpointCommitMetadata.put(HOODIE_METADATA_KEY, kafkaOffsetCheckpoint);
+ }
+ }
+
+ /**
+ * Extracts Kafka offsets from checkpoint response.
+ *
+ * @param checkpointInfo The checkpoint info from service
+ * @param checkpointId The checkpoint ID for logging
+ * @return Map of partition ID to offset, or null if no valid offsets found
+ */
+ private static Map<Integer, Long> extractKafkaOffsets(
+ AthenaIngestionGateway.CheckpointKafkaOffsetInfo checkpointInfo,
+ long checkpointId) {
+
+ List<AthenaIngestionGateway.CheckpointKafkaOffsetInfo.KafkaOffsetsInfo> kafkaOffsetsInfoList =
+ checkpointInfo.getKafkaOffsetsInfo();
+
+ if (kafkaOffsetsInfoList == null || kafkaOffsetsInfoList.isEmpty()) {
+ log.warn("No Kafka offset information found in checkpoint response");
+ return null;
+ }
+
+ // Verify we have exactly one topic
+ if (kafkaOffsetsInfoList.size() > 1) {
+ throw new IllegalStateException(
String.format("Expected exactly one topic in checkpoint response, but found %d topics",
+ kafkaOffsetsInfoList.size()));
+ }
+
+ // Get the single topic's offsets
+ AthenaIngestionGateway.CheckpointKafkaOffsetInfo.KafkaOffsetsInfo kafkaOffsetInfo =
+ kafkaOffsetsInfoList.get(0);
+
+ // Validate offset info structure
+ if (kafkaOffsetInfo.getOffsets() == null || kafkaOffsetInfo.getOffsets().getOffsets() == null) {
+ log.warn("Kafka offset info has null offsets for checkpointId={}", checkpointId);
+ return null;
+ }
+
+ Map<Integer, Long> partitionOffsets = kafkaOffsetInfo.getOffsets().getOffsets();
+
+ // Validate partition offsets
+ if (partitionOffsets.isEmpty()) {
+ log.warn("No partition offsets found for checkpointId={}", checkpointId);
+ return null;
+ }
+
+ // Validate offset values
+ for (Map.Entry<Integer, Long> entry : partitionOffsets.entrySet()) {
+ if (entry.getKey() < 0 || entry.getValue() < 0) {
+ log.error("Invalid partition ID {} or offset {} for checkpointId={}",
+ entry.getKey(), entry.getValue(), checkpointId);
+ return null;
+ }
+ }
+
+ return partitionOffsets;
+ }
+
+ /**
+ * Collects Kafka offset checkpoint from checkpoint service.
+ *
+ * <p>Important assumptions:
+ * <ul>
+ * <li>Each Flink job processes only ONE Kafka topic</li>
+ * <li>Each checkpoint ID is associated with exactly ONE topic</li>
+ * <li>Each topic can have multiple partitions</li>
+ * </ul>
+ *
+ * <p>Fails open: if any error occurs, returns null and allows the commit to proceed without Kafka offsets.
+ *
+ * @param conf The Flink configuration
+ * @param checkpointId The checkpoint ID
+ * @param checkpointClient The checkpoint client (nullable)
+ * @return Kafka offset checkpoint string in URL-encoded format for Hudi metadata,
+ * e.g., "kafka_metadata%3Atopic-name%3A0:100;kafka_metadata%3Atopic-name%3A1:200"
+ * where format is "kafka_metadata%3Atopic%3Apartition:offset" separated by semicolons.
+ * Returns null if not available or on error
+ * @throws IllegalStateException if more than one topic is found in the checkpoint response
+ */
+ public static String collectKafkaOffsetCheckpoint(Configuration conf, long checkpointId,
+ FlinkCheckpointClient checkpointClient) {
+ // Extract topic and cluster names early
+ String topicName = conf.contains(FlinkOptions.KAFKA_TOPIC_NAME)
+ ? conf.get(FlinkOptions.KAFKA_TOPIC_NAME) : null;
+ String clusterName = conf.contains(FlinkOptions.SOURCE_KAFKA_CLUSTER)
+ ? conf.get(FlinkOptions.SOURCE_KAFKA_CLUSTER) : null;
+ Map<Integer, Long> partitionOffsets = null;
+
+ try {
+ // Validate checkpoint ID
+ if (checkpointId < 0) {
log.warn("Invalid checkpointId={}, must be non-negative", checkpointId);
+ return stringFy(topicName, clusterName, partitionOffsets);
+ }
+
+ // Check if required configurations are present
+ if (!conf.contains(FlinkOptions.DC) || !conf.contains(FlinkOptions.ENV)
Review Comment:
Good catch — expanded the upfront validation to include all required fields:
`TOPIC_ID`, `HADOOP_USER`, and `SOURCE_KAFKA_CLUSTER` in addition to `DC`,
`ENV`, `JOB_NAME`, and `KAFKA_TOPIC_NAME`. The log message now lists all
required configs so users can easily see what is missing.
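The expanded validation can be sketched as a collect-then-report check: gather every missing required key and log them in one message instead of failing on the first. The key spellings below are assumptions mirroring the option names mentioned in the thread, and the `Configuration` is modeled as a simple set of present keys for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Sketch of upfront required-config validation: returns all missing keys at
// once so the caller can log a single message listing everything that is
// absent, matching the behavior described in the review reply.
public class RequiredConfigCheck {
  // Assumed key spellings for DC, ENV, JOB_NAME, KAFKA_TOPIC_NAME, TOPIC_ID,
  // HADOOP_USER, and SOURCE_KAFKA_CLUSTER.
  static final List<String> REQUIRED = List.of(
      "dc", "env", "job.name", "kafka.topic.name",
      "topic.id", "hadoop.user", "source.kafka.cluster");

  /** Returns the missing keys; an empty list means validation passed. */
  public static List<String> missingKeys(Set<String> presentKeys) {
    List<String> missing = new ArrayList<>();
    for (String key : REQUIRED) {
      if (!presentKeys.contains(key)) {
        missing.add(key);
      }
    }
    return missing;
  }
}
```

Reporting the full list in one log line saves users from a fix-one-rerun-discover-the-next loop.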
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -1144,6 +1144,94 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(30)
.withDescription("Maximum number of groups to create as part of
ClusteringPlan. Increasing groups will increase parallelism, default is 30");
+ // ------------------------------------------------------------------------
+ // Kafka/Athena Checkpoint Options
+ // ------------------------------------------------------------------------
+
+ @AdvancedConfig
+ public static final ConfigOption<String> CALLER_SERVICE_NAME = ConfigOptions
+ .key("athena.caller.service.name")
+ .stringType()
+ .defaultValue("ingestion-rt")
+ .withDescription("Caller service name for Athena RPC headers");
+
+ @AdvancedConfig
Review Comment:
Valid concern about portability. The config keys have been renamed from
`athena.*` to `kafka.offset.trace.*` to remove vendor-specific naming. For the
pluggable `KafkaOffsetProvider` abstraction — I agree this is the right
direction for OSS. I would prefer to tackle it as a follow-up PR since it is a
larger design change (interface + factory + default no-op implementation +
moving the Athena/Muttley logic into an optional provider). The current PR
keeps the scope focused on getting the offset tracking data flowing end-to-end.
WDYT?
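For concreteness, the follow-up abstraction could look roughly like this. All names here are proposals for the future PR, not shipped API: a single-method provider interface plus a default no-op, so the Athena/Muttley logic can move behind an optional implementation.

```java
import java.util.Optional;

// Proposed shape for the pluggable offset-provider abstraction discussed
// above: one abstract method for collecting the Spark-format offset string,
// and a no-op default used when no vendor integration is configured.
public interface KafkaOffsetProvider {

  /** Returns the Spark-format offset checkpoint string for this checkpoint, if any. */
  Optional<String> collectOffsetCheckpoint(long checkpointId);

  /** Default provider: reports no offsets, so commits proceed without Kafka metadata. */
  KafkaOffsetProvider NO_OP = checkpointId -> Optional.empty();
}
```

A factory keyed off configuration could then pick the vendor provider or `NO_OP`, keeping the OSS core free of vendor dependencies.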
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]