This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c586d96e2d0 [HUDI-6603] Relax the restriction for Flink MDT rollback
(#9306)
c586d96e2d0 is described below
commit c586d96e2d09d4942c79a4e1ae74426ac096a005
Author: Danny Chan <[email protected]>
AuthorDate: Fri Jul 28 19:50:09 2023 +0800
[HUDI-6603] Relax the restriction for Flink MDT rollback (#9306)
---
.../metadata/HoodieBackedTableMetadataWriter.java | 27 ++++++++++++++--------
.../FlinkHoodieBackedTableMetadataWriter.java | 10 ++++++++
2 files changed, 27 insertions(+), 10 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 7055ab993cd..83e6d8de6ea 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -999,16 +999,7 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
// The deltacommit that will be rolled back
HoodieInstant deltaCommitInstant = new HoodieInstant(false,
HoodieTimeline.DELTA_COMMIT_ACTION, commitToRollbackInstantTime);
- // The commit being rolled back should not be earlier than the latest
compaction on the MDT. Compaction on MDT only occurs when all actions
- // are completed on the dataset. Hence, this case implies a rollback of
completed commit which should actually be handled using restore.
- if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
- final String compactionInstantTime = compactionInstant.getTimestamp();
- if
(HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitToRollbackInstantTime,
compactionInstantTime)) {
- throw new HoodieMetadataException(String.format("Commit being rolled
back %s is earlier than the latest compaction %s. "
- + "There are %d deltacommits after this compaction: %s",
commitToRollbackInstantTime, compactionInstantTime,
- deltacommitsSinceCompaction.countInstants(),
deltacommitsSinceCompaction.getInstants()));
- }
- }
+ validateRollback(commitToRollbackInstantTime, compactionInstant,
deltacommitsSinceCompaction);
// lets apply a delta commit with DT's rb instant(with special suffix)
containing following records:
// a. any log files as part of RB commit metadata that was added
@@ -1031,6 +1022,22 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
}
}
+ protected void validateRollback(
+ String commitToRollbackInstantTime,
+ HoodieInstant compactionInstant,
+ HoodieTimeline deltacommitsSinceCompaction) {
+ // The commit being rolled back should not be earlier than the latest
compaction on the MDT. Compaction on MDT only occurs when all actions
+ // are completed on the dataset. Hence, this case implies a rollback of
completed commit which should actually be handled using restore.
+ if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
+ final String compactionInstantTime = compactionInstant.getTimestamp();
+ if
(HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitToRollbackInstantTime,
compactionInstantTime)) {
+ throw new HoodieMetadataException(String.format("Commit being rolled
back %s is earlier than the latest compaction %s. "
+ + "There are %d deltacommits after this compaction: %s",
commitToRollbackInstantTime, compactionInstantTime,
+ deltacommitsSinceCompaction.countInstants(),
deltacommitsSinceCompaction.getInstants()));
+ }
+ }
+ }
+
@Override
public void close() throws Exception {
if (metadata != null) {
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index e3602ce1a2a..cc55a8f5e8e 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -172,6 +173,15 @@ public class FlinkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
return true;
}
+ @Override
+ protected void validateRollback(String commitToRollbackInstantTime,
HoodieInstant compactionInstant, HoodieTimeline deltacommitsSinceCompaction) {
+ // ignore, flink has a more radical compaction strategy, it is very
probable that
+ // the latest compaction instant has a greater timestamp than the instant to
roll back.
+
+ // The limitation can be relaxed because the log reader of MDT only
accepts valid instants
+ // based on the DT timeline, so the base file of MDT does not include
un-committed instants.
+ }
+
@Override
public void deletePartitions(String instantTime, List<MetadataPartitionType>
partitions) {
throw new HoodieNotSupportedException("Dropping metadata index not
supported for Flink metadata table yet.");