This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 19c20e4dd93 [HUDI-7571] Add api to get exception details in HoodieMetadataTableValidator with ignoreFailed mode (#10960) 19c20e4dd93 is described below commit 19c20e4dd9309c1e3800cdbd3d487d2689a81a3b Author: Lokesh Jain <lj...@apache.org> AuthorDate: Fri Apr 5 21:59:55 2024 +0530 [HUDI-7571] Add api to get exception details in HoodieMetadataTableValidator with ignoreFailed mode (#10960) * [HUDI-7571] Add api to get exception details in HoodieMetadataTableValidator with ignoreFailed mode * Address comments --- .../utilities/HoodieMetadataTableValidator.java | 40 ++++++++++++++++++---- .../TestHoodieMetadataTableValidator.java | 3 ++ 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java index 1bf7e28dfa2..06dbde8b108 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java @@ -181,6 +181,8 @@ public class HoodieMetadataTableValidator implements Serializable { private final String taskLabels; + private List<Throwable> throwables = new ArrayList<>(); + public HoodieMetadataTableValidator(JavaSparkContext jsc, Config cfg) { this.jsc = jsc; this.cfg = cfg; @@ -198,6 +200,27 @@ public class HoodieMetadataTableValidator implements Serializable { this.taskLabels = generateValidationTaskLabels(); } + /** + * Returns list of Throwable which were encountered during validation. This method is useful + * when ignoreFailed parameter is set to true. + */ + public List<Throwable> getThrowables() { + return throwables; + } + + /** + * Returns true if there is a validation failure encountered during validation. + * This method is useful when ignoreFailed parameter is set to true. 
+ */ + public boolean hasValidationFailure() { + for (Throwable throwable : throwables) { + if (throwable instanceof HoodieValidationException) { + return true; + } + } + return false; + } + private String generateValidationTaskLabels() { List<String> labelList = new ArrayList<>(); labelList.add(cfg.basePath); @@ -438,6 +461,7 @@ public class HoodieMetadataTableValidator implements Serializable { if (!cfg.ignoreFailed) { throw e; } + throwables.add(e); return false; } } @@ -502,12 +526,12 @@ public class HoodieMetadataTableValidator implements Serializable { HoodieMetadataValidationContext fsBasedContext = new HoodieMetadataValidationContext(engineContext, props, metaClient, false)) { Set<String> finalBaseFilesForCleaning = baseFilesForCleaning; - List<Pair<Boolean, String>> result = new ArrayList<>( + List<Pair<Boolean, ? extends Exception>> result = new ArrayList<>( engineContext.parallelize(allPartitions, allPartitions.size()).map(partitionPath -> { try { validateFilesInPartition(metadataTableBasedContext, fsBasedContext, partitionPath, finalBaseFilesForCleaning); LOG.info(String.format("Metadata table validation succeeded for partition %s (partition %s)", partitionPath, taskLabels)); - return Pair.of(true, ""); + return Pair.<Boolean, Exception>of(true, null); } catch (HoodieValidationException e) { LOG.error( String.format("Metadata table validation failed for partition %s due to HoodieValidationException (partition %s)", @@ -515,26 +539,29 @@ public class HoodieMetadataTableValidator implements Serializable { if (!cfg.ignoreFailed) { throw e; } - return Pair.of(false, e.getMessage() + " for partition: " + partitionPath); + return Pair.of(false, new HoodieValidationException(e.getMessage() + " for partition: " + partitionPath, e)); } }).collectAsList()); try { validateRecordIndex(engineContext, metaClient, metadataTableBasedContext.getTableMetadata()); - result.add(Pair.of(true, "")); + result.add(Pair.of(true, null)); } catch (HoodieValidationException e) { 
LOG.error( "Metadata table validation failed due to HoodieValidationException in record index validation for table: {} ", cfg.basePath, e); if (!cfg.ignoreFailed) { throw e; } - result.add(Pair.of(false, e.getMessage())); + result.add(Pair.of(false, e)); } - for (Pair<Boolean, String> res : result) { + for (Pair<Boolean, ? extends Exception> res : result) { finalResult &= res.getKey(); if (res.getKey().equals(false)) { LOG.error("Metadata Validation failed for table: " + cfg.basePath + " with error: " + res.getValue()); + if (res.getRight() != null) { + throwables.add(res.getRight()); + } } } @@ -1253,6 +1280,7 @@ public class HoodieMetadataTableValidator implements Serializable { if (!cfg.ignoreFailed) { throw e; } + throwables.add(e); } catch (InterruptedException e) { // ignore InterruptedException here. } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java index 74642bbcb7a..e87f6257c54 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.stream.Collectors; import static org.apache.hudi.common.testutils.RawTripTestPayload.recordToString; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase { @@ -68,6 +69,8 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase config.validateAllFileGroups = true; HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config); assertTrue(validator.run()); + assertFalse(validator.hasValidationFailure()); + assertTrue(validator.getThrowables().isEmpty()); } protected Dataset<Row> 
makeInsertDf(String instantTime, Integer n) {