This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 096fe11482d [HUDI-6432] Fix TestCleanPlanExecutor (#9045)
096fe11482d is described below

commit 096fe11482d9fd11e98086145c38f478de96a185
Author: Shiyan Xu <[email protected]>
AuthorDate: Sat Jun 24 23:35:49 2023 -0700

    [HUDI-6432] Fix TestCleanPlanExecutor (#9045)
---
 .../java/org/apache/hudi/table/TestCleaner.java    | 195 +----------------
 .../table/functional/TestCleanPlanExecutor.java    |   4 +-
 .../hudi/testutils/HoodieCleanerTestBase.java      | 241 +++++++++++++++++++++
 3 files changed, 245 insertions(+), 195 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 9016c83cdaa..efe8e5a688c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -29,19 +29,15 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.HoodieCleanStat;
-import org.apache.hudi.common.bootstrap.TestBootstrapIndex;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.BootstrapFileMapping;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -67,14 +63,12 @@ import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
-import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.action.clean.CleanPlanner;
-import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieCleanerTestBase;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
@@ -83,9 +77,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
-import java.io.File;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -114,7 +106,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /**
  * Test Cleaning related logic.
  */
-public class TestCleaner extends HoodieClientTestBase {
+public class TestCleaner extends HoodieCleanerTestBase {
 
   private static final int BIG_BATCH_INSERT_SIZE = 500;
   private static final int PARALLELISM = 10;
@@ -404,110 +396,6 @@ public class TestCleaner extends HoodieClientTestBase {
     assertEquals(3, rollbackMetadata.getTotalFilesDeleted());
   }
 
-  /**
-   * Helper to run cleaner and collect Clean Stats.
-   *
-   * @param config HoodieWriteConfig
-   */
-  protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) throws 
IOException {
-    return runCleaner(config, false, false, 1, false);
-  }
-
-  protected List<HoodieCleanStat> 
runCleanerWithInstantFormat(HoodieWriteConfig config, boolean 
needInstantInHudiFormat) throws IOException {
-    return runCleaner(config, false, false, 1, needInstantInHudiFormat);
-  }
-
-  protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, int 
firstCommitSequence, boolean needInstantInHudiFormat) throws IOException {
-    return runCleaner(config, false, false, firstCommitSequence, 
needInstantInHudiFormat);
-  }
-
-  protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean 
simulateRetryFailure) throws IOException {
-    return runCleaner(config, simulateRetryFailure, false, 1, false);
-  }
-
-  protected List<HoodieCleanStat> runCleaner(
-      HoodieWriteConfig config, boolean simulateRetryFailure, boolean 
simulateMetadataFailure) throws IOException {
-    return runCleaner(config, simulateRetryFailure, simulateMetadataFailure, 
1, false);
-  }
-
-  /**
-   * Helper to run cleaner and collect Clean Stats.
-   *
-   * @param config HoodieWriteConfig
-   */
-  protected List<HoodieCleanStat> runCleaner(
-      HoodieWriteConfig config, boolean simulateRetryFailure, boolean 
simulateMetadataFailure,
-      Integer firstCommitSequence, boolean needInstantInHudiFormat) throws 
IOException {
-    SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(config);
-    String cleanInstantTs = needInstantInHudiFormat ? 
makeNewCommitTime(firstCommitSequence, "%014d") : 
makeNewCommitTime(firstCommitSequence, "%09d");
-    HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs);
-
-    if (null == cleanMetadata1) {
-      return new ArrayList<>();
-    }
-
-    if (simulateRetryFailure) {
-      HoodieInstant completedCleanInstant = new HoodieInstant(State.COMPLETED, 
HoodieTimeline.CLEAN_ACTION, cleanInstantTs);
-      HoodieCleanMetadata metadata = 
CleanerUtils.getCleanerMetadata(metaClient, completedCleanInstant);
-      metadata.getPartitionMetadata().values().forEach(p -> {
-        String dirPath = metaClient.getBasePath() + "/" + p.getPartitionPath();
-        p.getSuccessDeleteFiles().forEach(p2 -> {
-          try {
-            metaClient.getFs().create(new Path(dirPath, p2), true).close();
-          } catch (IOException e) {
-            throw new HoodieIOException(e.getMessage(), e);
-          }
-        });
-      });
-      
metaClient.reloadActiveTimeline().revertToInflight(completedCleanInstant);
-
-      if (config.isMetadataTableEnabled() && simulateMetadataFailure) {
-        // Simulate the failure of corresponding instant in the metadata table
-        HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder()
-            
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))
-            .setConf(metaClient.getHadoopConf())
-            .build();
-        HoodieInstant deltaCommit = new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, cleanInstantTs);
-        
metadataMetaClient.reloadActiveTimeline().revertToInflight(deltaCommit);
-      }
-
-      // retry clean operation again
-      writeClient.clean();
-      final HoodieCleanMetadata retriedCleanMetadata = 
CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient), 
completedCleanInstant);
-      cleanMetadata1.getPartitionMetadata().keySet().forEach(k -> {
-        HoodieCleanPartitionMetadata p1 = 
cleanMetadata1.getPartitionMetadata().get(k);
-        HoodieCleanPartitionMetadata p2 = 
retriedCleanMetadata.getPartitionMetadata().get(k);
-        assertEquals(p1.getDeletePathPatterns(), p2.getDeletePathPatterns());
-        assertEquals(p1.getSuccessDeleteFiles(), p2.getSuccessDeleteFiles());
-        assertEquals(p1.getFailedDeleteFiles(), p2.getFailedDeleteFiles());
-        assertEquals(p1.getPartitionPath(), p2.getPartitionPath());
-        assertEquals(k, p1.getPartitionPath());
-      });
-    }
-
-    Map<String, HoodieCleanStat> cleanStatMap = 
cleanMetadata1.getPartitionMetadata().values().stream()
-        .map(x -> new 
HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath())
-            
.withFailedDeletes(x.getFailedDeleteFiles()).withSuccessfulDeletes(x.getSuccessDeleteFiles())
-            
.withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(x.getDeletePathPatterns())
-            
.withEarliestCommitRetained(Option.ofNullable(cleanMetadata1.getEarliestCommitToRetain()
 != null
-                ? new HoodieInstant(State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "000")
-                : null))
-            .build())
-        .collect(Collectors.toMap(HoodieCleanStat::getPartitionPath, x -> x));
-    cleanMetadata1.getBootstrapPartitionMetadata().values().forEach(x -> {
-      HoodieCleanStat s = cleanStatMap.get(x.getPartitionPath());
-      cleanStatMap.put(x.getPartitionPath(), new 
HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath())
-          
.withFailedDeletes(s.getFailedDeleteFiles()).withSuccessfulDeletes(s.getSuccessDeleteFiles())
-          
.withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(s.getDeletePathPatterns())
-          
.withEarliestCommitRetained(Option.ofNullable(s.getEarliestCommitToRetain())
-              .map(y -> new HoodieInstant(State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, y)))
-          .withSuccessfulDeleteBootstrapBaseFiles(x.getSuccessDeleteFiles())
-          .withFailedDeleteBootstrapBaseFiles(x.getFailedDeleteFiles())
-          
.withDeleteBootstrapBasePathPatterns(x.getDeletePathPatterns()).build());
-    });
-    return new ArrayList<>(cleanStatMap.values());
-  }
-
   @Test
   public void testCleanEmptyInstants() throws Exception {
     HoodieWriteConfig config =
@@ -827,32 +715,6 @@ public class TestCleaner extends HoodieClientTestBase {
     }
   }
 
-  /**
-   * Generate Bootstrap index, bootstrap base file and corresponding 
metaClient.
-   *
-   * @return Partition to BootstrapFileMapping Map
-   * @throws IOException
-   */
-  protected Map<String, List<BootstrapFileMapping>> 
generateBootstrapIndexAndSourceData(String... partitions) throws IOException {
-    // create bootstrap source data path
-    java.nio.file.Path sourcePath = tempDir.resolve("data");
-    java.nio.file.Files.createDirectories(sourcePath);
-    assertTrue(new File(sourcePath.toString()).exists());
-
-    // recreate metaClient with Bootstrap base path
-    metaClient = HoodieTestUtils.init(basePath, getTableType(), 
sourcePath.toString(), true);
-
-    // generate bootstrap index
-    Map<String, List<BootstrapFileMapping>> bootstrapMapping = 
TestBootstrapIndex.generateBootstrapIndex(metaClient, sourcePath.toString(),
-        partitions, 1);
-
-    for (Map.Entry<String, List<BootstrapFileMapping>> entry : 
bootstrapMapping.entrySet()) {
-      new File(sourcePath.toString() + "/" + entry.getKey()).mkdirs();
-      assertTrue(new 
File(entry.getValue().get(0).getBootstrapFileStatus().getPath().getUri()).createNewFile());
-    }
-    return bootstrapMapping;
-  }
-
   /**
    * Test Cleaning functionality of table.rollback() API.
    */
@@ -1128,44 +990,6 @@ public class TestCleaner extends HoodieClientTestBase {
     assertTrue(testTable.baseFileExists(p2, "4", file4P2), "Latest FileSlice 
exists");
   }
 
-  public void commitWithMdt(String instantTime, Map<String, List<String>> 
partToFileId,
-                            HoodieTestTable testTable, 
HoodieTableMetadataWriter metadataWriter) throws Exception {
-    commitWithMdt(instantTime, partToFileId, testTable, metadataWriter, true, 
false);
-  }
-
-  public void commitWithMdt(String instantTime, Map<String, List<String>> 
partToFileId,
-                            HoodieTestTable testTable, 
HoodieTableMetadataWriter metadataWriter, boolean addBaseFiles, boolean 
addLogFiles) throws Exception {
-    testTable.addInflightCommit(instantTime);
-    Map<String, List<String>> partToFileIds = new HashMap<>();
-    partToFileId.forEach((key, value) -> {
-      try {
-        List<String> files = new ArrayList<>();
-        if (addBaseFiles) {
-          files.addAll(testTable.withBaseFilesInPartition(key, 
value.toArray(new String[0])).getValue());
-        }
-        if (addLogFiles) {
-          value.forEach(logFilePrefix -> {
-            try {
-              files.addAll(testTable.withLogFile(key, logFilePrefix, 1, 
2).getValue());
-            } catch (Exception e) {
-              e.printStackTrace();
-            }
-          });
-        }
-        partToFileIds.put(key, files);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    });
-    HoodieCommitMetadata commitMeta = generateCommitMetadata(instantTime, 
partToFileIds);
-    metadataWriter.performTableServices(Option.of(instantTime));
-    metadataWriter.update(commitMeta, context.emptyHoodieData(), instantTime);
-    metaClient.getActiveTimeline().saveAsComplete(
-        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, 
instantTime),
-        Option.of(commitMeta.toJsonString().getBytes(StandardCharsets.UTF_8)));
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-  }
-
   /**
    * Common test method for validating pending compactions.
    *
@@ -1327,19 +1151,4 @@ public class TestCleaner extends HoodieClientTestBase {
     return Stream.concat(stream1, stream2);
   }
 
-  protected static HoodieCommitMetadata generateCommitMetadata(
-      String instantTime, Map<String, List<String>> partitionToFilePaths) {
-    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
-    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, 
HoodieTestTable.PHONY_TABLE_SCHEMA);
-    partitionToFilePaths.forEach((partitionPath, fileList) -> 
fileList.forEach(f -> {
-      HoodieWriteStat writeStat = new HoodieWriteStat();
-      writeStat.setPartitionPath(partitionPath);
-      writeStat.setPath(partitionPath + "/" + f);
-      writeStat.setFileId(f);
-      writeStat.setTotalWriteBytes(1);
-      writeStat.setFileSizeInBytes(1);
-      metadata.addWriteStat(partitionPath, writeStat);
-    }));
-    return metadata;
-  }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
index c0289f9ae36..9d9d27e1970 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
@@ -41,7 +41,7 @@ import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
-import org.apache.hudi.table.TestCleaner;
+import org.apache.hudi.testutils.HoodieCleanerTestBase;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -72,7 +72,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /**
  * Tests covering different clean plan policies/strategies.
  */
-public class TestCleanPlanExecutor extends TestCleaner {
+public class TestCleanPlanExecutor extends HoodieCleanerTestBase {
 
   @Test
   public void testInvalidCleaningTriggerStrategy() {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
new file mode 100644
index 00000000000..c46607753d5
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.testutils;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.model.BootstrapFileMapping;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.bootstrap.TestBootstrapIndex.generateBootstrapIndex;
+import static 
org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HoodieCleanerTestBase extends HoodieClientTestBase {
+  protected static HoodieCommitMetadata generateCommitMetadata(
+      String instantTime, Map<String, List<String>> partitionToFilePaths) {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, 
HoodieTestTable.PHONY_TABLE_SCHEMA);
+    partitionToFilePaths.forEach((partitionPath, fileList) -> 
fileList.forEach(f -> {
+      HoodieWriteStat writeStat = new HoodieWriteStat();
+      writeStat.setPartitionPath(partitionPath);
+      writeStat.setPath(partitionPath + "/" + f);
+      writeStat.setFileId(f);
+      writeStat.setTotalWriteBytes(1);
+      writeStat.setFileSizeInBytes(1);
+      metadata.addWriteStat(partitionPath, writeStat);
+    }));
+    return metadata;
+  }
+
+  /**
+   * Helper to run cleaner and collect Clean Stats.
+   *
+   * @param config HoodieWriteConfig
+   */
+  protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) throws 
IOException {
+    return runCleaner(config, false, false, 1, false);
+  }
+
+  protected List<HoodieCleanStat> 
runCleanerWithInstantFormat(HoodieWriteConfig config, boolean 
needInstantInHudiFormat) throws IOException {
+    return runCleaner(config, false, false, 1, needInstantInHudiFormat);
+  }
+
+  protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, int 
firstCommitSequence, boolean needInstantInHudiFormat) throws IOException {
+    return runCleaner(config, false, false, firstCommitSequence, 
needInstantInHudiFormat);
+  }
+
+  protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean 
simulateRetryFailure) throws IOException {
+    return runCleaner(config, simulateRetryFailure, false, 1, false);
+  }
+
+  protected List<HoodieCleanStat> runCleaner(
+      HoodieWriteConfig config, boolean simulateRetryFailure, boolean 
simulateMetadataFailure) throws IOException {
+    return runCleaner(config, simulateRetryFailure, simulateMetadataFailure, 
1, false);
+  }
+
+  /**
+   * Helper to run cleaner and collect Clean Stats.
+   *
+   * @param config HoodieWriteConfig
+   */
+  protected List<HoodieCleanStat> runCleaner(
+      HoodieWriteConfig config, boolean simulateRetryFailure, boolean 
simulateMetadataFailure,
+      Integer firstCommitSequence, boolean needInstantInHudiFormat) throws 
IOException {
+    SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(config);
+    String cleanInstantTs = needInstantInHudiFormat ? 
makeNewCommitTime(firstCommitSequence, "%014d") : 
makeNewCommitTime(firstCommitSequence, "%09d");
+    HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs);
+
+    if (null == cleanMetadata1) {
+      return new ArrayList<>();
+    }
+
+    if (simulateRetryFailure) {
+      HoodieInstant completedCleanInstant = new 
HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.CLEAN_ACTION, 
cleanInstantTs);
+      HoodieCleanMetadata metadata = 
CleanerUtils.getCleanerMetadata(metaClient, completedCleanInstant);
+      metadata.getPartitionMetadata().values().forEach(p -> {
+        String dirPath = metaClient.getBasePath() + "/" + p.getPartitionPath();
+        p.getSuccessDeleteFiles().forEach(p2 -> {
+          try {
+            metaClient.getFs().create(new Path(dirPath, p2), true).close();
+          } catch (IOException e) {
+            throw new HoodieIOException(e.getMessage(), e);
+          }
+        });
+      });
+      
metaClient.reloadActiveTimeline().revertToInflight(completedCleanInstant);
+
+      if (config.isMetadataTableEnabled() && simulateMetadataFailure) {
+        // Simulate the failure of corresponding instant in the metadata table
+        HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder()
+            
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))
+            .setConf(metaClient.getHadoopConf())
+            .build();
+        HoodieInstant deltaCommit = new 
HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, cleanInstantTs);
+        
metadataMetaClient.reloadActiveTimeline().revertToInflight(deltaCommit);
+      }
+
+      // retry clean operation again
+      writeClient.clean();
+      final HoodieCleanMetadata retriedCleanMetadata = 
CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient), 
completedCleanInstant);
+      cleanMetadata1.getPartitionMetadata().keySet().forEach(k -> {
+        HoodieCleanPartitionMetadata p1 = 
cleanMetadata1.getPartitionMetadata().get(k);
+        HoodieCleanPartitionMetadata p2 = 
retriedCleanMetadata.getPartitionMetadata().get(k);
+        assertEquals(p1.getDeletePathPatterns(), p2.getDeletePathPatterns());
+        assertEquals(p1.getSuccessDeleteFiles(), p2.getSuccessDeleteFiles());
+        assertEquals(p1.getFailedDeleteFiles(), p2.getFailedDeleteFiles());
+        assertEquals(p1.getPartitionPath(), p2.getPartitionPath());
+        assertEquals(k, p1.getPartitionPath());
+      });
+    }
+
+    Map<String, HoodieCleanStat> cleanStatMap = 
cleanMetadata1.getPartitionMetadata().values().stream()
+        .map(x -> new 
HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath())
+            
.withFailedDeletes(x.getFailedDeleteFiles()).withSuccessfulDeletes(x.getSuccessDeleteFiles())
+            
.withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(x.getDeletePathPatterns())
+            
.withEarliestCommitRetained(Option.ofNullable(cleanMetadata1.getEarliestCommitToRetain()
 != null
+                ? new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "000")
+                : null))
+            .build())
+        .collect(Collectors.toMap(HoodieCleanStat::getPartitionPath, x -> x));
+    cleanMetadata1.getBootstrapPartitionMetadata().values().forEach(x -> {
+      HoodieCleanStat s = cleanStatMap.get(x.getPartitionPath());
+      cleanStatMap.put(x.getPartitionPath(), new 
HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath())
+          
.withFailedDeletes(s.getFailedDeleteFiles()).withSuccessfulDeletes(s.getSuccessDeleteFiles())
+          
.withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(s.getDeletePathPatterns())
+          
.withEarliestCommitRetained(Option.ofNullable(s.getEarliestCommitToRetain())
+              .map(y -> new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, y)))
+          .withSuccessfulDeleteBootstrapBaseFiles(x.getSuccessDeleteFiles())
+          .withFailedDeleteBootstrapBaseFiles(x.getFailedDeleteFiles())
+          
.withDeleteBootstrapBasePathPatterns(x.getDeletePathPatterns()).build());
+    });
+    return new ArrayList<>(cleanStatMap.values());
+  }
+
+  public void commitWithMdt(String instantTime, Map<String, List<String>> 
partToFileId,
+                            HoodieTestTable testTable, 
HoodieTableMetadataWriter metadataWriter) throws Exception {
+    commitWithMdt(instantTime, partToFileId, testTable, metadataWriter, true, 
false);
+  }
+
+  public void commitWithMdt(String instantTime, Map<String, List<String>> 
partToFileId,
+                            HoodieTestTable testTable, 
HoodieTableMetadataWriter metadataWriter, boolean addBaseFiles, boolean 
addLogFiles) throws Exception {
+    testTable.addInflightCommit(instantTime);
+    Map<String, List<String>> partToFileIds = new HashMap<>();
+    partToFileId.forEach((key, value) -> {
+      try {
+        List<String> files = new ArrayList<>();
+        if (addBaseFiles) {
+          files.addAll(testTable.withBaseFilesInPartition(key, 
value.toArray(new String[0])).getValue());
+        }
+        if (addLogFiles) {
+          value.forEach(logFilePrefix -> {
+            try {
+              files.addAll(testTable.withLogFile(key, logFilePrefix, 1, 
2).getValue());
+            } catch (Exception e) {
+              e.printStackTrace();
+            }
+          });
+        }
+        partToFileIds.put(key, files);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+    HoodieCommitMetadata commitMeta = generateCommitMetadata(instantTime, 
partToFileIds);
+    metadataWriter.performTableServices(Option.of(instantTime));
+    metadataWriter.update(commitMeta, context.emptyHoodieData(), instantTime);
+    metaClient.getActiveTimeline().saveAsComplete(
+        new HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, instantTime),
+        Option.of(commitMeta.toJsonString().getBytes(StandardCharsets.UTF_8)));
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+  }
+
+  /**
+   * Generate Bootstrap index, bootstrap base file and corresponding 
metaClient.
+   *
+   * @return Partition to BootstrapFileMapping Map
+   * @throws IOException
+   */
+  protected Map<String, List<BootstrapFileMapping>> 
generateBootstrapIndexAndSourceData(String... partitions) throws IOException {
+    // create bootstrap source data path
+    java.nio.file.Path sourcePath = tempDir.resolve("data");
+    java.nio.file.Files.createDirectories(sourcePath);
+    assertTrue(new File(sourcePath.toString()).exists());
+
+    // recreate metaClient with Bootstrap base path
+    metaClient = HoodieTestUtils.init(basePath, getTableType(), 
sourcePath.toString(), true);
+
+    // generate bootstrap index
+    Map<String, List<BootstrapFileMapping>> bootstrapMapping = 
generateBootstrapIndex(metaClient, sourcePath.toString(), partitions, 1);
+
+    for (Map.Entry<String, List<BootstrapFileMapping>> entry : 
bootstrapMapping.entrySet()) {
+      new File(sourcePath + "/" + entry.getKey()).mkdirs();
+      assertTrue(new 
File(entry.getValue().get(0).getBootstrapFileStatus().getPath().getUri()).createNewFile());
+    }
+    return bootstrapMapping;
+  }
+}

Reply via email to