This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c586d96e2d0 [HUDI-6603] Relax the restriction for Flink MDT rollback (#9306)
c586d96e2d0 is described below

commit c586d96e2d09d4942c79a4e1ae74426ac096a005
Author: Danny Chan <[email protected]>
AuthorDate: Fri Jul 28 19:50:09 2023 +0800

    [HUDI-6603] Relax the restriction for Flink MDT rollback (#9306)
---
 .../metadata/HoodieBackedTableMetadataWriter.java  | 27 ++++++++++++++--------
 .../FlinkHoodieBackedTableMetadataWriter.java      | 10 ++++++++
 2 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 7055ab993cd..83e6d8de6ea 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -999,16 +999,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
       // The deltacommit that will be rolled back
       HoodieInstant deltaCommitInstant = new HoodieInstant(false, 
HoodieTimeline.DELTA_COMMIT_ACTION, commitToRollbackInstantTime);
 
-      // The commit being rolled back should not be earlier than the latest 
compaction on the MDT. Compaction on MDT only occurs when all actions
-      // are completed on the dataset. Hence, this case implies a rollback of 
completed commit which should actually be handled using restore.
-      if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
-        final String compactionInstantTime = compactionInstant.getTimestamp();
-        if 
(HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitToRollbackInstantTime, 
compactionInstantTime)) {
-          throw new HoodieMetadataException(String.format("Commit being rolled 
back %s is earlier than the latest compaction %s. "
-                  + "There are %d deltacommits after this compaction: %s", 
commitToRollbackInstantTime, compactionInstantTime,
-              deltacommitsSinceCompaction.countInstants(), 
deltacommitsSinceCompaction.getInstants()));
-        }
-      }
+      validateRollback(commitToRollbackInstantTime, compactionInstant, 
deltacommitsSinceCompaction);
 
       // lets apply a delta commit with DT's rb instant(with special suffix) 
containing following records:
       // a. any log files as part of RB commit metadata that was added
@@ -1031,6 +1022,22 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     }
   }
 
+  /**
+   * Validates that a data table commit may be rolled back on the metadata table (MDT).
+   *
+   * <p>When {@code compactionInstant} is a {@code commit} action (i.e. an MDT compaction), rolling
+   * back an instant whose time is less than or equal to the compaction's timestamp is rejected.
+   * As the inline comment explains, that case implies rolling back a completed commit, which
+   * should be handled with restore instead. For any other action of {@code compactionInstant}
+   * the method does nothing. Declared {@code protected} so engine-specific writers can override
+   * (relax) the check.
+   *
+   * @param commitToRollbackInstantTime instant time of the commit being rolled back
+   * @param compactionInstant MDT instant compared against; presumably the latest MDT compaction
+   *     as chosen by the caller (TODO confirm). Only checked when its action is {@code commit}.
+   * @param deltacommitsSinceCompaction timeline of delta commits after that compaction (per its
+   *     name); used here only to build the error message
+   * @throws HoodieMetadataException if the commit being rolled back is at or before the
+   *     compaction instant
+   */
+  protected void validateRollback(
+      String commitToRollbackInstantTime,
+      HoodieInstant compactionInstant,
+      HoodieTimeline deltacommitsSinceCompaction) {
+    // The commit being rolled back should not be earlier than the latest 
compaction on the MDT. Compaction on MDT only occurs when all actions
+    // are completed on the dataset. Hence, this case implies a rollback of 
completed commit which should actually be handled using restore.
+    if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
+      final String compactionInstantTime = compactionInstant.getTimestamp();
+      if 
(HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitToRollbackInstantTime, 
compactionInstantTime)) {
+        throw new HoodieMetadataException(String.format("Commit being rolled 
back %s is earlier than the latest compaction %s. "
+                + "There are %d deltacommits after this compaction: %s", 
commitToRollbackInstantTime, compactionInstantTime,
+            deltacommitsSinceCompaction.countInstants(), 
deltacommitsSinceCompaction.getInstants()));
+      }
+    }
+  }
+
   @Override
   public void close() throws Exception {
     if (metadata != null) {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index e3602ce1a2a..cc55a8f5e8e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -172,6 +173,15 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     return true;
   }
 
+  /**
+   * Intentionally a no-op. The Flink MDT writer skips the rollback validation that the base
+   * {@code HoodieBackedTableMetadataWriter} performs (rejecting a rollback of an instant at or
+   * before the MDT compaction instant), so such rollbacks are allowed here.
+   */
+  @Override
+  protected void validateRollback(String commitToRollbackInstantTime, 
HoodieInstant compactionInstant, HoodieTimeline deltacommitsSinceCompaction) {
+    // Ignored: Flink uses a more aggressive compaction strategy, so it is quite likely that
+    // the latest compaction instant has a greater timestamp than the instant to roll back.
+
+    // The restriction can be relaxed because the MDT log reader only accepts valid instants
+    // based on the DT timeline, so the MDT base file does not include uncommitted instants.
+
+    // NOTE(review): this override disables the check unconditionally, so it also permits
+    // rolling back a completed commit that the base class says should go through restore.
+    // Confirm that is acceptable for Flink, or narrow the relaxation to the compaction case.
+  }
+
   @Override
   public void deletePartitions(String instantTime, List<MetadataPartitionType> 
partitions) {
     throw new HoodieNotSupportedException("Dropping metadata index not 
supported for Flink metadata table yet.");

Reply via email to