This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 01dc67a7e83 [HUDI-9491] Add checkpoint id into hudi commit metadata 
(#13403)
01dc67a7e83 is described below

commit 01dc67a7e83b4c3274fa2da9369609b8a50bcd28
Author: Peter Huang <[email protected]>
AuthorDate: Sun Jun 8 20:14:51 2025 -0700

    [HUDI-9491] Add checkpoint id into hudi commit metadata (#13403)
---
 .../org/apache/hudi/configuration/FlinkOptions.java   |  7 +++++++
 .../hudi/sink/StreamWriteOperatorCoordinator.java     |  3 +++
 .../main/java/org/apache/hudi/util/StreamerUtil.java  | 19 +++++++++++++++++++
 .../java/org/apache/hudi/utils/TestStreamerUtil.java  | 13 +++++++++++++
 4 files changed, 42 insertions(+)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index b8e42d9f131..13032a129ce 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -163,6 +163,13 @@ public class FlinkOptions extends HoodieConfig {
       .withDescription("The default partition name in case the dynamic 
partition"
           + " column value is null/empty string");
 
+  @AdvancedConfig
+  public static final ConfigOption<Boolean> WRITE_EXTRA_METADATA_ENABLED = 
ConfigOptions
+      .key("write.extra.metadata.enabled")
+      .booleanType()
+      .defaultValue(false) // keep sync with hoodie style
+      .withDescription("If enabled, the checkpoint Id will also be written to 
hudi metadata.");
+
   // ------------------------------------------------------------------------
   //  Changelog Capture Options
   // ------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index e519e98dc2e..0c57a2b1d82 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -48,6 +48,7 @@ import org.apache.hudi.util.ClientIds;
 import org.apache.hudi.util.ClusteringUtil;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
@@ -549,6 +550,8 @@ public class StreamWriteOperatorCoordinator
 
     if (!hasErrors || this.conf.get(FlinkOptions.IGNORE_FAILED)) {
       HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
+      StreamerUtil.addFlinkCheckpointIdIntoMetaData(conf, 
checkpointCommitMetadata, checkpointId);
+
       if (hasErrors) {
         LOG.warn("Some records failed to merge but forcing commit since 
commitOnErrors set to true. Errors/Total="
             + totalErrorRecords + "/" + totalRecords);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 7e0d62e0b4c..16f9482b06b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -86,6 +86,7 @@ import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
 
@@ -104,6 +105,8 @@ public class StreamerUtil {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamerUtil.class);
 
+  public static final String FLINK_CHECKPOINT_ID = "flink_checkpoint_id";
+
   public static TypedProperties appendKafkaProps(FlinkStreamerConfig config) {
     TypedProperties properties = getProps(config);
     properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
config.kafkaBootstrapServers);
@@ -356,6 +359,22 @@ public class StreamerUtil {
     }
   }
 
+  /**
+   * Write Flink checkpoint id as extra metadata, if 
write.extra.metadata.enabled is true.
+   *
+   * @param conf Flink configuration
+   * @param checkpointCommitMetadata commit metadata map
+   * @param checkpointId flink checkpoint id
+   */
+  public static void addFlinkCheckpointIdIntoMetaData(
+      Configuration conf,
+      HashMap<String, String> checkpointCommitMetadata,
+      long checkpointId) {
+    if (conf.get(FlinkOptions.WRITE_EXTRA_METADATA_ENABLED)) {
+      checkpointCommitMetadata.put(FLINK_CHECKPOINT_ID, 
String.valueOf(checkpointId));
+    }
+  }
+
   /**
    * Infers the merging behavior based on what the user sets (or doesn't set).
    *
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
index 4e3ed585188..7fdf8d03087 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
@@ -37,6 +37,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -121,6 +122,18 @@ public class TestStreamerUtil {
     assertThat(diff, is(75L));
   }
 
+  @Test
+  public void testAddCheckpointIdIntoMetadata() {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+
+    // Test for write extra metadata.
+    conf.set(FlinkOptions.WRITE_EXTRA_METADATA_ENABLED, true);
+
+    HashMap<String, String> metadata = new HashMap<>();
+    StreamerUtil.addFlinkCheckpointIdIntoMetaData(conf, metadata, 123L);
+    assertEquals(metadata.get(StreamerUtil.FLINK_CHECKPOINT_ID), "123");
+  }
+
   @Test
   void testTableExist() throws IOException {
     Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());

Reply via email to