This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2d5ac55 [HUDI-2355][Bug]Archive service executed after cleaner
finished. (#3545)
2d5ac55 is described below
commit 2d5ac551955e8c7be7be60edc5aaf17d2ed7f650
Author: zhangyue19921010 <[email protected]>
AuthorDate: Thu Sep 16 07:00:04 2021 +0800
[HUDI-2355][Bug]Archive service executed after cleaner finished. (#3545)
Co-authored-by: yuezhang <[email protected]>
---
.../hudi/client/AbstractHoodieWriteClient.java | 2 +-
.../functional/TestHoodieDeltaStreamer.java | 84 ++++++++++++++++++++++
2 files changed, 85 insertions(+), 1 deletion(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index dfb2fc8..9650dda 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -434,10 +434,10 @@ public abstract class AbstractHoodieWriteClient<T extends
HoodieRecordPayload, I
// Delete the marker directory for the instant.
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+ autoCleanOnCommit();
// We cannot have unbounded commit files. Archive commits if we have to
archive
HoodieTimelineArchiveLog archiveLog = new
HoodieTimelineArchiveLog(config, table);
archiveLog.archiveIfRequired(context);
- autoCleanOnCommit();
if (operationType != null && operationType != WriteOperationType.CLUSTER
&& operationType != WriteOperationType.COMPACT) {
syncTableMetadata();
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index aab02da..ad52ce3 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -18,6 +18,8 @@
package org.apache.hudi.utilities.functional;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
@@ -26,6 +28,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -100,6 +103,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -972,6 +976,86 @@ public class TestHoodieDeltaStreamer extends
TestHoodieDeltaStreamerBase {
});
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean)
throws Exception {
+ String tableBasePath = dfsBasePath +
"/cleanerDeleteReplacedDataWithArchive" + asyncClean;
+
+ int totalRecords = 3000;
+
+ // Step 1 : Prepare and insert data without archival and cleaner.
+ // Make sure that there are 6 completed commits, including 2 replacecommits.
+ HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.INSERT);
+ cfg.continuousMode = true;
+ cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+ cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true",
"2", "", ""));
+ cfg.configs.add(String.format("%s=%s",
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
+ HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+ deltaStreamerTestRunner(ds, cfg, (r) -> {
+ TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+ return true;
+ });
+
+ TestHelpers.assertAtLeastNCommits(6, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+
+ // Step 2 : Get the first replacecommit and extract the corresponding
replaced file IDs.
+ HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
+ HoodieTimeline replacedTimeline =
meta.reloadActiveTimeline().getCompletedReplaceTimeline();
+ Option<HoodieInstant> firstReplaceHoodieInstant =
replacedTimeline.nthFromLastInstant(1);
+ assertTrue(firstReplaceHoodieInstant.isPresent());
+
+ Option<byte[]> firstReplaceHoodieInstantDetails =
replacedTimeline.getInstantDetails(firstReplaceHoodieInstant.get());
+ HoodieReplaceCommitMetadata firstReplaceMetadata =
HoodieReplaceCommitMetadata.fromBytes(firstReplaceHoodieInstantDetails.get(),
HoodieReplaceCommitMetadata.class);
+ Map<String, List<String>> partitionToReplaceFileIds =
firstReplaceMetadata.getPartitionToReplaceFileIds();
+ String partitionName = null;
+ List replacedFileIDs = null;
+ for (Map.Entry entry : partitionToReplaceFileIds.entrySet()) {
+ partitionName = String.valueOf(entry.getKey());
+ replacedFileIDs = (List) entry.getValue();
+ }
+
+ assertNotNull(partitionName);
+ assertNotNull(replacedFileIDs);
+
+ // Step 3 : Based on replacedFileIDs, get the corresponding complete paths.
+ ArrayList<String> replacedFilePaths = new ArrayList<>();
+ Path partitionPath = new Path(meta.getBasePath(), partitionName);
+ RemoteIterator<LocatedFileStatus> hoodieFiles =
meta.getFs().listFiles(partitionPath, true);
+ while (hoodieFiles.hasNext()) {
+ LocatedFileStatus f = hoodieFiles.next();
+ String file = f.getPath().toUri().toString();
+ for (Object replacedFileID : replacedFileIDs) {
+ if (file.contains(String.valueOf(replacedFileID))) {
+ replacedFilePaths.add(file);
+ }
+ }
+ }
+
+ assertFalse(replacedFilePaths.isEmpty());
+
+ // Step 4 : Insert 1 record and trigger sync/async cleaner and archive.
+ List<String> configs = getAsyncServicesConfigs(1, "true", "true", "2", "",
"");
+ configs.add(String.format("%s=%s",
HoodieCompactionConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
+ configs.add(String.format("%s=%s",
HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
+ configs.add(String.format("%s=%s",
HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2"));
+ configs.add(String.format("%s=%s",
HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
+ configs.add(String.format("%s=%s", HoodieCompactionConfig.ASYNC_CLEAN,
asyncClean));
+ cfg.configs = configs;
+ cfg.continuousMode = false;
+ ds = new HoodieDeltaStreamer(cfg, jsc);
+ ds.sync();
+
+ // Step 5 : Make sure that firstReplaceHoodieInstant is archived.
+ long count =
meta.reloadActiveTimeline().getCompletedReplaceTimeline().getInstants().filter(instant
-> firstReplaceHoodieInstant.get().equals(instant)).count();
+ assertEquals(0, count);
+
+ // Step 6 : All the replaced files in firstReplaceHoodieInstant should be
deleted by the sync/async cleaner.
+ for (String replacedFilePath : replacedFilePaths) {
+ assertFalse(meta.getFs().exists(new Path(replacedFilePath)));
+ }
+ }
+
private List<String> getAsyncServicesConfigs(int totalRecords, String
autoClean, String inlineCluster,
String inlineClusterMaxCommit,
String asyncCluster, String asyncClusterMaxCommit) {
List<String> configs = new ArrayList<>();