This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d5ac55  [HUDI-2355][Bug]Archive service executed after cleaner finished. (#3545)
2d5ac55 is described below

commit 2d5ac551955e8c7be7be60edc5aaf17d2ed7f650
Author: zhangyue19921010 <[email protected]>
AuthorDate: Thu Sep 16 07:00:04 2021 +0800

    [HUDI-2355][Bug]Archive service executed after cleaner finished. (#3545)
    
    Co-authored-by: yuezhang <[email protected]>
---
 .../hudi/client/AbstractHoodieWriteClient.java     |  2 +-
 .../functional/TestHoodieDeltaStreamer.java        | 84 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index dfb2fc8..9650dda 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -434,10 +434,10 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
       // Delete the marker directory for the instant.
       WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+      autoCleanOnCommit();
       // We cannot have unbounded commit files. Archive commits if we have to 
archive
       HoodieTimelineArchiveLog archiveLog = new 
HoodieTimelineArchiveLog(config, table);
       archiveLog.archiveIfRequired(context);
-      autoCleanOnCommit();
       if (operationType != null && operationType != WriteOperationType.CLUSTER 
&& operationType != WriteOperationType.COMPACT) {
         syncTableMetadata();
       }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index aab02da..ad52ce3 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.utilities.functional;
 
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
@@ -26,6 +28,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -100,6 +103,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -972,6 +976,86 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
     });
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testCleanerDeleteReplacedDataWithArchive(boolean asyncClean) throws Exception {
+    String tableBasePath = dfsBasePath + "/cleanerDeleteReplacedDataWithArchive" + asyncClean;
+
+    int totalRecords = 3000;
+
+    // Step 1 : Prepare and insert data with archival and cleaner disabled (auto clean = "false").
+    // Make sure that at least 6 commits, including at least 2 replacecommits, are completed.
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
+    cfg.continuousMode = true;
+    cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
+    cfg.configs.add(String.format("%s=%s", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    deltaStreamerTestRunner(ds, cfg, (r) -> {
+      TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+      return true;
+    });
+
+    TestHelpers.assertAtLeastNCommits(6, tableBasePath, dfs);
+    TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+
+    // Step 2 : Get the earliest completed replacecommit (firstInstant, so extra replacecommits cannot shift it) and its replaced file IDs.
+    HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
+    HoodieTimeline replacedTimeline = meta.reloadActiveTimeline().getCompletedReplaceTimeline();
+    Option<HoodieInstant> firstReplaceHoodieInstant = replacedTimeline.firstInstant();
+    assertTrue(firstReplaceHoodieInstant.isPresent());
+
+    Option<byte[]> firstReplaceHoodieInstantDetails = replacedTimeline.getInstantDetails(firstReplaceHoodieInstant.get());
+    HoodieReplaceCommitMetadata firstReplaceMetadata = HoodieReplaceCommitMetadata.fromBytes(firstReplaceHoodieInstantDetails.get(), HoodieReplaceCommitMetadata.class);
+    Map<String, List<String>> partitionToReplaceFileIds = firstReplaceMetadata.getPartitionToReplaceFileIds();
+    String partitionName = null;
+    List<String> replacedFileIDs = null;
+    for (Map.Entry<String, List<String>> entry : partitionToReplaceFileIds.entrySet()) { // keeps the last partition visited
+      partitionName = entry.getKey();
+      replacedFileIDs = entry.getValue();
+    }
+
+    assertNotNull(partitionName);
+    assertNotNull(replacedFileIDs);
+
+    // Step 3 : Based on replacedFileIDs, collect the full paths of the matching data files.
+    List<String> replacedFilePaths = new ArrayList<>();
+    Path partitionPath = new Path(meta.getBasePath(), partitionName);
+    RemoteIterator<LocatedFileStatus> hoodieFiles = meta.getFs().listFiles(partitionPath, true);
+    while (hoodieFiles.hasNext()) {
+      LocatedFileStatus f = hoodieFiles.next();
+      String file = f.getPath().toUri().toString();
+      for (String replacedFileID : replacedFileIDs) {
+        if (file.contains(replacedFileID)) {
+          replacedFilePaths.add(file);
+        }
+      }
+    }
+
+    assertFalse(replacedFilePaths.isEmpty());
+
+    // Step 4 : Insert 1 record and trigger the sync/async cleaner and archival (ASYNC_CLEAN must be passed by key()).
+    List<String> configs = getAsyncServicesConfigs(1, "true", "true", "2", "", "");
+    configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
+    configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
+    configs.add(String.format("%s=%s", HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2"));
+    configs.add(String.format("%s=%s", HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
+    configs.add(String.format("%s=%s", HoodieCompactionConfig.ASYNC_CLEAN.key(), asyncClean));
+    cfg.configs = configs;
+    cfg.continuousMode = false;
+    ds = new HoodieDeltaStreamer(cfg, jsc);
+    ds.sync();
+
+    // Step 5 : Make sure that firstReplaceHoodieInstant is no longer on the active timeline, i.e. it was archived.
+    long count = meta.reloadActiveTimeline().getCompletedReplaceTimeline().getInstants().filter(instant -> firstReplaceHoodieInstant.get().equals(instant)).count();
+    assertEquals(0, count);
+
+    // Step 6 : All files replaced by firstReplaceHoodieInstant must have been deleted by the sync/async cleaner, which now runs before archival.
+    for (String replacedFilePath : replacedFilePaths) {
+      assertFalse(meta.getFs().exists(new Path(replacedFilePath)));
+    }
+  }
+
   private List<String> getAsyncServicesConfigs(int totalRecords, String 
autoClean, String inlineCluster,
                                                String inlineClusterMaxCommit, 
String asyncCluster, String asyncClusterMaxCommit) {
     List<String> configs = new ArrayList<>();

Reply via email to