This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b61696f158 [HUDI-7556] Fixing MDT validator and adding tests (#10939)
8b61696f158 is described below

commit 8b61696f158ff2a90f852f779acb17d515b8b6d1
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Apr 4 21:34:57 2024 -0700

    [HUDI-7556] Fixing MDT validator and adding tests (#10939)
---
 .../utilities/HoodieMetadataTableValidator.java    | 41 +++++++++-
 .../TestHoodieMetadataTableValidator.java          | 90 ++++++++++++++++++++++
 2 files changed, 128 insertions(+), 3 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 1e498f7c374..1bf7e28dfa2 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -103,6 +103,7 @@ import static 
org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD;
 import static 
org.apache.hudi.common.model.HoodieRecord.PARTITION_PATH_METADATA_FIELD;
 import static 
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static 
org.apache.hudi.hadoop.fs.CachingPath.getPathWithoutSchemeAndAuthority;
@@ -623,9 +624,43 @@ public class HoodieMetadataTableValidator implements 
Serializable {
 
     if (allPartitionPathsFromFS.size() != allPartitionPathsMeta.size()
         || !allPartitionPathsFromFS.equals(allPartitionPathsMeta)) {
-      String message = "Compare Partitions Failed! Table: " + cfg.basePath + 
", AllPartitionPathsFromFS : " + allPartitionPathsFromFS + " and 
allPartitionPathsMeta : " + allPartitionPathsMeta;
-      LOG.error(message);
-      throw new HoodieValidationException(message);
+      List<String> additionalFromFS = new ArrayList<>(allPartitionPathsFromFS);
+      additionalFromFS.remove(allPartitionPathsMeta);
+      List<String> additionalFromMDT = new ArrayList<>(allPartitionPathsMeta);
+      additionalFromMDT.remove(allPartitionPathsFromFS);
+      boolean misMatch = true;
+      List<String> actualAdditionalPartitionsInMDT = new 
ArrayList<>(additionalFromMDT);
+      if (additionalFromFS.isEmpty() && !additionalFromMDT.isEmpty()) {
+        // there is a chance that when we polled MDT there could have been a 
new completed commit which was not complete when we polled FS based
+        // listing. let's rule that out.
+        additionalFromMDT.forEach(partitionFromDMT -> {
+
+          HoodiePartitionMetadata hoodiePartitionMetadata =
+              new HoodiePartitionMetadata(metaClient.getFs(), 
FSUtils.getPartitionPath(basePath, partitionFromDMT));
+          Option<String> partitionCreationTimeOpt = 
hoodiePartitionMetadata.readPartitionCreatedCommitTime();
+          // if creation time is greater than last completed instant in active 
timeline, we can ignore the additional partition from MDT.
+          if (partitionCreationTimeOpt.isPresent() && 
!completedTimeline.containsInstant(partitionCreationTimeOpt.get())) {
+            Option<HoodieInstant> lastInstant = 
completedTimeline.lastInstant();
+            if (lastInstant.isPresent()
+                && 
HoodieTimeline.compareTimestamps(partitionCreationTimeOpt.get(), GREATER_THAN, 
lastInstant.get().getTimestamp())) {
+              LOG.warn("Ignoring additional partition " + partitionFromDMT + 
", as it was deduced to be part of a "
+                  + "latest completed commit which was inflighht when FS based 
listing was polled.");
+              actualAdditionalPartitionsInMDT.remove(partitionFromDMT);
+            }
+          }
+        });
+        // if there is no additional partitions from FS listing and only 
additional partitions from MDT based listing is due to a new commit, we are good
+        if (actualAdditionalPartitionsInMDT.isEmpty()) {
+          misMatch = false;
+        }
+      }
+      if (misMatch) {
+        String message = "Compare Partitions Failed! " + " Additional 
partitions from FS, but missing from MDT : \"" + additionalFromFS
+            + "\" and additional partitions from MDT, but missing from FS 
listing : \"" + actualAdditionalPartitionsInMDT
+            + "\".\n All partitions from FS listing " + 
allPartitionPathsFromFS;
+        LOG.error(message);
+        throw new HoodieValidationException(message);
+      }
     }
 
     return allPartitionPathsMeta;
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
new file mode 100644
index 00000000000..74642bbcb7a
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.testutils.HoodieSparkClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieMetadataTableValidator extends 
HoodieSparkClientTestBase {
+
+  @Test
+  public void testMetadataTableValidation() {
+
+    Map<String,String> writeOptions = new HashMap<>();
+    writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
+    writeOptions.put("hoodie.table.name", "test_table");
+    writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), 
"MERGE_ON_READ");
+    writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), 
"_row_key");
+    writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), 
"timestamp");
+    writeOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), 
"partition_path");
+
+    Dataset<Row> inserts = makeInsertDf("000", 5).cache();
+    inserts.write().format("hudi").options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION().key(), 
WriteOperationType.BULK_INSERT.value())
+        .mode(SaveMode.Overwrite)
+        .save(basePath);
+    Dataset<Row> updates = makeUpdateDf("001", 5).cache();
+    updates.write().format("hudi").options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION().key(), 
WriteOperationType.UPSERT.value())
+        .mode(SaveMode.Append)
+        .save(basePath);
+
+    // validate MDT
+    HoodieMetadataTableValidator.Config config = new 
HoodieMetadataTableValidator.Config();
+    config.basePath = basePath;
+    config.validateLatestFileSlices = true;
+    config.validateAllFileGroups = true;
+    HoodieMetadataTableValidator validator = new 
HoodieMetadataTableValidator(jsc, config);
+    assertTrue(validator.run());
+  }
+
+  protected Dataset<Row> makeInsertDf(String instantTime, Integer n) {
+    List<String> records = dataGen.generateInserts(instantTime, n).stream()
+        .map(r -> recordToString(r).get()).collect(Collectors.toList());
+    JavaRDD<String> rdd = jsc.parallelize(records);
+    return sparkSession.read().json(rdd);
+  }
+
+  protected Dataset<Row> makeUpdateDf(String instantTime, Integer n) {
+    try {
+      List<String> records = dataGen.generateUpdates(instantTime, n).stream()
+          .map(r -> recordToString(r).get()).collect(Collectors.toList());
+      JavaRDD<String> rdd = jsc.parallelize(records);
+      return sparkSession.read().json(rdd);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

Reply via email to