This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 91d2e61 [HUDI-2904] Fix metadata table archival overstepping between regular writers and table services (#4186)
91d2e61 is described below
commit 91d2e61433e74abb44cb4d0ae236ee8f4a94e1f8
Author: rmahindra123 <[email protected]>
AuthorDate: Thu Dec 2 10:32:26 2021 -0800
[HUDI-2904] Fix metadata table archival overstepping between regular writers and table services (#4186)
- Co-authored-by: Rajesh Mahindra <[email protected]>
- Co-authored-by: Sivabalan Narayanan <[email protected]>
---
.../hudi/client/AbstractHoodieWriteClient.java | 33 ++++++++++++++++---
.../apache/hudi/config/HoodieCompactionConfig.java | 12 +++++++
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 +++
.../metadata/HoodieBackedTableMetadataWriter.java | 4 ++-
.../apache/hudi/client/HoodieFlinkWriteClient.java | 11 +++----
.../FlinkHoodieBackedTableMetadataWriter.java | 1 +
.../SparkHoodieBackedTableMetadataWriter.java | 1 +
.../functional/TestHoodieBackedMetadata.java | 38 ++++++++++++++++++++++
8 files changed, 91 insertions(+), 13 deletions(-)
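In short: archival used to run unconditionally in the post-commit path for both
regular writers and the metadata table writer, so the metadata table could
archive instants out from under concurrent table services. This patch gates
post-commit archival behind a new hoodie.archive.automatic switch and exposes a
public archive() API; the metadata table writer disables the automatic path, so
archival on the metadata table is triggered only from the regular writer. A
minimal sketch of the new knob (base path, table name, and retention values are
illustrative placeholders, not taken from this commit):

    import org.apache.hudi.config.HoodieCompactionConfig;
    import org.apache.hudi.config.HoodieWriteConfig;

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/my_table")        // hypothetical base path
        .forTable("my_table")                  // hypothetical table name
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withAutoArchive(false)            // new in this patch; key: hoodie.archive.automatic
            .archiveCommitsWith(20, 30)        // min/max commits to keep (illustrative)
            .build())
        .build();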
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 96d89fc..59acbb2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -449,11 +449,9 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
autoCleanOnCommit();
- // We cannot have unbounded commit files. Archive commits if we have to archive
- HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
- archiveLog.archiveIfRequired(context);
- } catch (IOException ioe) {
- throw new HoodieIOException(ioe.getMessage(), ioe);
+ if (config.isAutoArchive()) {
+ archive(table);
+ }
} finally {
this.heartbeatClient.stop(instantTime);
}
@@ -744,6 +742,31 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
}
/**
+ * Trigger archival for the table. This ensures that the number of commits does not explode
+ * and keep increasing unbounded over time.
+ * @param table table to run archival on.
+ */
+ protected void archive(HoodieTable<T, I, K, O> table) {
+ try {
+ // We cannot have unbounded commit files. Archive commits if we have to archive
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
+ archiveLog.archiveIfRequired(context);
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Failed to archive", ioe);
+ }
+ }
+
+ /**
+ * Trigger archival for the table. This ensures that the number of commits does not explode
+ * and keep increasing unbounded over time.
+ */
+ public void archive() {
+ // Create a Hoodie table which encapsulates the commits and files visible
+ HoodieTable table = createTable(config, hadoopConf);
+ archive(table);
+ }
+
+ /**
* Provides a new commit time for a write operation (insert/update/delete).
*/
public String startCommit() {
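With archive() now public, a process that sets hoodie.archive.automatic=false
can drive archival explicitly. A hedged sketch against the Spark client
(writeClient and records are assumed to exist; the upsert/commit calls are the
standard SparkRDDWriteClient API, not part of this diff):

    import org.apache.hudi.client.WriteStatus;
    import org.apache.spark.api.java.JavaRDD;

    // run a normal write, then archive explicitly instead of relying on post-commit
    String instantTime = writeClient.startCommit();
    JavaRDD<WriteStatus> statuses = writeClient.upsert(records, instantTime);
    writeClient.commit(instantTime, statuses);
    writeClient.archive(); // public entry point added by this patch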
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index fbe31b0..640f0cb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -57,6 +57,13 @@ public class HoodieCompactionConfig extends HoodieConfig {
+ " to delete older file slices. It's recommended to enable this, to
ensure metadata and data storage"
+ " growth is bounded.");
+ public static final ConfigProperty<String> AUTO_ARCHIVE = ConfigProperty
+ .key("hoodie.archive.automatic")
+ .defaultValue("true")
+ .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
+ + " to archive commits if we cross a maximum value of commits."
+ + " It's recommended to enable this, to ensure number of active commits is bounded.");
+
public static final ConfigProperty<String> ASYNC_CLEAN = ConfigProperty
.key("hoodie.clean.async")
.defaultValue("false")
@@ -493,6 +500,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
return this;
}
+ public Builder withAutoArchive(Boolean autoArchive) {
+ compactionConfig.setValue(AUTO_ARCHIVE, String.valueOf(autoArchive));
+ return this;
+ }
+
public Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) {
compactionConfig.setValue(CLEANER_INCREMENTAL_MODE_ENABLE, String.valueOf(incrementalCleaningMode));
return this;
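The builder method above is a typed wrapper over the raw key. A small sketch of
supplying the same switch as a plain property (the base path is a placeholder,
and withProperties is the pre-existing builder hook assumed here rather than
shown in this diff):

    import java.util.Properties;
    import org.apache.hudi.config.HoodieWriteConfig;

    Properties props = new Properties();
    props.setProperty("hoodie.archive.automatic", "false"); // defaults to "true"
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/my_table") // hypothetical base path
        .withProperties(props)
        .build();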
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index b49108f..df4b3f6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1101,6 +1101,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(HoodieCompactionConfig.AUTO_CLEAN);
}
+ public boolean isAutoArchive() {
+ return getBoolean(HoodieCompactionConfig.AUTO_ARCHIVE);
+ }
+
public boolean isAsyncClean() {
return getBoolean(HoodieCompactionConfig.ASYNC_CLEAN);
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index f9486b1..54284fc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -204,7 +204,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
.archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep)
// we will trigger compaction manually, to control the instant times
.withInlineCompaction(false)
- .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build())
+ .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
+ // we will trigger archive manually, to ensure only regular writer invokes it
+ .withAutoArchive(false).build())
.withParallelism(parallelism, parallelism)
.withDeleteParallelism(parallelism)
.withRollbackParallelism(parallelism)
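This is the core of the fix: the metadata table's internal write config
permanently opts out of post-commit auto-archival, and the metadata writer
instead invokes archival itself, only on the path guarded by
canTriggerTableService (the Flink and Spark hunks below). Schematically, using
names taken from this diff:

    if (canTriggerTableService) {
      // only a regular writer reaches here; async table services skip
      // compaction, cleaning and archival of the metadata table entirely
      compactIfNecessary(writeClient, instantTime);
      doClean(writeClient, instantTime);
      writeClient.archive();
    }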
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 36caa1b..374dd12 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -40,7 +40,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndexFactory;
import org.apache.hudi.index.HoodieIndex;
@@ -57,7 +56,6 @@ import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
@@ -332,11 +330,10 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
// Delete the marker directory for the instant.
WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
- // We cannot have unbounded commit files. Archive commits if we have to archive
- HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
- archiveLog.archiveIfRequired(context);
- } catch (IOException ioe) {
- throw new HoodieIOException(ioe.getMessage(), ioe);
+ if (config.isAutoArchive()) {
+ // We cannot have unbounded commit files. Archive commits if we have to archive
+ archive(table);
+ }
} finally {
this.heartbeatClient.stop(instantTime);
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 5e782c5..0dcfcfc 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -140,6 +140,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
if (canTriggerTableService) {
compactIfNecessary(writeClient, instantTime);
doClean(writeClient, instantTime);
+ writeClient.archive();
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index ff8f556..65ade82 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -155,6 +155,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
if (canTriggerTableService) {
compactIfNecessary(writeClient, instantTime);
doClean(writeClient, instantTime);
+ writeClient.archive();
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 82bc892..73b7811 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -107,6 +107,7 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
@@ -250,6 +251,43 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
validateMetadata(testTable, emptyList(), true);
}
+ @Test
+ public void testMetadataTableArchival() throws Exception {
+ init(COPY_ON_WRITE, false);
+ writeConfig = getWriteConfigBuilder(true, true, false)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .enable(true)
+ .enableFullScan(true)
+ .enableMetrics(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(3)
+ .archiveCommitsWith(3, 4)
+ .retainCommits(1)
+ .build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).retainCommits(1).build()).build();
+ initWriteConfigAndMetatableWriter(writeConfig, true);
+
+ AtomicInteger commitTime = new AtomicInteger(1);
+ // trigger 2 regular writes (plus the 1 bootstrap commit): just 1 short of archival getting triggered.
+ int i = 1;
+ for (; i <= 2; i++) {
+ doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
+ }
+ // expected num commits = 1 (bootstrap) + 2 (writes) + 1 compaction.
+ HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
+ HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline();
+ assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 4);
+
+ // trigger an async table service; archival should not kick in, even though conditions are met.
+ doCluster(testTable, "000000" + commitTime.getAndIncrement());
+ metadataTimeline = metadataMetaClient.reloadActiveTimeline();
+ assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 5);
+
+ // trigger a regular write operation. archival should kick in.
+ doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
+ metadataTimeline = metadataMetaClient.reloadActiveTimeline();
+ assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 3);
+ }
+
@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testMetadataInsertUpsertClean(HoodieTableType tableType) throws Exception {
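For the record, the instant counts the new test asserts work out as follows (a
reading of the assertions above, not additional test output): after the two
writes, the metadata timeline holds 1 bootstrap deltacommit + 2 write
deltacommits + 1 compaction = 4 instants; the clustering-driven commit raises
that to 5 with no archival, since table services may no longer archive the
metadata table; the next regular write then triggers archival, trimming the
active timeline to 3 instants in line with archiveCommitsWith(3, 4) on the
metadata config.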