This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch release-0.10.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 318818be44d065f1be523f7aa9eb949e09859cbb
Author: rmahindra123 <[email protected]>
AuthorDate: Thu Dec 2 10:32:26 2021 -0800

    [HUDI-2904] Fix metadata table archival overstepping between regular writers and table services (#4186)
    
    - Co-authored-by: Rajesh Mahindra <[email protected]>
    - Co-authored-by: Sivabalan Narayanan <[email protected]>
    
    (cherry picked from commit 91d2e61433e74abb44cb4d0ae236ee8f4a94e1f8)
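    
    As context for reviewers, the new hoodie.archive.automatic flag lets a
    regular writer keep post-commit archival while table-service writers on
    the metadata table turn it off. A minimal sketch of opting out, assuming
    a placeholder base path (only withAutoArchive comes from this patch; the
    rest is standard write-config wiring):
    
        HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_table")  // placeholder base path, not from this patch
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withAutoArchive(false)   // new switch: hoodie.archive.automatic
                .archiveCommitsWith(20, 30)
                .build())
            .build();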
---
 .../hudi/client/AbstractHoodieWriteClient.java     | 33 ++++++++++++++++---
 .../apache/hudi/config/HoodieCompactionConfig.java | 12 +++++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 +++
 .../metadata/HoodieBackedTableMetadataWriter.java  |  4 ++-
 .../apache/hudi/client/HoodieFlinkWriteClient.java | 11 +++----
 .../FlinkHoodieBackedTableMetadataWriter.java      |  1 +
 .../SparkHoodieBackedTableMetadataWriter.java      |  1 +
 .../functional/TestHoodieBackedMetadata.java       | 38 ++++++++++++++++++++++
 8 files changed, 91 insertions(+), 13 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 96d89fc..59acbb2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -449,11 +449,9 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
       WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
       autoCleanOnCommit();
-      // We cannot have unbounded commit files. Archive commits if we have to archive
-      HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
-      archiveLog.archiveIfRequired(context);
-    } catch (IOException ioe) {
-      throw new HoodieIOException(ioe.getMessage(), ioe);
+      if (config.isAutoArchive()) {
+        archive(table);
+      }
     } finally {
       this.heartbeatClient.stop(instantTime);
     }
@@ -744,6 +742,31 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
   }
 
   /**
+   * Trigger archival for the table. This ensures that the number of commits do not explode
+   * and keep increasing unbounded over time.
+   * @param table table to commit on.
+   */
+  protected void archive(HoodieTable<T, I, K, O> table) {
+    try {
+      // We cannot have unbounded commit files. Archive commits if we have to archive
+      HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
+      archiveLog.archiveIfRequired(context);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to archive", ioe);
+    }
+  }
+
+  /**
+   * Trigger archival for the table. This ensures that the number of commits do not explode
+   * and keep increasing unbounded over time.
+   */
+  public void archive() {
+    // Create a Hoodie table which encapsulated the commits and files visible
+    HoodieTable table = createTable(config, hadoopConf);
+    archive(table);
+  }
+
+  /**
    * Provides a new commit time for a write operation (insert/update/delete).
    */
   public String startCommit() {
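
For jobs that disable hoodie.archive.automatic, the public archive() added
above is the entry point for triggering archival explicitly. A rough sketch
of a standalone invocation (the Spark client and the engineContext/writeConfig
setup are assumptions, not prescribed by this patch):

    // Sketch: explicit archival from a separate table-service process.
    // engineContext and writeConfig are assumed to be built elsewhere.
    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
      client.archive();  // delegates to archiveIfRequired, so it is a no-op below the thresholds
    }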
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index fbe31b0..640f0cb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -57,6 +57,13 @@ public class HoodieCompactionConfig extends HoodieConfig {
          + " to delete older file slices. It's recommended to enable this, to ensure metadata and data storage"
           + " growth is bounded.");
 
+  public static final ConfigProperty<String> AUTO_ARCHIVE = ConfigProperty
+      .key("hoodie.archive.automatic")
+      .defaultValue("true")
+      .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
+          + " to archive commits if we cross a maximum value of commits."
+          + " It's recommended to enable this, to ensure number of active commits is bounded.");
+
   public static final ConfigProperty<String> ASYNC_CLEAN = ConfigProperty
       .key("hoodie.clean.async")
       .defaultValue("false")
@@ -493,6 +500,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withAutoArchive(Boolean autoArchive) {
+      compactionConfig.setValue(AUTO_ARCHIVE, String.valueOf(autoArchive));
+      return this;
+    }
+
     public Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) {
       compactionConfig.setValue(CLEANER_INCREMENTAL_MODE_ENABLE, String.valueOf(incrementalCleaningMode));
       return this;
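
The same switch can be set as a raw property for engines that are configured
through key/value options; a sketch (only the key hoodie.archive.automatic is
defined by this patch, the surrounding wiring is illustrative):

    // Sketch: disable post-commit archival via raw properties.
    Properties props = new Properties();  // java.util.Properties
    props.setProperty("hoodie.archive.automatic", "false");
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table")  // placeholder base path
        .withProps(props)
        .build();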
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index b49108f..df4b3f6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1101,6 +1101,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(HoodieCompactionConfig.AUTO_CLEAN);
   }
 
+  public boolean isAutoArchive() {
+    return getBoolean(HoodieCompactionConfig.AUTO_ARCHIVE);
+  }
+
   public boolean isAsyncClean() {
     return getBoolean(HoodieCompactionConfig.ASYNC_CLEAN);
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index f9486b1..54284fc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -204,7 +204,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
             .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep)
             // we will trigger compaction manually, to control the instant times
             .withInlineCompaction(false)
-            .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build())
+            .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
+            // we will trigger archive manually, to ensure only regular writer invokes it
+            .withAutoArchive(false).build())
         .withParallelism(parallelism, parallelism)
         .withDeleteParallelism(parallelism)
         .withRollbackParallelism(parallelism)
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 36caa1b..374dd12 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -40,7 +40,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.FlinkHoodieIndexFactory;
 import org.apache.hudi.index.HoodieIndex;
@@ -57,7 +56,6 @@ import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.compact.CompactHelpers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
@@ -332,11 +330,10 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
       // Delete the marker directory for the instant.
       WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-      // We cannot have unbounded commit files. Archive commits if we have to archive
-      HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
-      archiveLog.archiveIfRequired(context);
-    } catch (IOException ioe) {
-      throw new HoodieIOException(ioe.getMessage(), ioe);
+      if (config.isAutoArchive()) {
+        // We cannot have unbounded commit files. Archive commits if we have to archive
+        archive(table);
+      }
     } finally {
       this.heartbeatClient.stop(instantTime);
     }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 5e782c5..0dcfcfc 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -140,6 +140,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
       if (canTriggerTableService) {
         compactIfNecessary(writeClient, instantTime);
         doClean(writeClient, instantTime);
+        writeClient.archive();
       }
     }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index ff8f556..65ade82 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -155,6 +155,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
       if (canTriggerTableService) {
         compactIfNecessary(writeClient, instantTime);
         doClean(writeClient, instantTime);
+        writeClient.archive();
       }
     }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 82bc892..73b7811 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -107,6 +107,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
@@ -250,6 +251,43 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     validateMetadata(testTable, emptyList(), true);
   }
 
+  @Test
+  public void testMetadataTableArchival() throws Exception {
+    init(COPY_ON_WRITE, false);
+    writeConfig = getWriteConfigBuilder(true, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .enableFullScan(true)
+            .enableMetrics(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(3)
+            .archiveCommitsWith(3, 4)
+            .retainCommits(1)
+            .build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).retainCommits(1).build()).build();
+    initWriteConfigAndMetatableWriter(writeConfig, true);
+
+    AtomicInteger commitTime = new AtomicInteger(1);
+    // trigger 2 regular writes(1 bootstrap commit). just 1 before archival can get triggered.
+    int i = 1;
+    for (; i <= 2; i++) {
+      doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
+    }
+    // expected num commits = 1 (bootstrap) + 2 (writes) + 1 compaction.
+    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
+    HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline();
+    assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 4);
+
+    // trigger a async table service, archival should not kick in, even though conditions are met.
+    doCluster(testTable, "000000" + commitTime.getAndIncrement());
+    metadataTimeline = metadataMetaClient.reloadActiveTimeline();
+    assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 5);
+
+    // trigger a regular write operation. archival should kick in.
+    doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
+    metadataTimeline = metadataMetaClient.reloadActiveTimeline();
+    assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 3);
+  }
+
   @ParameterizedTest
   @EnumSource(HoodieTableType.class)
   public void testMetadataInsertUpsertClean(HoodieTableType tableType) throws Exception {
