This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8b61696f158 [HUDI-7556] Fixing MDT validator and adding tests (#10939)
8b61696f158 is described below
commit 8b61696f158ff2a90f852f779acb17d515b8b6d1
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Apr 4 21:34:57 2024 -0700
[HUDI-7556] Fixing MDT validator and adding tests (#10939)
---
.../utilities/HoodieMetadataTableValidator.java | 41 +++++++++-
.../TestHoodieMetadataTableValidator.java | 90 ++++++++++++++++++++++
2 files changed, 128 insertions(+), 3 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 1e498f7c374..1bf7e28dfa2 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -103,6 +103,7 @@ import static org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD;
 import static org.apache.hudi.common.model.HoodieRecord.PARTITION_PATH_METADATA_FIELD;
 import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.hadoop.fs.CachingPath.getPathWithoutSchemeAndAuthority;
@@ -623,9 +624,43 @@ public class HoodieMetadataTableValidator implements Serializable {
     if (allPartitionPathsFromFS.size() != allPartitionPathsMeta.size()
         || !allPartitionPathsFromFS.equals(allPartitionPathsMeta)) {
-      String message = "Compare Partitions Failed! Table: " + cfg.basePath + ", AllPartitionPathsFromFS : " + allPartitionPathsFromFS + " and allPartitionPathsMeta : " + allPartitionPathsMeta;
-      LOG.error(message);
-      throw new HoodieValidationException(message);
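+      // set difference in both directions: partitions seen only by the FS listing vs. only by the MDT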
+      List<String> additionalFromFS = new ArrayList<>(allPartitionPathsFromFS);
+      additionalFromFS.removeAll(allPartitionPathsMeta);
+      List<String> additionalFromMDT = new ArrayList<>(allPartitionPathsMeta);
+      additionalFromMDT.removeAll(allPartitionPathsFromFS);
+      boolean misMatch = true;
+      List<String> actualAdditionalPartitionsInMDT = new ArrayList<>(additionalFromMDT);
+      if (additionalFromFS.isEmpty() && !additionalFromMDT.isEmpty()) {
+        // there is a chance that, when we polled the MDT, a new commit had completed that was not yet complete when we polled the FS-based listing. Let's rule that out.
+        additionalFromMDT.forEach(partitionFromMDT -> {
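+          // the creation instant of a partition is recorded in its .hoodie_partition_metadata file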
+          HoodiePartitionMetadata hoodiePartitionMetadata =
+              new HoodiePartitionMetadata(metaClient.getFs(), FSUtils.getPartitionPath(basePath, partitionFromMDT));
+          Option<String> partitionCreationTimeOpt = hoodiePartitionMetadata.readPartitionCreatedCommitTime();
+          // if the creation time is greater than the last completed instant on the active timeline, we can ignore the additional partition from the MDT.
+          if (partitionCreationTimeOpt.isPresent() && !completedTimeline.containsInstant(partitionCreationTimeOpt.get())) {
+            Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
+            if (lastInstant.isPresent()
+                && HoodieTimeline.compareTimestamps(partitionCreationTimeOpt.get(), GREATER_THAN, lastInstant.get().getTimestamp())) {
+              LOG.warn("Ignoring additional partition " + partitionFromMDT + ", as it was deduced to be part of the "
+                  + "latest completed commit, which was inflight when the FS-based listing was polled.");
+              actualAdditionalPartitionsInMDT.remove(partitionFromMDT);
+            }
+          }
+        });
+        // if there are no additional partitions from the FS listing, and the partitions seen only by the MDT are all due to a new commit, we are good
+        if (actualAdditionalPartitionsInMDT.isEmpty()) {
+          misMatch = false;
+        }
+      }
+      if (misMatch) {
+        String message = "Compare Partitions Failed! Additional partitions from FS, but missing from MDT : \"" + additionalFromFS
+            + "\" and additional partitions from MDT, but missing from FS listing : \"" + actualAdditionalPartitionsInMDT
+            + "\".\nAll partitions from FS listing: " + allPartitionPathsFromFS;
+        LOG.error(message);
+        throw new HoodieValidationException(message);
+      }
     }
     return allPartitionPathsMeta;
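Note: the reconciliation above boils down to a two-way set difference with a one-sided
escape hatch for the race between the two listings. A minimal standalone sketch of that
decision (hypothetical names; the timeline check is abstracted into a predicate, so this
is an illustration rather than the validator's exact code):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;

    final class PartitionReconciliationSketch {
      // Returns true when the two listings genuinely disagree. Partitions seen only by
      // the MDT are forgiven when, per the supplied predicate, they were created by a
      // commit that completed after the FS listing was taken.
      static boolean mismatch(List<String> fsPartitions,
                              List<String> mdtPartitions,
                              Predicate<String> createdAfterFsListing) {
        List<String> onlyOnFs = new ArrayList<>(fsPartitions);
        onlyOnFs.removeAll(mdtPartitions);     // removeAll, not remove: element-wise set difference
        List<String> onlyInMdt = new ArrayList<>(mdtPartitions);
        onlyInMdt.removeAll(fsPartitions);
        if (!onlyOnFs.isEmpty()) {
          return true;                         // FS has partitions the MDT is missing: always a real mismatch
        }
        onlyInMdt.removeIf(createdAfterFsListing); // forgive partitions from a commit that raced the FS poll
        return !onlyInMdt.isEmpty();
      }
    }

The escape hatch is deliberately one-sided: only partitions that are extra in the MDT are
forgiven, because only those can be explained by a commit completing between the FS poll
and the MDT poll.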
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
new file mode 100644
index 00000000000..74642bbcb7a
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.testutils.HoodieSparkClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase {
+
+  @Test
+  public void testMetadataTableValidation() {
+    Map<String, String> writeOptions = new HashMap<>();
+    writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
+    writeOptions.put("hoodie.table.name", "test_table");
+    writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
+    writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+    writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
+    writeOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition_path");
+
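+    // two commits: an initial bulk_insert, then an upsert, so the validator has real table state to compare against the MDT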
+    Dataset<Row> inserts = makeInsertDf("000", 5).cache();
+    inserts.write().format("hudi").options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.BULK_INSERT.value())
+        .mode(SaveMode.Overwrite)
+        .save(basePath);
+    Dataset<Row> updates = makeUpdateDf("001", 5).cache();
+    updates.write().format("hudi").options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.UPSERT.value())
+        .mode(SaveMode.Append)
+        .save(basePath);
+
+    // validate MDT
+    HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
+    config.basePath = basePath;
+    config.validateLatestFileSlices = true;
+    config.validateAllFileGroups = true;
+    HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
+    assertTrue(validator.run());
+  }
+
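+  // helpers: build small insert/update batches from the test data generator as JSON-backed DataFrames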
+  protected Dataset<Row> makeInsertDf(String instantTime, Integer n) {
+    List<String> records = dataGen.generateInserts(instantTime, n).stream()
+        .map(r -> recordToString(r).get()).collect(Collectors.toList());
+    JavaRDD<String> rdd = jsc.parallelize(records);
+    return sparkSession.read().json(rdd);
+  }
+
+  protected Dataset<Row> makeUpdateDf(String instantTime, Integer n) {
+    try {
+      List<String> records = dataGen.generateUpdates(instantTime, n).stream()
+          .map(r -> recordToString(r).get()).collect(Collectors.toList());
+      JavaRDD<String> rdd = jsc.parallelize(records);
+      return sparkSession.read().json(rdd);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}