This is an automated email from the ASF dual-hosted git repository.
zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 096fe11482d [HUDI-6432] Fix TestCleanPlanExecutor (#9045)
096fe11482d is described below
commit 096fe11482d9fd11e98086145c38f478de96a185
Author: Shiyan Xu <[email protected]>
AuthorDate: Sat Jun 24 23:35:49 2023 -0700
[HUDI-6432] Fix TestCleanPlanExecutor (#9045)
---
.../java/org/apache/hudi/table/TestCleaner.java | 195 +----------------
.../table/functional/TestCleanPlanExecutor.java | 4 +-
.../hudi/testutils/HoodieCleanerTestBase.java | 241 +++++++++++++++++++++
3 files changed, 245 insertions(+), 195 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 9016c83cdaa..efe8e5a688c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -29,19 +29,15 @@ import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieCleanStat;
-import org.apache.hudi.common.bootstrap.TestBootstrapIndex;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.BootstrapFileMapping;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -67,14 +63,12 @@ import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory;
-import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.action.clean.CleanPlanner;
-import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieCleanerTestBase;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
@@ -83,9 +77,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
-import java.io.File;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
@@ -114,7 +106,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test Cleaning related logic.
*/
-public class TestCleaner extends HoodieClientTestBase {
+public class TestCleaner extends HoodieCleanerTestBase {
private static final int BIG_BATCH_INSERT_SIZE = 500;
private static final int PARALLELISM = 10;
@@ -404,110 +396,6 @@ public class TestCleaner extends HoodieClientTestBase {
assertEquals(3, rollbackMetadata.getTotalFilesDeleted());
}
- /**
- * Helper to run cleaner and collect Clean Stats.
- *
- * @param config HoodieWriteConfig
- */
- protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) throws
IOException {
- return runCleaner(config, false, false, 1, false);
- }
-
- protected List<HoodieCleanStat>
runCleanerWithInstantFormat(HoodieWriteConfig config, boolean
needInstantInHudiFormat) throws IOException {
- return runCleaner(config, false, false, 1, needInstantInHudiFormat);
- }
-
- protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, int
firstCommitSequence, boolean needInstantInHudiFormat) throws IOException {
- return runCleaner(config, false, false, firstCommitSequence,
needInstantInHudiFormat);
- }
-
- protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean
simulateRetryFailure) throws IOException {
- return runCleaner(config, simulateRetryFailure, false, 1, false);
- }
-
- protected List<HoodieCleanStat> runCleaner(
- HoodieWriteConfig config, boolean simulateRetryFailure, boolean
simulateMetadataFailure) throws IOException {
- return runCleaner(config, simulateRetryFailure, simulateMetadataFailure,
1, false);
- }
-
- /**
- * Helper to run cleaner and collect Clean Stats.
- *
- * @param config HoodieWriteConfig
- */
- protected List<HoodieCleanStat> runCleaner(
- HoodieWriteConfig config, boolean simulateRetryFailure, boolean
simulateMetadataFailure,
- Integer firstCommitSequence, boolean needInstantInHudiFormat) throws
IOException {
- SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(config);
- String cleanInstantTs = needInstantInHudiFormat ?
makeNewCommitTime(firstCommitSequence, "%014d") :
makeNewCommitTime(firstCommitSequence, "%09d");
- HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs);
-
- if (null == cleanMetadata1) {
- return new ArrayList<>();
- }
-
- if (simulateRetryFailure) {
- HoodieInstant completedCleanInstant = new HoodieInstant(State.COMPLETED,
HoodieTimeline.CLEAN_ACTION, cleanInstantTs);
- HoodieCleanMetadata metadata =
CleanerUtils.getCleanerMetadata(metaClient, completedCleanInstant);
- metadata.getPartitionMetadata().values().forEach(p -> {
- String dirPath = metaClient.getBasePath() + "/" + p.getPartitionPath();
- p.getSuccessDeleteFiles().forEach(p2 -> {
- try {
- metaClient.getFs().create(new Path(dirPath, p2), true).close();
- } catch (IOException e) {
- throw new HoodieIOException(e.getMessage(), e);
- }
- });
- });
-
metaClient.reloadActiveTimeline().revertToInflight(completedCleanInstant);
-
- if (config.isMetadataTableEnabled() && simulateMetadataFailure) {
- // Simulate the failure of corresponding instant in the metadata table
- HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder()
-
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))
- .setConf(metaClient.getHadoopConf())
- .build();
- HoodieInstant deltaCommit = new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, cleanInstantTs);
-
metadataMetaClient.reloadActiveTimeline().revertToInflight(deltaCommit);
- }
-
- // retry clean operation again
- writeClient.clean();
- final HoodieCleanMetadata retriedCleanMetadata =
CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient),
completedCleanInstant);
- cleanMetadata1.getPartitionMetadata().keySet().forEach(k -> {
- HoodieCleanPartitionMetadata p1 =
cleanMetadata1.getPartitionMetadata().get(k);
- HoodieCleanPartitionMetadata p2 =
retriedCleanMetadata.getPartitionMetadata().get(k);
- assertEquals(p1.getDeletePathPatterns(), p2.getDeletePathPatterns());
- assertEquals(p1.getSuccessDeleteFiles(), p2.getSuccessDeleteFiles());
- assertEquals(p1.getFailedDeleteFiles(), p2.getFailedDeleteFiles());
- assertEquals(p1.getPartitionPath(), p2.getPartitionPath());
- assertEquals(k, p1.getPartitionPath());
- });
- }
-
- Map<String, HoodieCleanStat> cleanStatMap =
cleanMetadata1.getPartitionMetadata().values().stream()
- .map(x -> new
HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath())
-
.withFailedDeletes(x.getFailedDeleteFiles()).withSuccessfulDeletes(x.getSuccessDeleteFiles())
-
.withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(x.getDeletePathPatterns())
-
.withEarliestCommitRetained(Option.ofNullable(cleanMetadata1.getEarliestCommitToRetain()
!= null
- ? new HoodieInstant(State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "000")
- : null))
- .build())
- .collect(Collectors.toMap(HoodieCleanStat::getPartitionPath, x -> x));
- cleanMetadata1.getBootstrapPartitionMetadata().values().forEach(x -> {
- HoodieCleanStat s = cleanStatMap.get(x.getPartitionPath());
- cleanStatMap.put(x.getPartitionPath(), new
HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath())
-
.withFailedDeletes(s.getFailedDeleteFiles()).withSuccessfulDeletes(s.getSuccessDeleteFiles())
-
.withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(s.getDeletePathPatterns())
-
.withEarliestCommitRetained(Option.ofNullable(s.getEarliestCommitToRetain())
- .map(y -> new HoodieInstant(State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, y)))
- .withSuccessfulDeleteBootstrapBaseFiles(x.getSuccessDeleteFiles())
- .withFailedDeleteBootstrapBaseFiles(x.getFailedDeleteFiles())
-
.withDeleteBootstrapBasePathPatterns(x.getDeletePathPatterns()).build());
- });
- return new ArrayList<>(cleanStatMap.values());
- }
-
@Test
public void testCleanEmptyInstants() throws Exception {
HoodieWriteConfig config =
@@ -827,32 +715,6 @@ public class TestCleaner extends HoodieClientTestBase {
}
}
- /**
- * Generate Bootstrap index, bootstrap base file and corresponding
metaClient.
- *
- * @return Partition to BootstrapFileMapping Map
- * @throws IOException
- */
- protected Map<String, List<BootstrapFileMapping>>
generateBootstrapIndexAndSourceData(String... partitions) throws IOException {
- // create bootstrap source data path
- java.nio.file.Path sourcePath = tempDir.resolve("data");
- java.nio.file.Files.createDirectories(sourcePath);
- assertTrue(new File(sourcePath.toString()).exists());
-
- // recreate metaClient with Bootstrap base path
- metaClient = HoodieTestUtils.init(basePath, getTableType(),
sourcePath.toString(), true);
-
- // generate bootstrap index
- Map<String, List<BootstrapFileMapping>> bootstrapMapping =
TestBootstrapIndex.generateBootstrapIndex(metaClient, sourcePath.toString(),
- partitions, 1);
-
- for (Map.Entry<String, List<BootstrapFileMapping>> entry :
bootstrapMapping.entrySet()) {
- new File(sourcePath.toString() + "/" + entry.getKey()).mkdirs();
- assertTrue(new
File(entry.getValue().get(0).getBootstrapFileStatus().getPath().getUri()).createNewFile());
- }
- return bootstrapMapping;
- }
-
/**
* Test Cleaning functionality of table.rollback() API.
*/
@@ -1128,44 +990,6 @@ public class TestCleaner extends HoodieClientTestBase {
assertTrue(testTable.baseFileExists(p2, "4", file4P2), "Latest FileSlice
exists");
}
- public void commitWithMdt(String instantTime, Map<String, List<String>>
partToFileId,
- HoodieTestTable testTable,
HoodieTableMetadataWriter metadataWriter) throws Exception {
- commitWithMdt(instantTime, partToFileId, testTable, metadataWriter, true,
false);
- }
-
- public void commitWithMdt(String instantTime, Map<String, List<String>>
partToFileId,
- HoodieTestTable testTable,
HoodieTableMetadataWriter metadataWriter, boolean addBaseFiles, boolean
addLogFiles) throws Exception {
- testTable.addInflightCommit(instantTime);
- Map<String, List<String>> partToFileIds = new HashMap<>();
- partToFileId.forEach((key, value) -> {
- try {
- List<String> files = new ArrayList<>();
- if (addBaseFiles) {
- files.addAll(testTable.withBaseFilesInPartition(key,
value.toArray(new String[0])).getValue());
- }
- if (addLogFiles) {
- value.forEach(logFilePrefix -> {
- try {
- files.addAll(testTable.withLogFile(key, logFilePrefix, 1,
2).getValue());
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
- }
- partToFileIds.put(key, files);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- HoodieCommitMetadata commitMeta = generateCommitMetadata(instantTime,
partToFileIds);
- metadataWriter.performTableServices(Option.of(instantTime));
- metadataWriter.update(commitMeta, context.emptyHoodieData(), instantTime);
- metaClient.getActiveTimeline().saveAsComplete(
- new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION,
instantTime),
- Option.of(commitMeta.toJsonString().getBytes(StandardCharsets.UTF_8)));
- metaClient = HoodieTableMetaClient.reload(metaClient);
- }
-
/**
* Common test method for validating pending compactions.
*
@@ -1327,19 +1151,4 @@ public class TestCleaner extends HoodieClientTestBase {
return Stream.concat(stream1, stream2);
}
- protected static HoodieCommitMetadata generateCommitMetadata(
- String instantTime, Map<String, List<String>> partitionToFilePaths) {
- HoodieCommitMetadata metadata = new HoodieCommitMetadata();
- metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY,
HoodieTestTable.PHONY_TABLE_SCHEMA);
- partitionToFilePaths.forEach((partitionPath, fileList) ->
fileList.forEach(f -> {
- HoodieWriteStat writeStat = new HoodieWriteStat();
- writeStat.setPartitionPath(partitionPath);
- writeStat.setPath(partitionPath + "/" + f);
- writeStat.setFileId(f);
- writeStat.setTotalWriteBytes(1);
- writeStat.setFileSizeInBytes(1);
- metadata.addWriteStat(partitionPath, writeStat);
- }));
- return metadata;
- }
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
index c0289f9ae36..9d9d27e1970 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
@@ -41,7 +41,7 @@ import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
-import org.apache.hudi.table.TestCleaner;
+import org.apache.hudi.testutils.HoodieCleanerTestBase;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -72,7 +72,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Tests covering different clean plan policies/strategies.
*/
-public class TestCleanPlanExecutor extends TestCleaner {
+public class TestCleanPlanExecutor extends HoodieCleanerTestBase {
@Test
public void testInvalidCleaningTriggerStrategy() {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
new file mode 100644
index 00000000000..c46607753d5
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.testutils;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.model.BootstrapFileMapping;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.bootstrap.TestBootstrapIndex.generateBootstrapIndex;
+import static
org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HoodieCleanerTestBase extends HoodieClientTestBase {
+ protected static HoodieCommitMetadata generateCommitMetadata(
+ String instantTime, Map<String, List<String>> partitionToFilePaths) {
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY,
HoodieTestTable.PHONY_TABLE_SCHEMA);
+ partitionToFilePaths.forEach((partitionPath, fileList) ->
fileList.forEach(f -> {
+ HoodieWriteStat writeStat = new HoodieWriteStat();
+ writeStat.setPartitionPath(partitionPath);
+ writeStat.setPath(partitionPath + "/" + f);
+ writeStat.setFileId(f);
+ writeStat.setTotalWriteBytes(1);
+ writeStat.setFileSizeInBytes(1);
+ metadata.addWriteStat(partitionPath, writeStat);
+ }));
+ return metadata;
+ }
+
+ /**
+ * Helper to run cleaner and collect Clean Stats.
+ *
+ * @param config HoodieWriteConfig
+ */
+ protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) throws
IOException {
+ return runCleaner(config, false, false, 1, false);
+ }
+
+ protected List<HoodieCleanStat>
runCleanerWithInstantFormat(HoodieWriteConfig config, boolean
needInstantInHudiFormat) throws IOException {
+ return runCleaner(config, false, false, 1, needInstantInHudiFormat);
+ }
+
+ protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, int
firstCommitSequence, boolean needInstantInHudiFormat) throws IOException {
+ return runCleaner(config, false, false, firstCommitSequence,
needInstantInHudiFormat);
+ }
+
+ protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean
simulateRetryFailure) throws IOException {
+ return runCleaner(config, simulateRetryFailure, false, 1, false);
+ }
+
+ protected List<HoodieCleanStat> runCleaner(
+ HoodieWriteConfig config, boolean simulateRetryFailure, boolean
simulateMetadataFailure) throws IOException {
+ return runCleaner(config, simulateRetryFailure, simulateMetadataFailure,
1, false);
+ }
+
+ /**
+ * Helper to run cleaner and collect Clean Stats.
+ *
+ * @param config HoodieWriteConfig
+ */
+ protected List<HoodieCleanStat> runCleaner(
+ HoodieWriteConfig config, boolean simulateRetryFailure, boolean
simulateMetadataFailure,
+ Integer firstCommitSequence, boolean needInstantInHudiFormat) throws
IOException {
+ SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(config);
+ String cleanInstantTs = needInstantInHudiFormat ?
makeNewCommitTime(firstCommitSequence, "%014d") :
makeNewCommitTime(firstCommitSequence, "%09d");
+ HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs);
+
+ if (null == cleanMetadata1) {
+ return new ArrayList<>();
+ }
+
+ if (simulateRetryFailure) {
+ HoodieInstant completedCleanInstant = new
HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.CLEAN_ACTION,
cleanInstantTs);
+ HoodieCleanMetadata metadata =
CleanerUtils.getCleanerMetadata(metaClient, completedCleanInstant);
+ metadata.getPartitionMetadata().values().forEach(p -> {
+ String dirPath = metaClient.getBasePath() + "/" + p.getPartitionPath();
+ p.getSuccessDeleteFiles().forEach(p2 -> {
+ try {
+ metaClient.getFs().create(new Path(dirPath, p2), true).close();
+ } catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ });
+ });
+
metaClient.reloadActiveTimeline().revertToInflight(completedCleanInstant);
+
+ if (config.isMetadataTableEnabled() && simulateMetadataFailure) {
+ // Simulate the failure of corresponding instant in the metadata table
+ HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder()
+
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))
+ .setConf(metaClient.getHadoopConf())
+ .build();
+ HoodieInstant deltaCommit = new
HoodieInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, cleanInstantTs);
+
metadataMetaClient.reloadActiveTimeline().revertToInflight(deltaCommit);
+ }
+
+ // retry clean operation again
+ writeClient.clean();
+ final HoodieCleanMetadata retriedCleanMetadata =
CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient),
completedCleanInstant);
+ cleanMetadata1.getPartitionMetadata().keySet().forEach(k -> {
+ HoodieCleanPartitionMetadata p1 =
cleanMetadata1.getPartitionMetadata().get(k);
+ HoodieCleanPartitionMetadata p2 =
retriedCleanMetadata.getPartitionMetadata().get(k);
+ assertEquals(p1.getDeletePathPatterns(), p2.getDeletePathPatterns());
+ assertEquals(p1.getSuccessDeleteFiles(), p2.getSuccessDeleteFiles());
+ assertEquals(p1.getFailedDeleteFiles(), p2.getFailedDeleteFiles());
+ assertEquals(p1.getPartitionPath(), p2.getPartitionPath());
+ assertEquals(k, p1.getPartitionPath());
+ });
+ }
+
+ Map<String, HoodieCleanStat> cleanStatMap =
cleanMetadata1.getPartitionMetadata().values().stream()
+ .map(x -> new
HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath())
+
.withFailedDeletes(x.getFailedDeleteFiles()).withSuccessfulDeletes(x.getSuccessDeleteFiles())
+
.withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(x.getDeletePathPatterns())
+
.withEarliestCommitRetained(Option.ofNullable(cleanMetadata1.getEarliestCommitToRetain()
!= null
+ ? new HoodieInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "000")
+ : null))
+ .build())
+ .collect(Collectors.toMap(HoodieCleanStat::getPartitionPath, x -> x));
+ cleanMetadata1.getBootstrapPartitionMetadata().values().forEach(x -> {
+ HoodieCleanStat s = cleanStatMap.get(x.getPartitionPath());
+ cleanStatMap.put(x.getPartitionPath(), new
HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath())
+
.withFailedDeletes(s.getFailedDeleteFiles()).withSuccessfulDeletes(s.getSuccessDeleteFiles())
+
.withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(s.getDeletePathPatterns())
+
.withEarliestCommitRetained(Option.ofNullable(s.getEarliestCommitToRetain())
+ .map(y -> new HoodieInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, y)))
+ .withSuccessfulDeleteBootstrapBaseFiles(x.getSuccessDeleteFiles())
+ .withFailedDeleteBootstrapBaseFiles(x.getFailedDeleteFiles())
+
.withDeleteBootstrapBasePathPatterns(x.getDeletePathPatterns()).build());
+ });
+ return new ArrayList<>(cleanStatMap.values());
+ }
+
+ public void commitWithMdt(String instantTime, Map<String, List<String>>
partToFileId,
+ HoodieTestTable testTable,
HoodieTableMetadataWriter metadataWriter) throws Exception {
+ commitWithMdt(instantTime, partToFileId, testTable, metadataWriter, true,
false);
+ }
+
+ public void commitWithMdt(String instantTime, Map<String, List<String>>
partToFileId,
+ HoodieTestTable testTable,
HoodieTableMetadataWriter metadataWriter, boolean addBaseFiles, boolean
addLogFiles) throws Exception {
+ testTable.addInflightCommit(instantTime);
+ Map<String, List<String>> partToFileIds = new HashMap<>();
+ partToFileId.forEach((key, value) -> {
+ try {
+ List<String> files = new ArrayList<>();
+ if (addBaseFiles) {
+ files.addAll(testTable.withBaseFilesInPartition(key,
value.toArray(new String[0])).getValue());
+ }
+ if (addLogFiles) {
+ value.forEach(logFilePrefix -> {
+ try {
+ files.addAll(testTable.withLogFile(key, logFilePrefix, 1,
2).getValue());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ }
+ partToFileIds.put(key, files);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ HoodieCommitMetadata commitMeta = generateCommitMetadata(instantTime,
partToFileIds);
+ metadataWriter.performTableServices(Option.of(instantTime));
+ metadataWriter.update(commitMeta, context.emptyHoodieData(), instantTime);
+ metaClient.getActiveTimeline().saveAsComplete(
+ new HoodieInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, instantTime),
+ Option.of(commitMeta.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ }
+
+ /**
+ * Generate Bootstrap index, bootstrap base file and corresponding
metaClient.
+ *
+ * @return Partition to BootstrapFileMapping Map
+ * @throws IOException
+ */
+ protected Map<String, List<BootstrapFileMapping>>
generateBootstrapIndexAndSourceData(String... partitions) throws IOException {
+ // create bootstrap source data path
+ java.nio.file.Path sourcePath = tempDir.resolve("data");
+ java.nio.file.Files.createDirectories(sourcePath);
+ assertTrue(new File(sourcePath.toString()).exists());
+
+ // recreate metaClient with Bootstrap base path
+ metaClient = HoodieTestUtils.init(basePath, getTableType(),
sourcePath.toString(), true);
+
+ // generate bootstrap index
+ Map<String, List<BootstrapFileMapping>> bootstrapMapping =
generateBootstrapIndex(metaClient, sourcePath.toString(), partitions, 1);
+
+ for (Map.Entry<String, List<BootstrapFileMapping>> entry :
bootstrapMapping.entrySet()) {
+ new File(sourcePath + "/" + entry.getKey()).mkdirs();
+ assertTrue(new
File(entry.getValue().get(0).getBootstrapFileStatus().getPath().getUri()).createNewFile());
+ }
+ return bootstrapMapping;
+ }
+}