This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 86698b516001fc30427f3f15a56ec5289d17a9c1
Author: lamber-ken <[email protected]>
AuthorDate: Mon Mar 30 13:19:17 2020 -0500

    [HUDI-716] Exception: Not an Avro data file when running HoodieCleanClient.runClean (#1432)
---
 .../apache/hudi/cli/commands/RepairsCommand.java   | 21 +++++++++++++++++++
 .../org/apache/hudi/client/HoodieCleanClient.java  |  6 +++++-
 .../java/org/apache/hudi/table/TestCleaner.java    | 19 +++++++++++++++++
 .../java/org/apache/hudi/common/util/FSUtils.java  |  9 ++++++++
 .../apache/hudi/common/model/HoodieTestUtils.java  | 24 ++++++++++++++++++++++
 5 files changed, 78 insertions(+), 1 deletion(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index 83af13c..7a65336 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -25,9 +25,12 @@ import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.FSUtils;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
 import org.apache.spark.launcher.SparkLauncher;
 import org.springframework.shell.core.CommandMarker;
 import org.springframework.shell.core.annotation.CliCommand;
@@ -51,6 +54,8 @@ import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME
 @Component
 public class RepairsCommand implements CommandMarker {
 
+  private static final Logger LOG = Logger.getLogger(RepairsCommand.class);
+
   @CliCommand(value = "repair deduplicate",
       help = "De-duplicate a partition path contains duplicates & produce 
repaired files to replace with")
   public String deduplicate(
@@ -137,4 +142,20 @@ public class RepairsCommand implements CommandMarker {
     }
     return HoodiePrintHelper.print(new String[] {"Property", "Old Value", "New 
Value"}, rows);
   }
+
+  @CliCommand(value = "repair corrupted clean files", help = "repair corrupted 
clean files")
+  public void removeCorruptedPendingCleanAction() {
+
+    HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
+    HoodieActiveTimeline activeTimeline = 
HoodieCLI.getTableMetaClient().getActiveTimeline();
+
+    activeTimeline.filterInflightsAndRequested().getInstants().forEach(instant 
-> {
+      try {
+        CleanerUtils.getCleanerPlan(client, instant);
+      } catch (IOException e) {
+        LOG.warn("try to remove corrupted instant file: " + instant);
+        FSUtils.deleteInstantFile(client.getFs(), client.getMetaPath(), 
instant);
+      }
+    });
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieCleanClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieCleanClient.java
index 15ba2a7..600e801 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieCleanClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieCleanClient.java
@@ -85,7 +85,11 @@ public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHo
     // If there are inflight(failed) or previously requested clean operation, 
first perform them
     
table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant
 -> {
       LOG.info("There were previously unfinished cleaner operations. Finishing 
Instant=" + hoodieInstant);
-      runClean(table, hoodieInstant);
+      try {
+        runClean(table, hoodieInstant);
+      } catch (Exception e) {
+        LOG.warn("Failed to perform previous clean operation, instant: " + 
hoodieInstant, e);
+      }
     });
 
     Option<HoodieCleanerPlan> cleanerPlanOpt = scheduleClean(startCleanTime);
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index dee545a..2fc478d 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -990,6 +990,25 @@ public class TestCleaner extends TestHoodieClientBase {
   }
 
   /**
+   * Test clean previous corrupted cleanFiles.
+   */
+  @Test
+  public void testCleanPreviousCorruptedCleanFiles() {
+    HoodieWriteConfig config =
+        HoodieWriteConfig.newBuilder()
+            .withPath(basePath).withAssumeDatePartitioning(true)
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
+            .build();
+
+    HoodieTestUtils.createCorruptedPendingCleanFiles(metaClient, 
getNextInstant());
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    List<HoodieCleanStat> cleanStats = runCleaner(config);
+    assertEquals("Must not clean any files", 0, cleanStats.size());
+  }
+
+  /**
    * Common test method for validating pending compactions.
    *
    * @param config Hoodie Write Config
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java
index add03d5..5ce7178 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java
@@ -485,6 +485,15 @@ public class FSUtils {
     });
   }
 
+  public static void deleteInstantFile(FileSystem fs, String metaPath, 
HoodieInstant instant) {
+    try {
+      LOG.warn("try to delete instant file: " + instant);
+      fs.delete(new Path(metaPath, instant.getFileName()), false);
+    } catch (IOException e) {
+      throw new HoodieIOException("Could not delete instant file" + 
instant.getFileName(), e);
+    }
+  }
+
   public static void deleteOlderRestoreMetaFiles(FileSystem fs, String 
metaPath, Stream<HoodieInstant> instants) {
     // TODO - this should be archived when archival is made general for all 
meta-data
     // skip MIN_ROLLBACK_TO_KEEP and delete rest
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java
index cae3d89..2129168 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java
@@ -179,6 +179,30 @@ public class HoodieTestUtils {
     }
   }
 
+  public static void createCorruptedPendingCleanFiles(HoodieTableMetaClient 
metaClient, String commitTime) {
+    Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(commitTime),
+        HoodieTimeline.makeInflightCleanerFileName(commitTime)).forEach(f -> {
+          FSDataOutputStream os = null;
+          try {
+            Path commitFile = new Path(
+                    metaClient.getBasePath() + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
+            os = metaClient.getFs().create(commitFile, true);
+            // Write empty clean metadata
+            os.write(new byte[0]);
+          } catch (IOException ioe) {
+            throw new HoodieIOException(ioe.getMessage(), ioe);
+          } finally {
+            if (null != os) {
+              try {
+                os.close();
+              } catch (IOException e) {
+                throw new HoodieIOException(e.getMessage(), e);
+              }
+            }
+          }
+        });
+  }
+
   public static String createNewDataFile(String basePath, String 
partitionPath, String commitTime)
       throws IOException {
     String fileID = UUID.randomUUID().toString();

Reply via email to