This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.5.3 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 86698b516001fc30427f3f15a56ec5289d17a9c1 Author: lamber-ken <[email protected]> AuthorDate: Mon Mar 30 13:19:17 2020 -0500 [HUDI-716] Exception: Not an Avro data file when running HoodieCleanClient.runClean (#1432) --- .../apache/hudi/cli/commands/RepairsCommand.java | 21 +++++++++++++++++++ .../org/apache/hudi/client/HoodieCleanClient.java | 6 +++++- .../java/org/apache/hudi/table/TestCleaner.java | 19 +++++++++++++++++ .../java/org/apache/hudi/common/util/FSUtils.java | 9 ++++++++ .../apache/hudi/common/model/HoodieTestUtils.java | 24 ++++++++++++++++++++++ 5 files changed, 78 insertions(+), 1 deletion(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java index 83af13c..7a65336 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java @@ -25,9 +25,12 @@ import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.FSUtils; import org.apache.hadoop.fs.Path; +import org.apache.log4j.Logger; import org.apache.spark.launcher.SparkLauncher; import org.springframework.shell.core.CommandMarker; import org.springframework.shell.core.annotation.CliCommand; @@ -51,6 +54,8 @@ import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME @Component public class RepairsCommand implements CommandMarker { + private static final Logger LOG = Logger.getLogger(RepairsCommand.class); + @CliCommand(value = "repair deduplicate", help = "De-duplicate a partition path contains duplicates & produce repaired files to replace with") public String deduplicate( @@ -137,4 +142,20 @@ public class RepairsCommand implements CommandMarker { } return HoodiePrintHelper.print(new String[] {"Property", "Old Value", "New Value"}, rows); } + + @CliCommand(value = "repair corrupted clean files", help = "repair corrupted clean files") + public void removeCorruptedPendingCleanAction() { + + HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); + HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); + + activeTimeline.filterInflightsAndRequested().getInstants().forEach(instant -> { + try { + CleanerUtils.getCleanerPlan(client, instant); + } catch (IOException e) { + LOG.warn("try to remove corrupted instant file: " + instant); + FSUtils.deleteInstantFile(client.getFs(), client.getMetaPath(), instant); + } + }); + } } diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieCleanClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieCleanClient.java index 15ba2a7..600e801 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieCleanClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieCleanClient.java @@ -85,7 +85,11 @@ public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHo // If there are inflight(failed) or previously requested clean operation, first perform them table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> { LOG.info("There were previously unfinished cleaner operations. Finishing Instant=" + hoodieInstant); - runClean(table, hoodieInstant); + try { + runClean(table, hoodieInstant); + } catch (Exception e) { + LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e); + } }); Option<HoodieCleanerPlan> cleanerPlanOpt = scheduleClean(startCleanTime); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java index dee545a..2fc478d 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -990,6 +990,25 @@ public class TestCleaner extends TestHoodieClientBase { } /** + * Test clean previous corrupted cleanFiles. + */ + @Test + public void testCleanPreviousCorruptedCleanFiles() { + HoodieWriteConfig config = + HoodieWriteConfig.newBuilder() + .withPath(basePath).withAssumeDatePartitioning(true) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) + .build(); + + HoodieTestUtils.createCorruptedPendingCleanFiles(metaClient, getNextInstant()); + metaClient = HoodieTableMetaClient.reload(metaClient); + + List<HoodieCleanStat> cleanStats = runCleaner(config); + assertEquals("Must not clean any files", 0, cleanStats.size()); + } + + /** * Common test method for validating pending compactions. * * @param config Hoodie Write Config diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java index add03d5..5ce7178 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java @@ -485,6 +485,15 @@ public class FSUtils { }); } + public static void deleteInstantFile(FileSystem fs, String metaPath, HoodieInstant instant) { + try { + LOG.warn("try to delete instant file: " + instant); + fs.delete(new Path(metaPath, instant.getFileName()), false); + } catch (IOException e) { + throw new HoodieIOException("Could not delete instant file" + instant.getFileName(), e); + } + } + public static void deleteOlderRestoreMetaFiles(FileSystem fs, String metaPath, Stream<HoodieInstant> instants) { // TODO - this should be archived when archival is made general for all meta-data // skip MIN_ROLLBACK_TO_KEEP and delete rest diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java index cae3d89..2129168 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java @@ -179,6 +179,30 @@ public class HoodieTestUtils { } } + public static void createCorruptedPendingCleanFiles(HoodieTableMetaClient metaClient, String commitTime) { + Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(commitTime), + HoodieTimeline.makeInflightCleanerFileName(commitTime)).forEach(f -> { + FSDataOutputStream os = null; + try { + Path commitFile = new Path( + metaClient.getBasePath() + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f); + os = metaClient.getFs().create(commitFile, true); + // Write empty clean metadata + os.write(new byte[0]); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } finally { + if (null != os) { + try { + os.close(); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + } + } + }); + } + public static String createNewDataFile(String basePath, String partitionPath, String commitTime) throws IOException { String fileID = UUID.randomUUID().toString();
