This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6746f44ba1893c82c9793486600aa847adc9171a
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Feb 27 22:45:10 2024 +0530

    [MINOR] Improve mdt validator and upsert partitioner logs (#10656)
---
 .../table/action/commit/UpsertPartitioner.java     |  8 ++--
 .../utilities/HoodieMetadataTableValidator.java    | 43 +++++++++++-----------
 2 files changed, 27 insertions(+), 24 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index edd6d981d18..2b78df96765 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -97,7 +97,8 @@ public class UpsertPartitioner<T> extends SparkHoodiePartitioner<T> {
     assignUpdates(profile);
     assignInserts(profile, context);
 
-    LOG.info("Total Buckets: " + totalBuckets);
+    LOG.info("Total Buckets: {}, bucketInfoMap size: {}, 
partitionPathToInsertBucketInfos size: {}, updateLocationToBucket size: {}",
+        totalBuckets, bucketInfoMap.size(), 
partitionPathToInsertBucketInfos.size(), updateLocationToBucket.size());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Buckets info => " + bucketInfoMap + ", \n"
               + "Partition to insert buckets => " + 
partitionPathToInsertBucketInfos + ", \n"
@@ -189,6 +190,7 @@ public class UpsertPartitioner<T> extends SparkHoodiePartitioner<T> {
 
         this.smallFiles.addAll(smallFiles);
 
+        LOG.info("For partitionPath : " + partitionPath + " Total Small Files 
=> " + smallFiles.size());
         LOG.debug("For partitionPath : " + partitionPath + " Small Files => " 
+ smallFiles);
 
         long totalUnassignedInserts = pStat.getNumInserts();
@@ -230,7 +232,7 @@ public class UpsertPartitioner<T> extends SparkHoodiePartitioner<T> {
           }
 
           int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / 
insertRecordsPerBucket);
-          LOG.debug("After small file assignment: unassignedInserts => " + 
totalUnassignedInserts
+          LOG.info("After small file assignment: unassignedInserts => " + 
totalUnassignedInserts
               + ", totalInsertBuckets => " + insertBuckets + ", 
recordsPerBucket => " + insertRecordsPerBucket);
           for (int b = 0; b < insertBuckets; b++) {
             bucketNumbers.add(totalBuckets);
@@ -258,7 +260,7 @@ public class UpsertPartitioner<T> extends SparkHoodiePartitioner<T> {
           currentCumulativeWeight += bkt.weight;
           insertBuckets.add(new InsertBucketCumulativeWeightPair(bkt, 
currentCumulativeWeight));
         }
-        LOG.debug("Total insert buckets for partition path " + partitionPath + 
" => " + insertBuckets);
+        LOG.info("Total insert buckets for partition path " + partitionPath + 
" => " + insertBuckets);
         partitionPathToInsertBucketInfos.put(partitionPath, insertBuckets);
       }
       if (profile.hasOutputWorkLoadStats()) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 7a536da6198..b4279d8451c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -199,6 +199,7 @@ public class HoodieMetadataTableValidator implements Serializable {
 
   private String generateValidationTaskLabels() {
     List<String> labelList = new ArrayList<>();
+    labelList.add(cfg.basePath);
     if (cfg.validateLatestBaseFiles) {
       labelList.add("validate-latest-base-files");
     }
@@ -411,10 +412,10 @@ public class HoodieMetadataTableValidator implements Serializable {
     try {
       LOG.info(cfg.toString());
       if (cfg.continuous) {
-        LOG.info(" ****** do hoodie metadata table validation in CONTINUOUS 
mode ******");
+        LOG.info(" ****** do hoodie metadata table validation in CONTINUOUS 
mode - {} ******", taskLabels);
         doHoodieMetadataTableValidationContinuous();
       } else {
-        LOG.info(" ****** do hoodie metadata table validation once ******");
+        LOG.info(" ****** do hoodie metadata table validation once - {} 
******", taskLabels);
         result = doHoodieMetadataTableValidationOnce();
       }
     } catch (Exception e) {
@@ -432,7 +433,7 @@ public class HoodieMetadataTableValidator implements Serializable {
     try {
       return doMetadataTableValidation();
     } catch (Throwable e) {
-      LOG.error("Metadata table validation failed to 
HoodieValidationException", e);
+      LOG.error("Metadata table validation failed to HoodieValidationException 
{} {}", taskLabels, e);
       if (!cfg.ignoreFailed) {
         throw e;
       }
@@ -491,7 +492,7 @@ public class HoodieMetadataTableValidator implements Serializable {
     List<String> allPartitions = validatePartitions(engineContext, basePath);
 
     if (allPartitions.isEmpty()) {
-      LOG.warn("The result of getting all partitions is null or empty, skip 
current validation.");
+      LOG.warn("The result of getting all partitions is null or empty, skip 
current validation. {}", taskLabels);
       return true;
     }
 
@@ -522,7 +523,7 @@ public class HoodieMetadataTableValidator implements Serializable {
         result.add(Pair.of(true, ""));
       } catch (HoodieValidationException e) {
         LOG.error(
-            "Metadata table validation failed due to HoodieValidationException 
in record index validation", e);
+            "Metadata table validation failed due to HoodieValidationException 
in record index validation for table: {} ", cfg.basePath, e);
         if (!cfg.ignoreFailed) {
           throw e;
         }
@@ -563,19 +564,19 @@ public class HoodieMetadataTableValidator implements Serializable {
       int finishedInstants = 
mdtMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants();
       if (finishedInstants == 0) {
         if 
(metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() == 
0) {
-          LOG.info("There is no completed commit in both metadata table and 
corresponding data table.");
+          LOG.info("There is no completed commit in both metadata table and 
corresponding data table: {}", taskLabels);
           return false;
         } else {
-          throw new HoodieValidationException("There is no completed instant 
for metadata table.");
+          throw new HoodieValidationException("There is no completed instant 
for metadata table: " + cfg.basePath);
         }
       }
       return true;
     } catch (TableNotFoundException tbe) {
       // Suppress the TableNotFound exception if Metadata table is not 
available to read for now
-      LOG.warn("Metadata table is not found. Skip current validation.");
+      LOG.warn("Metadata table is not found for table: {}. Skip current 
validation.", cfg.basePath);
       return false;
     } catch (Exception ex) {
-      LOG.warn("Metadata table is not available to read for now, ", ex);
+      LOG.warn("Metadata table is not available to read for now for table: {}, 
", cfg.basePath, ex);
       return false;
     }
   }
@@ -622,7 +623,7 @@ public class HoodieMetadataTableValidator implements Serializable {
 
     if (allPartitionPathsFromFS.size() != allPartitionPathsMeta.size()
         || !allPartitionPathsFromFS.equals(allPartitionPathsMeta)) {
-      String message = "Compare Partitions Failed! " + 
"AllPartitionPathsFromFS : " + allPartitionPathsFromFS + " and 
allPartitionPathsMeta : " + allPartitionPathsMeta;
+      String message = "Compare Partitions Failed! Table: " + cfg.basePath + 
", AllPartitionPathsFromFS : " + allPartitionPathsFromFS + " and 
allPartitionPathsMeta : " + allPartitionPathsMeta;
       LOG.error(message);
       throw new HoodieValidationException(message);
     }
@@ -835,13 +836,13 @@ public class HoodieMetadataTableValidator implements Serializable {
 
     if (countKeyFromTable != countKeyFromRecordIndex) {
       String message = String.format("Validation of record index count failed: 
"
-              + "%s entries from record index metadata, %s keys from the data 
table.",
+              + "%s entries from record index metadata, %s keys from the data 
table: " + cfg.basePath,
           countKeyFromRecordIndex, countKeyFromTable);
       LOG.error(message);
       throw new HoodieValidationException(message);
     } else {
       LOG.info(String.format(
-          "Validation of record index count succeeded: %s entries.", 
countKeyFromRecordIndex));
+          "Validation of record index count succeeded: %s entries. Table: %s", 
countKeyFromRecordIndex, cfg.basePath));
     }
   }
 
@@ -950,13 +951,13 @@ public class HoodieMetadataTableValidator implements Serializable {
     if (diffCount > 0) {
       String message = String.format("Validation of record index content 
failed: "
               + "%s keys (total %s) from the data table have wrong location in 
record index "
-              + "metadata. Sample mismatches: %s",
-          diffCount, countKey, String.join(";", result.getRight()));
+              + "metadata. Table: %s   Sample mismatches: %s",
+          diffCount, countKey, cfg.basePath, String.join(";", 
result.getRight()));
       LOG.error(message);
       throw new HoodieValidationException(message);
     } else {
       LOG.info(String.format(
-          "Validation of record index content succeeded: %s entries.", 
countKey));
+          "Validation of record index content succeeded: %s entries. Table: 
%s", countKey, cfg.basePath));
     }
   }
 
@@ -995,13 +996,13 @@ public class HoodieMetadataTableValidator implements Serializable {
       List<T> infoListFromMetadataTable, List<T> infoListFromFS, String 
partitionPath, String label) {
     if (infoListFromMetadataTable.size() != infoListFromFS.size()
         || !infoListFromMetadataTable.equals(infoListFromFS)) {
-      String message = String.format("Validation of %s for partition %s 
failed."
+      String message = String.format("Validation of %s for partition %s failed 
for table: %s "
               + "\n%s from metadata: %s\n%s from file system and base files: 
%s",
-          label, partitionPath, label, infoListFromMetadataTable, label, 
infoListFromFS);
+          label, partitionPath, cfg.basePath, label, 
infoListFromMetadataTable, label, infoListFromFS);
       LOG.error(message);
       throw new HoodieValidationException(message);
     } else {
-      LOG.info(String.format("Validation of %s succeeded for partition %s", 
label, partitionPath));
+      LOG.info(String.format("Validation of %s succeeded for partition %s for 
table: %s", label, partitionPath, cfg.basePath));
     }
   }
 
@@ -1035,13 +1036,13 @@ public class HoodieMetadataTableValidator implements Serializable {
     }
 
     if (mismatch) {
-      String message = String.format("Validation of %s for partition %s 
failed."
+      String message = String.format("Validation of %s for partition %s failed 
for table: %s "
               + "\n%s from metadata: %s\n%s from file system and base files: 
%s",
-          label, partitionPath, label, fileSliceListFromMetadataTable, label, 
fileSliceListFromFS);
+          label, partitionPath, cfg.basePath, label, 
fileSliceListFromMetadataTable, label, fileSliceListFromFS);
       LOG.error(message);
       throw new HoodieValidationException(message);
     } else {
-      LOG.info(String.format("Validation of %s succeeded for partition %s", 
label, partitionPath));
+      LOG.info(String.format("Validation of %s succeeded for partition %s for 
table: %s ", label, partitionPath, cfg.basePath));
     }
   }
 

Reply via email to