This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git.
commit 6746f44ba1893c82c9793486600aa847adc9171a Author: Sagar Sumit <[email protected]> AuthorDate: Tue Feb 27 22:45:10 2024 +0530 [MINOR] Improve mdt validator and upsert partitioner logs (#10656) --- .../table/action/commit/UpsertPartitioner.java | 8 ++-- .../utilities/HoodieMetadataTableValidator.java | 43 +++++++++++----------- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index edd6d981d18..2b78df96765 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -97,7 +97,8 @@ public class UpsertPartitioner<T> extends SparkHoodiePartitioner<T> { assignUpdates(profile); assignInserts(profile, context); - LOG.info("Total Buckets: " + totalBuckets); + LOG.info("Total Buckets: {}, bucketInfoMap size: {}, partitionPathToInsertBucketInfos size: {}, updateLocationToBucket size: {}", + totalBuckets, bucketInfoMap.size(), partitionPathToInsertBucketInfos.size(), updateLocationToBucket.size()); if (LOG.isDebugEnabled()) { LOG.debug("Buckets info => " + bucketInfoMap + ", \n" + "Partition to insert buckets => " + partitionPathToInsertBucketInfos + ", \n" @@ -189,6 +190,7 @@ public class UpsertPartitioner<T> extends SparkHoodiePartitioner<T> { this.smallFiles.addAll(smallFiles); + LOG.info("For partitionPath : " + partitionPath + " Total Small Files => " + smallFiles.size()); LOG.debug("For partitionPath : " + partitionPath + " Small Files => " + smallFiles); long totalUnassignedInserts = pStat.getNumInserts(); @@ -230,7 +232,7 @@ public class UpsertPartitioner<T> extends SparkHoodiePartitioner<T> { } int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / 
insertRecordsPerBucket); - LOG.debug("After small file assignment: unassignedInserts => " + totalUnassignedInserts + LOG.info("After small file assignment: unassignedInserts => " + totalUnassignedInserts + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket); for (int b = 0; b < insertBuckets; b++) { bucketNumbers.add(totalBuckets); @@ -258,7 +260,7 @@ public class UpsertPartitioner<T> extends SparkHoodiePartitioner<T> { currentCumulativeWeight += bkt.weight; insertBuckets.add(new InsertBucketCumulativeWeightPair(bkt, currentCumulativeWeight)); } - LOG.debug("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets); + LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets); partitionPathToInsertBucketInfos.put(partitionPath, insertBuckets); } if (profile.hasOutputWorkLoadStats()) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java index 7a536da6198..b4279d8451c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java @@ -199,6 +199,7 @@ public class HoodieMetadataTableValidator implements Serializable { private String generateValidationTaskLabels() { List<String> labelList = new ArrayList<>(); + labelList.add(cfg.basePath); if (cfg.validateLatestBaseFiles) { labelList.add("validate-latest-base-files"); } @@ -411,10 +412,10 @@ public class HoodieMetadataTableValidator implements Serializable { try { LOG.info(cfg.toString()); if (cfg.continuous) { - LOG.info(" ****** do hoodie metadata table validation in CONTINUOUS mode ******"); + LOG.info(" ****** do hoodie metadata table validation in CONTINUOUS mode - {} ******", taskLabels); doHoodieMetadataTableValidationContinuous(); } else { 
- LOG.info(" ****** do hoodie metadata table validation once ******"); + LOG.info(" ****** do hoodie metadata table validation once - {} ******", taskLabels); result = doHoodieMetadataTableValidationOnce(); } } catch (Exception e) { @@ -432,7 +433,7 @@ public class HoodieMetadataTableValidator implements Serializable { try { return doMetadataTableValidation(); } catch (Throwable e) { - LOG.error("Metadata table validation failed to HoodieValidationException", e); + LOG.error("Metadata table validation failed to HoodieValidationException {} {}", taskLabels, e); if (!cfg.ignoreFailed) { throw e; } @@ -491,7 +492,7 @@ public class HoodieMetadataTableValidator implements Serializable { List<String> allPartitions = validatePartitions(engineContext, basePath); if (allPartitions.isEmpty()) { - LOG.warn("The result of getting all partitions is null or empty, skip current validation."); + LOG.warn("The result of getting all partitions is null or empty, skip current validation. {}", taskLabels); return true; } @@ -522,7 +523,7 @@ public class HoodieMetadataTableValidator implements Serializable { result.add(Pair.of(true, "")); } catch (HoodieValidationException e) { LOG.error( - "Metadata table validation failed due to HoodieValidationException in record index validation", e); + "Metadata table validation failed due to HoodieValidationException in record index validation for table: {} ", cfg.basePath, e); if (!cfg.ignoreFailed) { throw e; } @@ -563,19 +564,19 @@ public class HoodieMetadataTableValidator implements Serializable { int finishedInstants = mdtMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants(); if (finishedInstants == 0) { if (metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() == 0) { - LOG.info("There is no completed commit in both metadata table and corresponding data table."); + LOG.info("There is no completed commit in both metadata table and corresponding data table: {}", taskLabels); return false; } else { - 
throw new HoodieValidationException("There is no completed instant for metadata table."); + throw new HoodieValidationException("There is no completed instant for metadata table: " + cfg.basePath); } } return true; } catch (TableNotFoundException tbe) { // Suppress the TableNotFound exception if Metadata table is not available to read for now - LOG.warn("Metadata table is not found. Skip current validation."); + LOG.warn("Metadata table is not found for table: {}. Skip current validation.", cfg.basePath); return false; } catch (Exception ex) { - LOG.warn("Metadata table is not available to read for now, ", ex); + LOG.warn("Metadata table is not available to read for now for table: {}, ", cfg.basePath, ex); return false; } } @@ -622,7 +623,7 @@ public class HoodieMetadataTableValidator implements Serializable { if (allPartitionPathsFromFS.size() != allPartitionPathsMeta.size() || !allPartitionPathsFromFS.equals(allPartitionPathsMeta)) { - String message = "Compare Partitions Failed! " + "AllPartitionPathsFromFS : " + allPartitionPathsFromFS + " and allPartitionPathsMeta : " + allPartitionPathsMeta; + String message = "Compare Partitions Failed! 
Table: " + cfg.basePath + ", AllPartitionPathsFromFS : " + allPartitionPathsFromFS + " and allPartitionPathsMeta : " + allPartitionPathsMeta; LOG.error(message); throw new HoodieValidationException(message); } @@ -835,13 +836,13 @@ public class HoodieMetadataTableValidator implements Serializable { if (countKeyFromTable != countKeyFromRecordIndex) { String message = String.format("Validation of record index count failed: " - + "%s entries from record index metadata, %s keys from the data table.", + + "%s entries from record index metadata, %s keys from the data table: " + cfg.basePath, countKeyFromRecordIndex, countKeyFromTable); LOG.error(message); throw new HoodieValidationException(message); } else { LOG.info(String.format( - "Validation of record index count succeeded: %s entries.", countKeyFromRecordIndex)); + "Validation of record index count succeeded: %s entries. Table: %s", countKeyFromRecordIndex, cfg.basePath)); } } @@ -950,13 +951,13 @@ public class HoodieMetadataTableValidator implements Serializable { if (diffCount > 0) { String message = String.format("Validation of record index content failed: " + "%s keys (total %s) from the data table have wrong location in record index " - + "metadata. Sample mismatches: %s", - diffCount, countKey, String.join(";", result.getRight())); + + "metadata. Table: %s Sample mismatches: %s", + diffCount, countKey, cfg.basePath, String.join(";", result.getRight())); LOG.error(message); throw new HoodieValidationException(message); } else { LOG.info(String.format( - "Validation of record index content succeeded: %s entries.", countKey)); + "Validation of record index content succeeded: %s entries. 
Table: %s", countKey, cfg.basePath)); } } @@ -995,13 +996,13 @@ public class HoodieMetadataTableValidator implements Serializable { List<T> infoListFromMetadataTable, List<T> infoListFromFS, String partitionPath, String label) { if (infoListFromMetadataTable.size() != infoListFromFS.size() || !infoListFromMetadataTable.equals(infoListFromFS)) { - String message = String.format("Validation of %s for partition %s failed." + String message = String.format("Validation of %s for partition %s failed for table: %s " + "\n%s from metadata: %s\n%s from file system and base files: %s", - label, partitionPath, label, infoListFromMetadataTable, label, infoListFromFS); + label, partitionPath, cfg.basePath, label, infoListFromMetadataTable, label, infoListFromFS); LOG.error(message); throw new HoodieValidationException(message); } else { - LOG.info(String.format("Validation of %s succeeded for partition %s", label, partitionPath)); + LOG.info(String.format("Validation of %s succeeded for partition %s for table: %s", label, partitionPath, cfg.basePath)); } } @@ -1035,13 +1036,13 @@ public class HoodieMetadataTableValidator implements Serializable { } if (mismatch) { - String message = String.format("Validation of %s for partition %s failed." + String message = String.format("Validation of %s for partition %s failed for table: %s " + "\n%s from metadata: %s\n%s from file system and base files: %s", - label, partitionPath, label, fileSliceListFromMetadataTable, label, fileSliceListFromFS); + label, partitionPath, cfg.basePath, label, fileSliceListFromMetadataTable, label, fileSliceListFromFS); LOG.error(message); throw new HoodieValidationException(message); } else { - LOG.info(String.format("Validation of %s succeeded for partition %s", label, partitionPath)); + LOG.info(String.format("Validation of %s succeeded for partition %s for table: %s ", label, partitionPath, cfg.basePath)); } }
