This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 810d65d40a8 [MINOR] Improve mdt validator and upsert partitioner logs (#10656)
810d65d40a8 is described below
commit 810d65d40a8719790cd33e4eb45565f01b6d1cb4
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Feb 27 22:45:10 2024 +0530
[MINOR] Improve mdt validator and upsert partitioner logs (#10656)
---
.../table/action/commit/UpsertPartitioner.java | 8 ++--
.../utilities/HoodieMetadataTableValidator.java | 43 +++++++++++-----------
2 files changed, 27 insertions(+), 24 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index edd6d981d18..2b78df96765 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -97,7 +97,8 @@ public class UpsertPartitioner<T> extends
SparkHoodiePartitioner<T> {
assignUpdates(profile);
assignInserts(profile, context);
- LOG.info("Total Buckets: " + totalBuckets);
+ LOG.info("Total Buckets: {}, bucketInfoMap size: {},
partitionPathToInsertBucketInfos size: {}, updateLocationToBucket size: {}",
+ totalBuckets, bucketInfoMap.size(),
partitionPathToInsertBucketInfos.size(), updateLocationToBucket.size());
if (LOG.isDebugEnabled()) {
LOG.debug("Buckets info => " + bucketInfoMap + ", \n"
+ "Partition to insert buckets => " +
partitionPathToInsertBucketInfos + ", \n"
@@ -189,6 +190,7 @@ public class UpsertPartitioner<T> extends
SparkHoodiePartitioner<T> {
this.smallFiles.addAll(smallFiles);
+ LOG.info("For partitionPath : " + partitionPath + " Total Small Files
=> " + smallFiles.size());
LOG.debug("For partitionPath : " + partitionPath + " Small Files => "
+ smallFiles);
long totalUnassignedInserts = pStat.getNumInserts();
@@ -230,7 +232,7 @@ public class UpsertPartitioner<T> extends
SparkHoodiePartitioner<T> {
}
int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) /
insertRecordsPerBucket);
- LOG.debug("After small file assignment: unassignedInserts => " +
totalUnassignedInserts
+ LOG.info("After small file assignment: unassignedInserts => " +
totalUnassignedInserts
+ ", totalInsertBuckets => " + insertBuckets + ",
recordsPerBucket => " + insertRecordsPerBucket);
for (int b = 0; b < insertBuckets; b++) {
bucketNumbers.add(totalBuckets);
@@ -258,7 +260,7 @@ public class UpsertPartitioner<T> extends
SparkHoodiePartitioner<T> {
currentCumulativeWeight += bkt.weight;
insertBuckets.add(new InsertBucketCumulativeWeightPair(bkt,
currentCumulativeWeight));
}
- LOG.debug("Total insert buckets for partition path " + partitionPath +
" => " + insertBuckets);
+ LOG.info("Total insert buckets for partition path " + partitionPath +
" => " + insertBuckets);
partitionPathToInsertBucketInfos.put(partitionPath, insertBuckets);
}
if (profile.hasOutputWorkLoadStats()) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 4cebbf0b3cc..1e498f7c374 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -199,6 +199,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
private String generateValidationTaskLabels() {
List<String> labelList = new ArrayList<>();
+ labelList.add(cfg.basePath);
if (cfg.validateLatestBaseFiles) {
labelList.add("validate-latest-base-files");
}
@@ -411,10 +412,10 @@ public class HoodieMetadataTableValidator implements
Serializable {
try {
LOG.info(cfg.toString());
if (cfg.continuous) {
- LOG.info(" ****** do hoodie metadata table validation in CONTINUOUS
mode ******");
+ LOG.info(" ****** do hoodie metadata table validation in CONTINUOUS
mode - {} ******", taskLabels);
doHoodieMetadataTableValidationContinuous();
} else {
- LOG.info(" ****** do hoodie metadata table validation once ******");
+ LOG.info(" ****** do hoodie metadata table validation once - {}
******", taskLabels);
result = doHoodieMetadataTableValidationOnce();
}
} catch (Exception e) {
@@ -432,7 +433,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
try {
return doMetadataTableValidation();
} catch (Throwable e) {
- LOG.error("Metadata table validation failed to
HoodieValidationException", e);
+ LOG.error("Metadata table validation failed to HoodieValidationException
{} {}", taskLabels, e);
if (!cfg.ignoreFailed) {
throw e;
}
@@ -491,7 +492,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
List<String> allPartitions = validatePartitions(engineContext, basePath);
if (allPartitions.isEmpty()) {
- LOG.warn("The result of getting all partitions is null or empty, skip
current validation.");
+ LOG.warn("The result of getting all partitions is null or empty, skip
current validation. {}", taskLabels);
return true;
}
@@ -522,7 +523,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
result.add(Pair.of(true, ""));
} catch (HoodieValidationException e) {
LOG.error(
- "Metadata table validation failed due to HoodieValidationException
in record index validation", e);
+ "Metadata table validation failed due to HoodieValidationException
in record index validation for table: {} ", cfg.basePath, e);
if (!cfg.ignoreFailed) {
throw e;
}
@@ -563,19 +564,19 @@ public class HoodieMetadataTableValidator implements
Serializable {
int finishedInstants =
mdtMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants();
if (finishedInstants == 0) {
if
(metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() ==
0) {
- LOG.info("There is no completed commit in both metadata table and
corresponding data table.");
+ LOG.info("There is no completed commit in both metadata table and
corresponding data table: {}", taskLabels);
return false;
} else {
- throw new HoodieValidationException("There is no completed instant
for metadata table.");
+ throw new HoodieValidationException("There is no completed instant
for metadata table: " + cfg.basePath);
}
}
return true;
} catch (TableNotFoundException tbe) {
// Suppress the TableNotFound exception if Metadata table is not
available to read for now
- LOG.warn("Metadata table is not found. Skip current validation.");
+ LOG.warn("Metadata table is not found for table: {}. Skip current
validation.", cfg.basePath);
return false;
} catch (Exception ex) {
- LOG.warn("Metadata table is not available to read for now, ", ex);
+ LOG.warn("Metadata table is not available to read for now for table: {},
", cfg.basePath, ex);
return false;
}
}
@@ -622,7 +623,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
if (allPartitionPathsFromFS.size() != allPartitionPathsMeta.size()
|| !allPartitionPathsFromFS.equals(allPartitionPathsMeta)) {
- String message = "Compare Partitions Failed! " +
"AllPartitionPathsFromFS : " + allPartitionPathsFromFS + " and
allPartitionPathsMeta : " + allPartitionPathsMeta;
+ String message = "Compare Partitions Failed! Table: " + cfg.basePath +
", AllPartitionPathsFromFS : " + allPartitionPathsFromFS + " and
allPartitionPathsMeta : " + allPartitionPathsMeta;
LOG.error(message);
throw new HoodieValidationException(message);
}
@@ -835,13 +836,13 @@ public class HoodieMetadataTableValidator implements
Serializable {
if (countKeyFromTable != countKeyFromRecordIndex) {
String message = String.format("Validation of record index count failed:
"
- + "%s entries from record index metadata, %s keys from the data
table.",
+ + "%s entries from record index metadata, %s keys from the data
table: " + cfg.basePath,
countKeyFromRecordIndex, countKeyFromTable);
LOG.error(message);
throw new HoodieValidationException(message);
} else {
LOG.info(String.format(
- "Validation of record index count succeeded: %s entries.",
countKeyFromRecordIndex));
+ "Validation of record index count succeeded: %s entries. Table: %s",
countKeyFromRecordIndex, cfg.basePath));
}
}
@@ -950,13 +951,13 @@ public class HoodieMetadataTableValidator implements
Serializable {
if (diffCount > 0) {
String message = String.format("Validation of record index content
failed: "
+ "%s keys (total %s) from the data table have wrong location in
record index "
- + "metadata. Sample mismatches: %s",
- diffCount, countKey, String.join(";", result.getRight()));
+ + "metadata. Table: %s Sample mismatches: %s",
+ diffCount, countKey, cfg.basePath, String.join(";",
result.getRight()));
LOG.error(message);
throw new HoodieValidationException(message);
} else {
LOG.info(String.format(
- "Validation of record index content succeeded: %s entries.",
countKey));
+ "Validation of record index content succeeded: %s entries. Table:
%s", countKey, cfg.basePath));
}
}
@@ -995,13 +996,13 @@ public class HoodieMetadataTableValidator implements
Serializable {
List<T> infoListFromMetadataTable, List<T> infoListFromFS, String
partitionPath, String label) {
if (infoListFromMetadataTable.size() != infoListFromFS.size()
|| !infoListFromMetadataTable.equals(infoListFromFS)) {
- String message = String.format("Validation of %s for partition %s
failed."
+ String message = String.format("Validation of %s for partition %s failed
for table: %s "
+ "\n%s from metadata: %s\n%s from file system and base files:
%s",
- label, partitionPath, label, infoListFromMetadataTable, label,
infoListFromFS);
+ label, partitionPath, cfg.basePath, label,
infoListFromMetadataTable, label, infoListFromFS);
LOG.error(message);
throw new HoodieValidationException(message);
} else {
- LOG.info(String.format("Validation of %s succeeded for partition %s",
label, partitionPath));
+ LOG.info(String.format("Validation of %s succeeded for partition %s for
table: %s", label, partitionPath, cfg.basePath));
}
}
@@ -1035,13 +1036,13 @@ public class HoodieMetadataTableValidator implements
Serializable {
}
if (mismatch) {
- String message = String.format("Validation of %s for partition %s
failed."
+ String message = String.format("Validation of %s for partition %s failed
for table: %s "
+ "\n%s from metadata: %s\n%s from file system and base files:
%s",
- label, partitionPath, label, fileSliceListFromMetadataTable, label,
fileSliceListFromFS);
+ label, partitionPath, cfg.basePath, label,
fileSliceListFromMetadataTable, label, fileSliceListFromFS);
LOG.error(message);
throw new HoodieValidationException(message);
} else {
- LOG.info(String.format("Validation of %s succeeded for partition %s",
label, partitionPath));
+ LOG.info(String.format("Validation of %s succeeded for partition %s for
table: %s ", label, partitionPath, cfg.basePath));
}
}