This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch release-0.10.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 318818be44d065f1be523f7aa9eb949e09859cbb Author: rmahindra123 <[email protected]> AuthorDate: Thu Dec 2 10:32:26 2021 -0800 [HUDI-2904] Fix metadata table archival overstepping between regular writers and table services (#4186) - Co-authored-by: Rajesh Mahindra <[email protected]> - Co-authored-by: Sivabalan Narayanan <[email protected]> (cherry picked from commit 91d2e61433e74abb44cb4d0ae236ee8f4a94e1f8) --- .../hudi/client/AbstractHoodieWriteClient.java | 33 ++++++++++++++++--- .../apache/hudi/config/HoodieCompactionConfig.java | 12 +++++++ .../org/apache/hudi/config/HoodieWriteConfig.java | 4 +++ .../metadata/HoodieBackedTableMetadataWriter.java | 4 ++- .../apache/hudi/client/HoodieFlinkWriteClient.java | 11 +++---- .../FlinkHoodieBackedTableMetadataWriter.java | 1 + .../SparkHoodieBackedTableMetadataWriter.java | 1 + .../functional/TestHoodieBackedMetadata.java | 38 ++++++++++++++++++++++ 8 files changed, 91 insertions(+), 13 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 96d89fc..59acbb2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -449,11 +449,9 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I WriteMarkersFactory.get(config.getMarkersType(), table, instantTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); autoCleanOnCommit(); - // We cannot have unbounded commit files. Archive commits if we have to archive - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table); - archiveLog.archiveIfRequired(context); - } catch (IOException ioe) { - throw new HoodieIOException(ioe.getMessage(), ioe); + if (config.isAutoArchive()) { + archive(table); + } } finally { this.heartbeatClient.stop(instantTime); } @@ -744,6 +742,31 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I } /** + * Trigger archival for the table. This ensures that the number of commits do not explode + * and keep increasing unbounded over time. + * @param table table to commit on. + */ + protected void archive(HoodieTable<T, I, K, O> table) { + try { + // We cannot have unbounded commit files. Archive commits if we have to archive + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table); + archiveLog.archiveIfRequired(context); + } catch (IOException ioe) { + throw new HoodieIOException("Failed to archive", ioe); + } + } + + /** + * Trigger archival for the table. This ensures that the number of commits do not explode + * and keep increasing unbounded over time. + */ + public void archive() { + // Create a Hoodie table which encapsulated the commits and files visible + HoodieTable table = createTable(config, hadoopConf); + archive(table); + } + + /** * Provides a new commit time for a write operation (insert/update/delete). */ public String startCommit() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index fbe31b0..640f0cb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -57,6 +57,13 @@ public class HoodieCompactionConfig extends HoodieConfig { + " to delete older file slices. It's recommended to enable this, to ensure metadata and data storage" + " growth is bounded."); + public static final ConfigProperty<String> AUTO_ARCHIVE = ConfigProperty + .key("hoodie.archive.automatic") + .defaultValue("true") + .withDocumentation("When enabled, the archival table service is invoked immediately after each commit," + + " to archive commits if we cross a maximum value of commits." + + " It's recommended to enable this, to ensure number of active commits is bounded."); + public static final ConfigProperty<String> ASYNC_CLEAN = ConfigProperty .key("hoodie.clean.async") .defaultValue("false") @@ -493,6 +500,11 @@ public class HoodieCompactionConfig extends HoodieConfig { return this; } + public Builder withAutoArchive(Boolean autoArchive) { + compactionConfig.setValue(AUTO_ARCHIVE, String.valueOf(autoArchive)); + return this; + } + public Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) { compactionConfig.setValue(CLEANER_INCREMENTAL_MODE_ENABLE, String.valueOf(incrementalCleaningMode)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index b49108f..df4b3f6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1101,6 +1101,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getBoolean(HoodieCompactionConfig.AUTO_CLEAN); } + public boolean isAutoArchive() { + return getBoolean(HoodieCompactionConfig.AUTO_ARCHIVE); + } + public boolean isAsyncClean() { return getBoolean(HoodieCompactionConfig.ASYNC_CLEAN); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index f9486b1..54284fc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -204,7 +204,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep) // we will trigger compaction manually, to control the instant times .withInlineCompaction(false) - .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build()) + .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()) + // we will trigger archive manually, to ensure only regular writer invokes it + .withAutoArchive(false).build()) .withParallelism(parallelism, parallelism) .withDeleteParallelism(parallelism) .withRollbackParallelism(parallelism) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 36caa1b..374dd12 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -40,7 +40,6 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.index.FlinkHoodieIndexFactory; import org.apache.hudi.index.HoodieIndex; @@ -57,7 +56,6 @@ import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.HoodieTimelineArchiveLog; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.compact.CompactHelpers; import org.apache.hudi.table.marker.WriteMarkersFactory; @@ -332,11 +330,10 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends // Delete the marker directory for the instant. WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); - // We cannot have unbounded commit files. Archive commits if we have to archive - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table); - archiveLog.archiveIfRequired(context); - } catch (IOException ioe) { - throw new HoodieIOException(ioe.getMessage(), ioe); + if (config.isAutoArchive()) { + // We cannot have unbounded commit files. Archive commits if we have to archive + archive(table); + } } finally { this.heartbeatClient.stop(instantTime); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 5e782c5..0dcfcfc 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -140,6 +140,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad if (canTriggerTableService) { compactIfNecessary(writeClient, instantTime); doClean(writeClient, instantTime); + writeClient.archive(); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index ff8f556..65ade82 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -155,6 +155,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad if (canTriggerTableService) { compactIfNecessary(writeClient, instantTime); doClean(writeClient, instantTime); + writeClient.archive(); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 82bc892..73b7811 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -107,6 +107,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static java.util.Arrays.asList; @@ -250,6 +251,43 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { validateMetadata(testTable, emptyList(), true); } + @Test + public void testMetadataTableArchival() throws Exception { + init(COPY_ON_WRITE, false); + writeConfig = getWriteConfigBuilder(true, true, false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(true) + .enableFullScan(true) + .enableMetrics(false) + .withMaxNumDeltaCommitsBeforeCompaction(3) + .archiveCommitsWith(3, 4) + .retainCommits(1) + .build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).retainCommits(1).build()).build(); + initWriteConfigAndMetatableWriter(writeConfig, true); + + AtomicInteger commitTime = new AtomicInteger(1); + // trigger 2 regular writes(1 bootstrap commit). just 1 before archival can get triggered. + int i = 1; + for (; i <= 2; i++) { + doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT); + } + // expected num commits = 1 (bootstrap) + 2 (writes) + 1 compaction. + HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); + HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline(); + assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 4); + + // trigger a async table service, archival should not kick in, even though conditions are met. + doCluster(testTable, "000000" + commitTime.getAndIncrement()); + metadataTimeline = metadataMetaClient.reloadActiveTimeline(); + assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 5); + + // trigger a regular write operation. archival should kick in. + doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT); + metadataTimeline = metadataMetaClient.reloadActiveTimeline(); + assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 3); + } + @ParameterizedTest @EnumSource(HoodieTableType.class) public void testMetadataInsertUpsertClean(HoodieTableType tableType) throws Exception {
