This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 01dc67a7e83 [HUDI-9491] Add checkpoint id into hudi commit metadata
(#13403)
01dc67a7e83 is described below
commit 01dc67a7e83b4c3274fa2da9369609b8a50bcd28
Author: Peter Huang <[email protected]>
AuthorDate: Sun Jun 8 20:14:51 2025 -0700
[HUDI-9491] Add checkpoint id into hudi commit metadata (#13403)
---
.../org/apache/hudi/configuration/FlinkOptions.java | 7 +++++++
.../hudi/sink/StreamWriteOperatorCoordinator.java | 3 +++
.../main/java/org/apache/hudi/util/StreamerUtil.java | 19 +++++++++++++++++++
.../java/org/apache/hudi/utils/TestStreamerUtil.java | 13 +++++++++++++
4 files changed, 42 insertions(+)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index b8e42d9f131..13032a129ce 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -163,6 +163,13 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("The default partition name in case the dynamic
partition"
+ " column value is null/empty string");
+ @AdvancedConfig
+ public static final ConfigOption<Boolean> WRITE_EXTRA_METADATA_ENABLED =
ConfigOptions
+ .key("write.extra.metadata.enabled")
+ .booleanType()
+ .defaultValue(false) // keep sync with hoodie style
+ .withDescription("If enabled, the checkpoint Id will also be written to
hudi metadata.");
+
// ------------------------------------------------------------------------
// Changelog Capture Options
// ------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index e519e98dc2e..0c57a2b1d82 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -48,6 +48,7 @@ import org.apache.hudi.util.ClientIds;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
@@ -549,6 +550,8 @@ public class StreamWriteOperatorCoordinator
if (!hasErrors || this.conf.get(FlinkOptions.IGNORE_FAILED)) {
HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
+ StreamerUtil.addFlinkCheckpointIdIntoMetaData(conf,
checkpointCommitMetadata, checkpointId);
+
if (hasErrors) {
LOG.warn("Some records failed to merge but forcing commit since
commitOnErrors set to true. Errors/Total="
+ totalErrorRecords + "/" + totalRecords);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 7e0d62e0b4c..16f9482b06b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -86,6 +86,7 @@ import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.Properties;
@@ -104,6 +105,8 @@ public class StreamerUtil {
private static final Logger LOG =
LoggerFactory.getLogger(StreamerUtil.class);
+ public static final String FLINK_CHECKPOINT_ID = "flink_checkpoint_id";
+
public static TypedProperties appendKafkaProps(FlinkStreamerConfig config) {
TypedProperties properties = getProps(config);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
config.kafkaBootstrapServers);
@@ -356,6 +359,22 @@ public class StreamerUtil {
}
}
+ /**
+ * Write Flink checkpoint id as extra metadata, if
write.extra.metadata.enabled is true.
+ *
+ * @param conf Flink configuration
+ * @param checkpointCommitMetadata commit metadata map
+ * @param checkpointId flink checkpoint id
+ */
+ public static void addFlinkCheckpointIdIntoMetaData(
+ Configuration conf,
+ HashMap<String, String> checkpointCommitMetadata,
+ long checkpointId) {
+ if (conf.get(FlinkOptions.WRITE_EXTRA_METADATA_ENABLED)) {
+ checkpointCommitMetadata.put(FLINK_CHECKPOINT_ID,
String.valueOf(checkpointId));
+ }
+ }
+
/**
* Infers the merging behavior based on what the user sets (or doesn't set).
*
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
index 4e3ed585188..7fdf8d03087 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
@@ -37,6 +37,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -121,6 +122,18 @@ public class TestStreamerUtil {
assertThat(diff, is(75L));
}
+ @Test
+ public void testAddCheckpointIdIntoMetadata() {
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+
+ // Test for write extra metadata.
+ conf.set(FlinkOptions.WRITE_EXTRA_METADATA_ENABLED, true);
+
+ HashMap<String, String> metadata = new HashMap<>();
+ StreamerUtil.addFlinkCheckpointIdIntoMetaData(conf, metadata, 123L);
+ assertEquals(metadata.get(StreamerUtil.FLINK_CHECKPOINT_ID), "123");
+ }
+
@Test
void testTableExist() throws IOException {
Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());