This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ac23d25  [HUDI-1357] Added a check to validate records are not lost during merges. (#2216)
ac23d25 is described below

commit ac23d2587f58d2199535dea779925cec02304b2d
Author: Prashant Wason <[email protected]>
AuthorDate: Tue Dec 1 13:44:57 2020 -0800

    [HUDI-1357] Added a check to validate records are not lost during merges. (#2216)
    
    - Turned off by default
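
For illustration, a minimal sketch of switching the new check on through the write
config, using the builder method added in this patch; the base path and class name
below are placeholders, not part of the change:

    import org.apache.hudi.config.HoodieWriteConfig;

    public class EnableMergeValidation {
      public static HoodieWriteConfig buildConfig() {
        // withMergeDataValidationCheckEnabled maps to hoodie.merge.data.validation.enabled,
        // which defaults to false; "/tmp/hudi_table" is a placeholder base path.
        return HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_table")
            .withMergeDataValidationCheckEnabled(true)
            .build();
      }
    }
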
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 15 +++++++
 .../java/org/apache/hudi/io/HoodieMergeHandle.java | 27 ++++++++++++
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 49 ++++++++++++++++++++++
 .../org/apache/hudi/common/util/ParquetUtils.java  | 17 ++++++++
 .../hudi/io/storage/HoodieParquetReader.java       |  3 +-
 .../apache/hudi/common/util/TestParquetUtils.java  | 13 ++++++
 6 files changed, 122 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 42d3e2b..b06f994 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -117,6 +117,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   public static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
   public static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
 
+  // Data validation check performed during merges before actual commits
+  private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED = "hoodie.merge.data.validation.enabled";
+  private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = "false";
+
   /**
    * HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
    * multiple write operations (upsert/buk-insert/...) to be executed within a single commit.
@@ -282,6 +286,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
   }
 
+  public boolean isMergeDataValidationCheckEnabled() {
+    return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED));
+  }
+
   /**
    * compaction properties.
    */
@@ -983,6 +991,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withMergeDataValidationCheckEnabled(boolean enabled) {
+      props.setProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED, String.valueOf(enabled));
+      return this;
+    }
+
     public Builder withProperties(Properties properties) {
       this.props.putAll(properties);
       return this;
@@ -1032,6 +1045,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       setDefaultOnCondition(props, !props.containsKey(AVRO_SCHEMA_VALIDATE), AVRO_SCHEMA_VALIDATE, DEFAULT_AVRO_SCHEMA_VALIDATE);
       setDefaultOnCondition(props, !props.containsKey(BULKINSERT_SORT_MODE),
           BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE);
+      setDefaultOnCondition(props, !props.containsKey(MERGE_DATA_VALIDATION_CHECK_ENABLED),
+          MERGE_DATA_VALIDATION_CHECK_ENABLED, DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED);
 
       // Make sure the props is propagated
       setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().fromProperties(props).build());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index ad03023..cab7283 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -34,8 +34,11 @@ import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCorruptedDataException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.table.HoodieTable;
 
@@ -292,6 +295,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
       runtimeStats.setTotalUpsertTime(timer.endTimer());
       stat.setRuntimeStats(runtimeStats);
 
+      performMergeDataValidationCheck(writeStatus);
+
       LOG.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(),
           stat.getFileId(), runtimeStats.getTotalUpsertTime()));
 
@@ -301,6 +306,28 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     }
   }
 
+  public void performMergeDataValidationCheck(WriteStatus writeStatus) {
+    if (!config.isMergeDataValidationCheckEnabled()) {
+      return;
+    }
+
+    long oldNumWrites = 0;
+    try {
+      HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
+      oldNumWrites = reader.getTotalRecords();
+    } catch (IOException e) {
+      throw new HoodieUpsertException("Failed to check for merge data validation", e);
+    }
+
+    if ((writeStatus.getStat().getNumWrites() + writeStatus.getStat().getNumDeletes()) < oldNumWrites) {
+      throw new HoodieCorruptedDataException(
+          String.format("Record write count decreased for file: %s, Partition Path: %s (%s:%d + %d < %s:%d)",
+              writeStatus.getFileId(), writeStatus.getPartitionPath(),
+              instantTime, writeStatus.getStat().getNumWrites(), writeStatus.getStat().getNumDeletes(),
+              FSUtils.getCommitTime(oldFilePath.toString()), oldNumWrites));
+    }
+  }
+
   public Path getOldFilePath() {
     return oldFilePath;
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index bbb4048..d0fda81 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -47,10 +47,12 @@ import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieCorruptedDataException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.MarkerFiles;
@@ -376,6 +378,53 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         instants.get(3));
     assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "006"),
         instants.get(4));
+
+    final HoodieWriteConfig cfg = hoodieWriteConfig;
+    final String instantTime = "007";
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    String basePathStr = basePath;
+    HoodieTable table = getHoodieTable(metaClient, cfg);
+    jsc.parallelize(Arrays.asList(1)).map(e -> {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+          .fromBytes(metaClient.getActiveTimeline().getInstantDetails(
+              metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get()).get(),
+              HoodieCommitMetadata.class);
+      String filePath = commitMetadata.getPartitionToWriteStats().values().stream()
+          .flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(".parquet")).findAny()
+          .map(ee -> ee.getPath()).orElse(null);
+      String partitionPath = commitMetadata.getPartitionToWriteStats().values().stream()
+          .flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(".parquet")).findAny()
+          .map(ee -> ee.getPartitionPath()).orElse(null);
+      Path parquetFilePath = new Path(basePathStr, filePath);
+      HoodieBaseFile baseFile = new HoodieBaseFile(parquetFilePath.toString());
+
+      try {
+        HoodieMergeHandle handle = new HoodieMergeHandle(cfg, instantTime, table, new HashMap<>(),
+            partitionPath, FSUtils.getFileId(parquetFilePath.getName()), baseFile, new SparkTaskContextSupplier());
+        WriteStatus writeStatus = new WriteStatus(false, 0.0);
+        writeStatus.setStat(new HoodieWriteStat());
+        writeStatus.getStat().setNumWrites(0);
+        handle.performMergeDataValidationCheck(writeStatus);
+      } catch (HoodieCorruptedDataException e1) {
+        fail("Exception not expected because merge validation check is disabled");
+      }
+
+      try {
+        final String newInstantTime = "006";
+        cfg.getProps().setProperty("hoodie.merge.data.validation.enabled", "true");
+        HoodieWriteConfig cfg2 = HoodieWriteConfig.newBuilder().withProps(cfg.getProps()).build();
+        HoodieMergeHandle handle = new HoodieMergeHandle(cfg2, newInstantTime, table, new HashMap<>(),
+            partitionPath, FSUtils.getFileId(parquetFilePath.getName()), baseFile, new SparkTaskContextSupplier());
+        WriteStatus writeStatus = new WriteStatus(false, 0.0);
+        writeStatus.setStat(new HoodieWriteStat());
+        writeStatus.getStat().setNumWrites(0);
+        handle.performMergeDataValidationCheck(writeStatus);
+        fail("The above line should have thrown an exception");
+      } catch (HoodieCorruptedDataException e2) {
+        // expected
+      }
+      return true;
+    }).collect();
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index eb5e2b5..dc444aa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -39,6 +39,7 @@ import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
 
@@ -261,6 +262,22 @@ public class ParquetUtils {
     return records;
   }
 
+  /**
+   * Returns the number of records in the parquet file.
+   *
+   * @param conf Configuration
+   * @param parquetFilePath path of the file
+   */
+  public static long getRowCount(Configuration conf, Path parquetFilePath) {
+    ParquetMetadata footer;
+    long rowCount = 0;
+    footer = readMetadata(conf, parquetFilePath);
+    for (BlockMetaData b : footer.getBlocks()) {
+      rowCount += b.getRowCount();
+    }
+    return rowCount;
+  }
+
   static class RecordKeysFilterFunction implements Function<String, Boolean> {
 
     private final Set<String> candidateKeys;
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
index 107f503..feacbda 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
@@ -74,7 +74,6 @@ public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileR
 
   @Override
   public long getTotalRecords() {
-    // TODO Auto-generated method stub
-    return 0;
+    return ParquetUtils.getRowCount(conf, path);
   }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
index 9496f01..2bcbcbd 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
@@ -36,6 +36,7 @@ import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -147,6 +148,18 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
     }
   }
 
+  @Test
+  public void testReadCounts() throws Exception {
+    String filePath = basePath + "/test.parquet";
+    List<String> rowKeys = new ArrayList<>();
+    for (int i = 0; i < 123; i++) {
+      rowKeys.add(UUID.randomUUID().toString());
+    }
+    writeParquetFile(BloomFilterTypeCode.SIMPLE.name(), filePath, rowKeys);
+
+    assertEquals(123, ParquetUtils.getRowCount(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath)));
+  }
+
   private void writeParquetFile(String typeCode, String filePath, List<String> rowKeys) throws Exception {
     writeParquetFile(typeCode, filePath, rowKeys, HoodieAvroUtils.getRecordKeySchema(), false, "");
   }
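
For reference, the new ParquetUtils.getRowCount helper added above can also be
exercised on its own; a minimal sketch, assuming hudi-common is on the classpath
and using a placeholder parquet file path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.util.ParquetUtils;

    public class RowCountCheck {
      public static void main(String[] args) {
        // Placeholder path; pass the path of a real parquet base file as the first argument.
        Path parquetFile = new Path(args.length > 0 ? args[0] : "/tmp/hudi_table/partition/file.parquet");
        // Sums the row counts recorded in the parquet footer's row-group (block) metadata.
        long rows = ParquetUtils.getRowCount(new Configuration(), parquetFile);
        System.out.println("rows = " + rows);
      }
    }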
