suryaprasanna commented on code in PR #18421:
URL: https://github.com/apache/hudi/pull/18421#discussion_r3012654327
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -753,6 +753,28 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Whether to allow generation of empty commits, even if no data was written in the commit. "
+ "It's useful in cases where extra metadata needs to be published regardless e.g tracking source offsets when ingesting data");
+ public static final ConfigProperty<String> ROLLING_METADATA_KEYS = ConfigProperty
+ .key("hoodie.write.rolling.metadata.keys")
+ .defaultValue("")
+ .markAdvanced()
+ .sinceVersion("1.1.0")
Review Comment:
Should this be 1.2.0?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -429,6 +429,128 @@ protected void preCommit(HoodieCommitMetadata metadata) {
// Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
HoodieTable table = createTable(config);
resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
+
+ // Merge rolling metadata after conflict resolution, still within the lock
+ mergeRollingMetadata(table, metadata);
+ }
+
+ /**
+ * Merges rolling metadata from recent completed commits into the current commit metadata.
+ * This method MUST be called within the transaction lock after conflict resolution.
+ *
+ * <p>Rolling metadata keys configured via {@link HoodieWriteConfig#ROLLING_METADATA_KEYS} will be
+ * automatically carried forward from recent commits. The system walks back up to
+ * {@link HoodieWriteConfig#ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS} commits to find the most
+ * recent value for each key. This ensures that important metadata like checkpoint information
+ * remains accessible without worrying about archival or missing keys in individual commits.
+ *
+ * @param table HoodieTable instance (may have refreshed timeline after conflict resolution)
+ * @param metadata Current commit metadata to be augmented with rolling metadata
+ */
+ protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata metadata) {
+ // Skip for metadata table - rolling metadata is only for data tables
+ if (table.isMetadataTable()) {
+ return;
+ }
+
+ Set<String> rollingKeys = config.getRollingMetadataKeys();
+ if (rollingKeys.isEmpty()) {
+ return; // No rolling metadata configured
+ }
+
+ // IMPORTANT: We're inside the lock here. The timeline in 'table' is either:
+ // 1. Fresh from createTable() if no conflict resolution happened
+ // 2. Reloaded during resolveWriteConflict() if conflicts were checked
+ // In both cases, we have the latest view of the timeline.
+
+ HoodieTimeline commitsTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+
+ if (commitsTimeline.empty()) {
+ log.info("No previous commits found. Rolling metadata will start with current commit.");
+ return; // First commit - nothing to roll forward
+ }
+
+ try {
+ Map<String, String> existingExtraMetadata = metadata.getExtraMetadata();
+ Map<String, String> foundRollingMetadata = new HashMap<>();
+ Set<String> remainingKeys = new java.util.HashSet<>(rollingKeys);
Review Comment:
Here as well, can we import java.util.HashSet instead of using the fully qualified name?
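For illustration, with `java.util.HashSet` imported, the loop that drops keys already present in the current commit could collapse to a single `removeAll`. This is a minimal standalone sketch that assumes plain `Set`/`Map` inputs in place of `HoodieCommitMetadata#getExtraMetadata()`; the class name `RemainingKeys` is hypothetical:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper class; the name is illustrative, not part of Hudi.
class RemainingKeys {
  // Returns the configured rolling keys that the current commit does not already set.
  // Keys present in the current commit's extra metadata take precedence, so they are dropped.
  static Set<String> remaining(Set<String> rollingKeys, Map<String, String> currentExtraMetadata) {
    Set<String> remaining = new HashSet<>(rollingKeys); // copy so the caller's set is untouched
    remaining.removeAll(currentExtraMetadata.keySet());
    return remaining;
  }

  public static void main(String[] args) {
    Map<String, String> current = new HashMap<>();
    current.put("checkpoint", "offset-42");
    Set<String> keys = new HashSet<>();
    keys.add("checkpoint");
    keys.add("schema.version");
    System.out.println(remaining(keys, current)); // [schema.version]
  }
}
```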
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -2874,6 +2892,21 @@ public boolean allowOperationMetadataField() {
return getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD);
}
+ public java.util.Set<String> getRollingMetadataKeys() {
Review Comment:
Can you add imports for the following classes instead of using their fully qualified names?
java.util.Set, java.util.Collections, java.util.Arrays
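A rough sketch of how `getRollingMetadataKeys()` could look with those imports, assuming it only needs to split the comma-separated `hoodie.write.rolling.metadata.keys` value, trim entries, and drop blanks. The raw value is passed in directly rather than read from `HoodieWriteConfig`, and `RollingKeysParser` is a hypothetical name:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical standalone version of a getRollingMetadataKeys()-style parser.
// In the PR the raw value would come from the write config; here it is a parameter.
class RollingKeysParser {
  static Set<String> parse(String rawValue) {
    if (rawValue == null || rawValue.trim().isEmpty()) {
      return Collections.emptySet(); // the default "" means no rolling metadata
    }
    return Collections.unmodifiableSet(
        Arrays.stream(rawValue.split(","))
            .map(String::trim)
            .filter(key -> !key.isEmpty()) // ignore blanks such as "a,,b"
            .collect(Collectors.toCollection(LinkedHashSet::new)));
  }

  public static void main(String[] args) {
    System.out.println(parse(" kafka.offsets, flink.checkpoint ,,")); // [kafka.offsets, flink.checkpoint]
  }
}
```

Collecting into a `LinkedHashSet` keeps the configured order stable (handy for log output); that is a choice made for this sketch, not something the PR requires.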
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -753,6 +753,28 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Whether to allow generation of empty commits, even if no data was written in the commit. "
+ "It's useful in cases where extra metadata needs to be published regardless e.g tracking source offsets when ingesting data");
+ public static final ConfigProperty<String> ROLLING_METADATA_KEYS = ConfigProperty
+ .key("hoodie.write.rolling.metadata.keys")
+ .defaultValue("")
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Comma-separated list of extra metadata keys that should be automatically carried forward "
+ + "to every new commit. These keys will be read from recent commit metadata and included in new commits, "
+ + "ensuring they remain accessible without walking the timeline or worrying about archival. "
+ + "This is useful for tracking checkpoint information (e.g., Kafka offsets, Flink checkpoints) or any metadata "
+ + "that needs to persist across commits. New values override old ones. Only applies to data table commits.");
+
+ public static final ConfigProperty<Integer> ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS = ConfigProperty
+ .key("hoodie.write.rolling.metadata.timeline.lookback.commits")
+ .defaultValue(10)
+ .markAdvanced()
+ .sinceVersion("1.1.0")
Review Comment:
Should this be 1.2.0?
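For context, the two properties introduced in this hunk might be set together as below. The property names and the lookback default of 10 come from the diff; the metadata key names and the `RollingMetadataConfigExample` helper are hypothetical:

```java
import java.util.Properties;

// Illustrative only: property names and the default of 10 are taken from this PR's diff.
// The metadata key names set below are hypothetical placeholders.
class RollingMetadataConfigExample {
  static final String KEYS_PROP = "hoodie.write.rolling.metadata.keys";
  static final String LOOKBACK_PROP = "hoodie.write.rolling.metadata.timeline.lookback.commits";
  static final int DEFAULT_LOOKBACK = 10;

  // Reads the lookback limit, falling back to the default when the property is unset.
  static int lookback(Properties props) {
    String raw = props.getProperty(LOOKBACK_PROP);
    return raw == null ? DEFAULT_LOOKBACK : Integer.parseInt(raw.trim());
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(KEYS_PROP, "source.checkpoint,pipeline.run.id");
    props.setProperty(LOOKBACK_PROP, "20");
    System.out.println(lookback(props));            // 20
    System.out.println(lookback(new Properties())); // 10
  }
}
```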
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -429,6 +429,128 @@ protected void preCommit(HoodieCommitMetadata metadata) {
// Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
HoodieTable table = createTable(config);
resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
+
+ // Merge rolling metadata after conflict resolution, still within the lock
+ mergeRollingMetadata(table, metadata);
+ }
+
+ /**
+ * Merges rolling metadata from recent completed commits into the current commit metadata.
+ * This method MUST be called within the transaction lock after conflict resolution.
+ *
+ * <p>Rolling metadata keys configured via {@link HoodieWriteConfig#ROLLING_METADATA_KEYS} will be
+ * automatically carried forward from recent commits. The system walks back up to
+ * {@link HoodieWriteConfig#ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS} commits to find the most
+ * recent value for each key. This ensures that important metadata like checkpoint information
+ * remains accessible without worrying about archival or missing keys in individual commits.
+ *
+ * @param table HoodieTable instance (may have refreshed timeline after conflict resolution)
+ * @param metadata Current commit metadata to be augmented with rolling metadata
+ */
+ protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata metadata) {
+ // Skip for metadata table - rolling metadata is only for data tables
+ if (table.isMetadataTable()) {
+ return;
+ }
+
+ Set<String> rollingKeys = config.getRollingMetadataKeys();
+ if (rollingKeys.isEmpty()) {
+ return; // No rolling metadata configured
+ }
+
+ // IMPORTANT: We're inside the lock here. The timeline in 'table' is either:
+ // 1. Fresh from createTable() if no conflict resolution happened
+ // 2. Reloaded during resolveWriteConflict() if conflicts were checked
+ // In both cases, we have the latest view of the timeline.
+
+ HoodieTimeline commitsTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+
+ if (commitsTimeline.empty()) {
+ log.info("No previous commits found. Rolling metadata will start with current commit.");
+ return; // First commit - nothing to roll forward
+ }
+
+ try {
+ Map<String, String> existingExtraMetadata = metadata.getExtraMetadata();
+ Map<String, String> foundRollingMetadata = new HashMap<>();
+ Set<String> remainingKeys = new java.util.HashSet<>(rollingKeys);
+
+ // Remove keys that are already present in current commit (current values take precedence)
+ for (String key : rollingKeys) {
+ if (existingExtraMetadata.containsKey(key)) {
+ remainingKeys.remove(key);
+ }
+ }
+
+ if (remainingKeys.isEmpty()) {
+ log.debug("All rolling metadata keys are present in current commit. No walkback needed.");
+ return;
+ }
+
+ int lookbackLimit = config.getRollingMetadataTimelineLookbackCommits();
+ int commitsWalkedBack = 0;
+
+ // Walk back through the timeline in reverse order (most recent first) to find values for all remaining keys
+ List<HoodieInstant> recentCommits = commitsTimeline.getInstantsAsStream()
+ .collect(java.util.stream.Collectors.toList());
+
+ // Reverse to walk back from most recent to oldest
+ java.util.Collections.reverse(recentCommits);
Review Comment:
Here, how are we enforcing that the instants are ordered based on completion time?
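One possible direction, sketched under assumptions: if `getInstantsAsStream()` returns instants in requested-time order (which the question above suggests), reversing that list walks back by start time, not by completion time. Sorting explicitly by completion time before applying the lookback limit would make the intent explicit. `CompletedInstant` and `CompletionOrder` are hypothetical stand-ins for `HoodieInstant` and the timeline, and the sketch assumes fixed-width timestamps that compare correctly as strings:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a completed instant carrying both a requested and a completion time.
final class CompletedInstant {
  final String requestedTime;
  final String completionTime;

  CompletedInstant(String requestedTime, String completionTime) {
    this.requestedTime = requestedTime;
    this.completionTime = completionTime;
  }

  @Override
  public String toString() {
    return requestedTime + "@" + completionTime;
  }
}

class CompletionOrder {
  // Returns up to `limit` instants, most recently *completed* first.
  // Assumes fixed-width timestamps, so lexicographic order matches time order.
  static List<CompletedInstant> mostRecentlyCompleted(List<CompletedInstant> instants, int limit) {
    return instants.stream()
        .sorted(Comparator.comparing((CompletedInstant i) -> i.completionTime).reversed())
        .limit(limit)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<CompletedInstant> timeline = new ArrayList<>();
    // Requested earlier but completed later, e.g. a long-running concurrent writer.
    timeline.add(new CompletedInstant("20240101000001000", "20240101000009000"));
    timeline.add(new CompletedInstant("20240101000002000", "20240101000003000"));
    System.out.println(mostRecentlyCompleted(timeline, 1)); // [20240101000001000@20240101000009000]
  }
}
```

With requested-time ordering, the second instant would be treated as the latest, even though the first one finished after it and may hold the newer value for a rolling key.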
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -429,6 +429,128 @@ protected void preCommit(HoodieCommitMetadata metadata) {
// Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
HoodieTable table = createTable(config);
resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
+
+ // Merge rolling metadata after conflict resolution, still within the lock
+ mergeRollingMetadata(table, metadata);
+ }
+
+ /**
+ * Merges rolling metadata from recent completed commits into the current commit metadata.
+ * This method MUST be called within the transaction lock after conflict resolution.
+ *
+ * <p>Rolling metadata keys configured via {@link HoodieWriteConfig#ROLLING_METADATA_KEYS} will be
+ * automatically carried forward from recent commits. The system walks back up to
+ * {@link HoodieWriteConfig#ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS} commits to find the most
+ * recent value for each key. This ensures that important metadata like checkpoint information
+ * remains accessible without worrying about archival or missing keys in individual commits.
+ *
+ * @param table HoodieTable instance (may have refreshed timeline after conflict resolution)
+ * @param metadata Current commit metadata to be augmented with rolling metadata
+ */
+ protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata metadata) {
+ // Skip for metadata table - rolling metadata is only for data tables
+ if (table.isMetadataTable()) {
+ return;
+ }
+
+ Set<String> rollingKeys = config.getRollingMetadataKeys();
+ if (rollingKeys.isEmpty()) {
+ return; // No rolling metadata configured
+ }
+
+ // IMPORTANT: We're inside the lock here. The timeline in 'table' is either:
+ // 1. Fresh from createTable() if no conflict resolution happened
+ // 2. Reloaded during resolveWriteConflict() if conflicts were checked
+ // In both cases, we have the latest view of the timeline.
+
+ HoodieTimeline commitsTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+
+ if (commitsTimeline.empty()) {
+ log.info("No previous commits found. Rolling metadata will start with current commit.");
+ return; // First commit - nothing to roll forward
+ }
+
+ try {
+ Map<String, String> existingExtraMetadata = metadata.getExtraMetadata();
+ Map<String, String> foundRollingMetadata = new HashMap<>();
+ Set<String> remainingKeys = new java.util.HashSet<>(rollingKeys);
+
+ // Remove keys that are already present in current commit (current values take precedence)
+ for (String key : rollingKeys) {
+ if (existingExtraMetadata.containsKey(key)) {
+ remainingKeys.remove(key);
+ }
+ }
+
+ if (remainingKeys.isEmpty()) {
+ log.debug("All rolling metadata keys are present in current commit. No walkback needed.");
+ return;
+ }
+
+ int lookbackLimit = config.getRollingMetadataTimelineLookbackCommits();
+ int commitsWalkedBack = 0;
+
+ // Walk back through the timeline in reverse order (most recent first) to find values for all remaining keys
+ List<HoodieInstant> recentCommits = commitsTimeline.getInstantsAsStream()
+ .collect(java.util.stream.Collectors.toList());
+
+ // Reverse to walk back from most recent to oldest
+ java.util.Collections.reverse(recentCommits);
+
+ // Limit to lookback commits
+ recentCommits = recentCommits.stream()
+ .limit(lookbackLimit)
+ .collect(java.util.stream.Collectors.toList());
Review Comment:
Here as well, can we import java.util.stream.Collectors instead of using the fully qualified name?
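For reference, here is a minimal sketch of the walk-back step with `Collectors` imported, using plain collections. It assumes the extra-metadata maps of completed commits are already ordered most recent first; `RollingMetadataWalkback` is a hypothetical name, not code from the PR:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper; plain maps stand in for each completed commit's extra metadata.
class RollingMetadataWalkback {
  // Walks back through at most lookbackLimit commits (newest first) and records the
  // most recent value for each key in remainingKeys. Keys never found are left out.
  static Map<String, String> findRollingValues(
      List<Map<String, String>> newestFirst, Set<String> remainingKeys, int lookbackLimit) {
    Set<String> missing = new HashSet<>(remainingKeys); // copy so the caller's set is untouched
    Map<String, String> found = new HashMap<>();
    List<Map<String, String>> window = newestFirst.stream()
        .limit(lookbackLimit)
        .collect(Collectors.toList());
    for (Map<String, String> extra : window) {
      if (missing.isEmpty()) {
        break; // every key resolved; stop walking back early
      }
      Iterator<String> it = missing.iterator();
      while (it.hasNext()) {
        String key = it.next();
        String value = extra.get(key);
        if (value != null) { // first hit while walking back is the most recent value
          found.put(key, value);
          it.remove();
        }
      }
    }
    return found;
  }

  public static void main(String[] args) {
    Map<String, String> newest = new HashMap<>();
    newest.put("a", "3");
    Map<String, String> middle = new HashMap<>();
    middle.put("a", "2");
    middle.put("b", "2");
    Map<String, String> oldest = new HashMap<>();
    oldest.put("c", "1");
    Set<String> keys = new HashSet<>(Arrays.asList("a", "b", "c"));
    // With a lookback of 2, "a" comes from the newest commit, "b" from the middle one,
    // and "c" is not found because the oldest commit is outside the window.
    System.out.println(findRollingValues(Arrays.asList(newest, middle, oldest), keys, 2));
  }
}
```

Removing resolved keys through the iterator avoids a second pass, and the loop exits once every key is found, so it may stop before reading all `lookbackLimit` commits.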
--
This is an automated message from the Apache Git Service.