This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 35aba7cb430 [HUDI-5520] Limit MDT deltacommits when data table has pending action (#8772)
35aba7cb430 is described below

commit 35aba7cb430a09ba2d7013b25405c88dfe88f588
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun May 21 20:35:23 2023 +0800

    [HUDI-5520] Limit MDT deltacommits when data table has pending action (#8772)
    
    ---------
    Co-authored-by: Jonathan Vexler <=>
---
 .../metadata/HoodieBackedTableMetadataWriter.java   | 21 ++++++++++++++++++++-
 .../client/functional/TestHoodieBackedMetadata.java | 21 +++++++++++++++++++++
 .../hudi/common/config/HoodieMetadataConfig.java    | 17 +++++++++++++++++
 3 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index fb19ce1e02d..cfbb750939d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -51,6 +51,7 @@ import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -73,6 +74,7 @@ import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.hadoop.CachingPath;
 import org.apache.hudi.hadoop.SerializablePath;
 import 
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
+
 import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -97,12 +99,13 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_CLEANER_COMMITS_RETAINED;
 import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ASYNC_CLEAN;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_CLEANER_COMMITS_RETAINED;
 import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
 import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
 import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.getIndexInflightInstant;
 import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeIndexPlan;
 import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
@@ -822,6 +825,33 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
         });
   }
 
+  /**
+   * Fails fast when the metadata table has accumulated too many deltacommits since its last completed compaction.
+   *
+   * <p>Invoked while metadata table compaction is being skipped because the data table has pending instants;
+   * without this guard the metadata table timeline could keep growing with no compaction to bound it.
+   *
+   * @param metaClient                    meta client of the metadata table; its active timeline is reloaded
+   * @param maxNumDeltaCommitsWhenPending maximum number of deltacommits allowed after the last completed compaction
+   * @throws HoodieMetadataException if the deltacommit count exceeds {@code maxNumDeltaCommitsWhenPending}
+   */
+  private static void checkNumDeltaCommits(HoodieTableMetaClient metaClient, int maxNumDeltaCommitsWhenPending) {
+    final HoodieActiveTimeline activeTimeline = metaClient.reloadActiveTimeline();
+    // A completed compaction is recorded with the "commit" action (only its requested/inflight states use
+    // "compaction"), so matching COMPACTION_ACTION on completed instants would never find it.
+    Option<HoodieInstant> lastCompaction = activeTimeline.filterCompletedInstants()
+        .filter(s -> s.getAction().equals(COMMIT_ACTION)).lastInstant();
+    int numDeltaCommits = lastCompaction.isPresent()
+        ? activeTimeline.getDeltaCommitTimeline().findInstantsAfter(lastCompaction.get().getTimestamp()).countInstants()
+        : activeTimeline.getDeltaCommitTimeline().countInstants();
+    if (numDeltaCommits > maxNumDeltaCommitsWhenPending) {
+      throw new HoodieMetadataException(String.format("Metadata table's deltacommits exceeded %d: "
+              + "this is likely caused by a pending instant in the data table. Resolve the pending instant "
+              + "or adjust `%s`, then restart the pipeline.",
+          maxNumDeltaCommitsWhenPending, HoodieMetadataConfig.METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING.key()));
+    }
+  }
+
   private MetadataRecordsGenerationParams getRecordsGenerationParams() {
     return new MetadataRecordsGenerationParams(
         dataMetaClient,
@@ -1081,6 +1111,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
         .findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
 
     if (!pendingInstants.isEmpty()) {
+      checkNumDeltaCommits(metadataMetaClient, dataWriteConfig.getMetadataConfig().getMaxNumDeltacommitsWhenPending());
       LOG.info(String.format(
           "Cannot compact metadata table as there are %d inflight instants in data table before latest deltacommit in metadata table: %s. Inflight instants in data table: %s",
           pendingInstants.size(), latestDeltaCommitTimeInMetadataTable, Arrays.toString(pendingInstants.toArray())));
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 1208f85e602..10b134887c4 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -2437,6 +2437,27 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     }
   }
 
+  @Test
+  public void testMetadataTableWithLongLog() throws Exception {
+    init(COPY_ON_WRITE, false);
+    final int maxDeltaCommitsWhenPending = 3;
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).enableMetrics(false)
+        .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsWhenPending + 100)
+        .withMaxNumDeltacommitsWhenPending(maxDeltaCommitsWhenPending).build();
+    writeConfig = getWriteConfigBuilder(true, true, false).withMetadataConfig(metadataConfig).build();
+    initWriteConfigAndMetatableWriter(writeConfig, true);
+    // Keep a data table commit in requested state so metadata table compaction stays blocked.
+    testTable.addRequestedCommit(String.format("%016d", 0));
+    // Writes up to the configured limit are expected to succeed.
+    for (int commitSeq = 1; commitSeq <= maxDeltaCommitsWhenPending; commitSeq++) {
+      doWriteOperation(testTable, String.format("%016d", commitSeq));
+    }
+    final String overflowInstantTime = String.format("%016d", maxDeltaCommitsWhenPending + 1);
+    Throwable thrown = assertThrows(HoodieMetadataException.class, () -> doWriteOperation(testTable, overflowInstantTime));
+    assertTrue(thrown.getMessage().startsWith(
+        String.format("Metadata table's deltacommits exceeded %d: ", maxDeltaCommitsWhenPending)));
+  }
+
   @Test
   public void testNonPartitioned() throws Exception {
     init(HoodieTableType.COPY_ON_WRITE, false);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 42d4aed2f24..6f9615578fa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -226,6 +226,14 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       .withDocumentation("Optimized log blocks scanner that addresses all the multi-writer use-cases while appending to log files. "
           + "It also differentiates original blocks written by ingestion writers and compacted blocks written by log compaction.");
 
+  public static final ConfigProperty<Integer> METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING = ConfigProperty
+      .key(METADATA_PREFIX + ".max.deltacommits.when_pending")
+      .defaultValue(1000)
+      .markAdvanced()
+      .sinceVersion("0.14.0")
+      .withDocumentation("When there is a pending instant in data table, this config limits the allowed number of deltacommits in metadata table to "
+          + "prevent the metadata table's timeline from growing unboundedly as compaction won't be triggered due to the pending data table instant.");
+
   private HoodieMetadataConfig() {
     super();
   }
@@ -306,6 +314,15 @@ public final class HoodieMetadataConfig extends HoodieConfig {
     return getBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN);
   }
 
+  /**
+   * Returns the maximum number of metadata table deltacommits allowed while the data table has a pending instant.
+   *
+   * @return the configured value of {@link #METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING}
+   */
+  public int getMaxNumDeltacommitsWhenPending() {
+    return getIntOrDefault(METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING);
+  }
+
   /**
    * Builder for {@link HoodieMetadataConfig}.
    */
@@ -431,6 +448,17 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       return this;
     }
 
+    /**
+     * Sets the maximum number of metadata table deltacommits allowed while the data table has a pending instant.
+     *
+     * @param maxNumDeltaCommitsWhenPending value for {@link HoodieMetadataConfig#METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING}
+     * @return this builder
+     */
+    public Builder withMaxNumDeltacommitsWhenPending(int maxNumDeltaCommitsWhenPending) {
+      metadataConfig.setValue(METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING, String.valueOf(maxNumDeltaCommitsWhenPending));
+      return this;
+    }
+
     public HoodieMetadataConfig build() {
       metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType));
       metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());

Reply via email to