This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 35aba7cb430 [HUDI-5520] Limit MDT deltacommits when data table has
pending action (#8772)
35aba7cb430 is described below
commit 35aba7cb430a09ba2d7013b25405c88dfe88f588
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun May 21 20:35:23 2023 +0800
[HUDI-5520] Limit MDT deltacommits when data table has pending action
(#8772)
---------
Co-authored-by: Jonathan Vexler <=>
---
.../metadata/HoodieBackedTableMetadataWriter.java | 21 ++++++++++++++++++++-
.../client/functional/TestHoodieBackedMetadata.java | 21 +++++++++++++++++++++
.../hudi/common/config/HoodieMetadataConfig.java | 17 +++++++++++++++++
3 files changed, 58 insertions(+), 1 deletion(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index fb19ce1e02d..cfbb750939d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -51,6 +51,7 @@ import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -73,6 +74,7 @@ import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.hadoop.SerializablePath;
import
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
+
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -97,12 +99,13 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_CLEANER_COMMITS_RETAINED;
import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ASYNC_CLEAN;
+import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_CLEANER_COMMITS_RETAINED;
import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
import static
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.getIndexInflightInstant;
import static
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeIndexPlan;
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
@@ -822,6 +825,21 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
});
}
+ private static void checkNumDeltaCommits(HoodieTableMetaClient metaClient,
int maxNumDeltaCommitsWhenPending) { // Throws HoodieMetadataException when the timeline behind metaClient holds more deltacommits than maxNumDeltaCommitsWhenPending.
+ final HoodieActiveTimeline activeTimeline =
metaClient.reloadActiveTimeline(); // counts are taken from a freshly reloaded active timeline
+ Option<HoodieInstant> lastCompaction =
activeTimeline.filterCompletedInstants()
+ .filter(s -> s.getAction().equals(COMPACTION_ACTION)).lastInstant(); // NOTE(review): only matches completed instants whose action equals COMPACTION_ACTION; confirm completed compactions are not recorded under the commit action, otherwise this is always empty and every deltacommit gets counted
+ int numDeltaCommits = lastCompaction.isPresent()
+ ?
activeTimeline.getDeltaCommitTimeline().findInstantsAfter(lastCompaction.get().getTimestamp()).countInstants()
+ : activeTimeline.getDeltaCommitTimeline().countInstants(); // deltacommits after the last compaction found above, or all deltacommits when none was found
+ if (numDeltaCommits > maxNumDeltaCommitsWhenPending) { // strictly greater-than: a count equal to the limit is still accepted
+ throw new HoodieMetadataException(String.format("Metadata table's
deltacommits exceeded %d: "
+ + "this is likely caused by a pending instant in the data table.
Resolve the pending instant "
+ + "or adjust `%s`, then restart the pipeline.",
+ maxNumDeltaCommitsWhenPending,
HoodieMetadataConfig.METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING.key())); // format args: the limit for %d, the config key for %s
+ }
+ }
+
private MetadataRecordsGenerationParams getRecordsGenerationParams() {
return new MetadataRecordsGenerationParams(
dataMetaClient,
@@ -1081,6 +1099,7 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
.findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
if (!pendingInstants.isEmpty()) {
+ checkNumDeltaCommits(metadataMetaClient,
dataWriteConfig.getMetadataConfig().getMaxNumDeltacommitsWhenPending());
LOG.info(String.format(
"Cannot compact metadata table as there are %d inflight instants in
data table before latest deltacommit in metadata table: %s. Inflight instants
in data table: %s",
pendingInstants.size(), latestDeltaCommitTimeInMetadataTable,
Arrays.toString(pendingInstants.toArray())));
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 1208f85e602..10b134887c4 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -2437,6 +2437,27 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
}
}
+ @Test
+ public void testMetadataTableWithLongLog() throws Exception { // Expects a write to fail with HoodieMetadataException once the configured deltacommit limit is exceeded.
+ init(COPY_ON_WRITE, false);
+ final int maxNumDeltacommits = 3; // limit under test
+ writeConfig = getWriteConfigBuilder(true, true, false)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .enable(true)
+ .enableMetrics(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(maxNumDeltacommits + 100) // compaction threshold set 100 above the limit under test
+ .withMaxNumDeltacommitsWhenPending(maxNumDeltacommits)
+ .build()).build();
+ initWriteConfigAndMetatableWriter(writeConfig, true);
+ testTable.addRequestedCommit(String.format("%016d", 0)); // requested commit at instant 0; nothing in this test completes it (presumably the pending data-table instant; verify against the test table helper)
+ for (int i = 1; i <= maxNumDeltacommits; i++) {
+ doWriteOperation(testTable, String.format("%016d", i)); // writes 1..maxNumDeltacommits run outside assertThrows, so any exception here fails the test
+ }
+ int instant = maxNumDeltacommits + 1; // first write past the limit
+ Throwable t = assertThrows(HoodieMetadataException.class, () ->
doWriteOperation(testTable, String.format("%016d", instant)));
+ assertTrue(t.getMessage().startsWith(String.format("Metadata table's
deltacommits exceeded %d: ", maxNumDeltacommits))); // same message prefix that checkNumDeltaCommits formats
+ }
+
@Test
public void testNonPartitioned() throws Exception {
init(HoodieTableType.COPY_ON_WRITE, false);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 42d4aed2f24..6f9615578fa 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -226,6 +226,14 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
.withDocumentation("Optimized log blocks scanner that addresses all the
multi-writer use-cases while appending to log files. "
+ "It also differentiates original blocks written by ingestion
writers and compacted blocks written by log compaction.");
+ public static final ConfigProperty<Integer>
METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING = ConfigProperty // limit on metadata-table deltacommits while the data table has a pending instant
+ .key(METADATA_PREFIX + ".max.deltacommits.when_pending") // full key is METADATA_PREFIX followed by this suffix
+ .defaultValue(1000) // limit applied when the key is not set
+ .markAdvanced()
+ .sinceVersion("0.14.0")
+ .withDocumentation("When there is a pending instant in data table, this
config limits the allowed number of deltacommits in metadata table to "
+ + "prevent the metadata table's timeline from growing unboundedly as
compaction won't be triggered due to the pending data table instant.");
+
private HoodieMetadataConfig() {
super();
}
@@ -306,6 +314,10 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
return getBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN);
}
+ public int getMaxNumDeltacommitsWhenPending() { // Returns the configured METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING value as an int.
+ return getIntOrDefault(METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING); // presumably falls back to the property default (1000) when unset; confirm against getIntOrDefault
+ }
+
/**
* Builder for {@link HoodieMetadataConfig}.
*/
@@ -431,6 +443,11 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
return this;
}
+ public Builder withMaxNumDeltacommitsWhenPending(int
maxNumDeltaCommitsWhenPending) { // Sets METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING on the config being built.
+ metadataConfig.setValue(METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING,
String.valueOf(maxNumDeltaCommitsWhenPending)); // the int is stored in its String form
+ return this; // same builder instance, so calls can be chained
+ }
+
public HoodieMetadataConfig build() {
metadataConfig.setDefaultValue(ENABLE,
getDefaultMetadataEnable(engineType));
metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());