This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ac23d25 [HUDI-1357] Added a check to validate records are not lost during merges. (#2216)
ac23d25 is described below
commit ac23d2587f58d2199535dea779925cec02304b2d
Author: Prashant Wason <[email protected]>
AuthorDate: Tue Dec 1 13:44:57 2020 -0800
    [HUDI-1357] Added a check to validate records are not lost during merges. (#2216)

    - Turned off by default
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 15 +++++++
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 27 ++++++++++++
.../TestHoodieClientOnCopyOnWriteStorage.java | 49 ++++++++++++++++++++++
.../org/apache/hudi/common/util/ParquetUtils.java | 17 ++++++++
.../hudi/io/storage/HoodieParquetReader.java | 3 +-
.../apache/hudi/common/util/TestParquetUtils.java | 13 ++++++
6 files changed, 122 insertions(+), 2 deletions(-)
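
In short, this change adds an opt-in guard: at the end of every merge (the rewrite of a base file during upsert), the handle compares the old base file's record count against the new file's writes plus deletes, and fails the commit if records went missing. It is off by default. A minimal sketch of enabling it through writer properties (the property key and accessor are the ones introduced below; the table base path is a placeholder):

    import java.util.Properties;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class EnableMergeValidation {
      public static void main(String[] args) {
        // Sketch: flip the new flag via raw properties; it defaults to "false".
        Properties props = new Properties();
        props.setProperty("hoodie.merge.data.validation.enabled", "true");

        HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_table") // placeholder base path
            .withProps(props)
            .build();

        System.out.println(config.isMergeDataValidationCheckEnabled()); // true
      }
    }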
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 42d3e2b..b06f994 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -117,6 +117,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   public static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
   public static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
+  // Data validation check performed during merges before actual commits
+  private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED = "hoodie.merge.data.validation.enabled";
+  private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = "false";
+
   /**
    * HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
    * multiple write operations (upsert/bulk-insert/...) to be executed within a single commit.
@@ -282,6 +286,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
   }
 
+  public boolean isMergeDataValidationCheckEnabled() {
+    return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED));
+  }
+
   /**
    * compaction properties.
    */
@@ -983,6 +991,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withMergeDataValidationCheckEnabled(boolean enabled) {
+      props.setProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED, String.valueOf(enabled));
+      return this;
+    }
+
     public Builder withProperties(Properties properties) {
       this.props.putAll(properties);
       return this;
@@ -1032,6 +1045,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     setDefaultOnCondition(props, !props.containsKey(AVRO_SCHEMA_VALIDATE),
         AVRO_SCHEMA_VALIDATE, DEFAULT_AVRO_SCHEMA_VALIDATE);
     setDefaultOnCondition(props, !props.containsKey(BULKINSERT_SORT_MODE),
         BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE);
+    setDefaultOnCondition(props, !props.containsKey(MERGE_DATA_VALIDATION_CHECK_ENABLED),
+        MERGE_DATA_VALIDATION_CHECK_ENABLED, DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED);
     // Make sure the props is propagated
     setDefaultOnCondition(props, !isIndexConfigSet,
         HoodieIndexConfig.newBuilder().fromProperties(props).build());
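
For reference, the same opt-in through the builder method added above (a sketch; the base path is a placeholder and all other writer settings are left at their defaults):

    import org.apache.hudi.config.HoodieWriteConfig;

    public class EnableViaBuilder {
      public static void main(String[] args) {
        // Sketch: equivalent to setting hoodie.merge.data.validation.enabled=true.
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_table") // placeholder base path
            .withMergeDataValidationCheckEnabled(true)
            .build();
        System.out.println(config.isMergeDataValidationCheckEnabled()); // prints true
      }
    }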
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index ad03023..cab7283 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -34,8 +34,11 @@ import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCorruptedDataException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.table.HoodieTable;
@@ -292,6 +295,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
       runtimeStats.setTotalUpsertTime(timer.endTimer());
       stat.setRuntimeStats(runtimeStats);
+      performMergeDataValidationCheck(writeStatus);
+
       LOG.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(),
           stat.getFileId(), runtimeStats.getTotalUpsertTime()));
@@ -301,6 +306,28 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     }
   }
 
+  public void performMergeDataValidationCheck(WriteStatus writeStatus) {
+    if (!config.isMergeDataValidationCheckEnabled()) {
+      return;
+    }
+
+    long oldNumWrites = 0;
+    try {
+      HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
+      oldNumWrites = reader.getTotalRecords();
+    } catch (IOException e) {
+      throw new HoodieUpsertException("Failed to check for merge data validation", e);
+    }
+
+    if ((writeStatus.getStat().getNumWrites() + writeStatus.getStat().getNumDeletes()) < oldNumWrites) {
+      throw new HoodieCorruptedDataException(
+          String.format("Record write count decreased for file: %s, Partition Path: %s (%s:%d + %d < %s:%d)",
+              writeStatus.getFileId(), writeStatus.getPartitionPath(),
+              instantTime, writeStatus.getStat().getNumWrites(), writeStatus.getStat().getNumDeletes(),
+              FSUtils.getCommitTime(oldFilePath.toString()), oldNumWrites));
+    }
+  }
+
   public Path getOldFilePath() {
     return oldFilePath;
   }
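
The invariant enforced above: a merge rewrites every surviving record of the old base file into the new one, so the new file's write count plus delete count can never drop below the old file's record count. A self-contained sketch of just the arithmetic, with hypothetical numbers (an old file of 100 records, an upsert updating 10 and deleting 2, yields numWrites = 98 and numDeletes = 2):

    public class MergeValidationSketch {
      public static void main(String[] args) {
        long oldNumWrites = 100; // records in the old base file
        long numWrites = 98;     // records written to the new base file
        long numDeletes = 2;     // records deleted by this merge

        // Mirrors performMergeDataValidationCheck: losing records is fatal.
        if ((numWrites + numDeletes) < oldNumWrites) {
          throw new IllegalStateException("Record write count decreased: records lost during merge");
        }
        System.out.println("Merge validation passed: 98 + 2 >= 100");
      }
    }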
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index bbb4048..d0fda81 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -47,10 +47,12 @@ import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieCorruptedDataException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.MarkerFiles;
@@ -376,6 +378,53 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         instants.get(3));
     assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "006"),
         instants.get(4));
+
+    final HoodieWriteConfig cfg = hoodieWriteConfig;
+    final String instantTime = "007";
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    String basePathStr = basePath;
+    HoodieTable table = getHoodieTable(metaClient, cfg);
+    jsc.parallelize(Arrays.asList(1)).map(e -> {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+          .fromBytes(metaClient.getActiveTimeline().getInstantDetails(
+              metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get()).get(),
+              HoodieCommitMetadata.class);
+      String filePath = commitMetadata.getPartitionToWriteStats().values().stream()
+          .flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(".parquet")).findAny()
+          .map(ee -> ee.getPath()).orElse(null);
+      String partitionPath = commitMetadata.getPartitionToWriteStats().values().stream()
+          .flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(".parquet")).findAny()
+          .map(ee -> ee.getPartitionPath()).orElse(null);
+      Path parquetFilePath = new Path(basePathStr, filePath);
+      HoodieBaseFile baseFile = new HoodieBaseFile(parquetFilePath.toString());
+
+      try {
+        HoodieMergeHandle handle = new HoodieMergeHandle(cfg, instantTime, table, new HashMap<>(),
+            partitionPath, FSUtils.getFileId(parquetFilePath.getName()), baseFile, new SparkTaskContextSupplier());
+        WriteStatus writeStatus = new WriteStatus(false, 0.0);
+        writeStatus.setStat(new HoodieWriteStat());
+        writeStatus.getStat().setNumWrites(0);
+        handle.performMergeDataValidationCheck(writeStatus);
+      } catch (HoodieCorruptedDataException e1) {
+        fail("Exception not expected because merge validation check is disabled");
+      }
+
+      try {
+        final String newInstantTime = "006";
+        cfg.getProps().setProperty("hoodie.merge.data.validation.enabled", "true");
+        HoodieWriteConfig cfg2 = HoodieWriteConfig.newBuilder().withProps(cfg.getProps()).build();
+        HoodieMergeHandle handle = new HoodieMergeHandle(cfg2, newInstantTime, table, new HashMap<>(),
+            partitionPath, FSUtils.getFileId(parquetFilePath.getName()), baseFile, new SparkTaskContextSupplier());
+        WriteStatus writeStatus = new WriteStatus(false, 0.0);
+        writeStatus.setStat(new HoodieWriteStat());
+        writeStatus.getStat().setNumWrites(0);
+        handle.performMergeDataValidationCheck(writeStatus);
+        fail("The above line should have thrown an exception");
+      } catch (HoodieCorruptedDataException e2) {
+        // expected
+      }
+      return true;
+    }).collect();
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index eb5e2b5..dc444aa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -39,6 +39,7 @@ import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
@@ -261,6 +262,22 @@ public class ParquetUtils {
     return records;
   }
 
+  /**
+   * Returns the number of records in the parquet file.
+   *
+   * @param conf Configuration
+   * @param parquetFilePath path of the file
+   */
+  public static long getRowCount(Configuration conf, Path parquetFilePath) {
+    ParquetMetadata footer;
+    long rowCount = 0;
+    footer = readMetadata(conf, parquetFilePath);
+    for (BlockMetaData b : footer.getBlocks()) {
+      rowCount += b.getRowCount();
+    }
+    return rowCount;
+  }
+
   static class RecordKeysFilterFunction implements Function<String, Boolean> {
     private final Set<String> candidateKeys;
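
A hedged usage sketch for the new utility: the count comes from footer metadata (one BlockMetaData per row group), so no data pages are read. The file path is a placeholder:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.util.ParquetUtils;

    public class RowCountExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Placeholder path; point this at an existing parquet file.
        Path parquetFilePath = new Path("/tmp/hudi_table/partition/some-file.parquet");
        System.out.println("rows = " + ParquetUtils.getRowCount(conf, parquetFilePath));
      }
    }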
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
index 107f503..feacbda 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
@@ -74,7 +74,6 @@ public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileR
   @Override
   public long getTotalRecords() {
-    // TODO Auto-generated method stub
-    return 0;
+    return ParquetUtils.getRowCount(conf, path);
   }
 }
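
With the stub removed, the format-agnostic reader interface now reports real counts, which is exactly the path performMergeDataValidationCheck takes. A sketch (placeholder path; the reader is obtained through the same factory call the merge handle uses):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.io.storage.HoodieFileReader;
    import org.apache.hudi.io.storage.HoodieFileReaderFactory;

    public class TotalRecordsExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder path; point this at an existing Hudi base file.
        Path baseFilePath = new Path("/tmp/hudi_table/partition/some-file.parquet");
        HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(conf, baseFilePath);
        System.out.println("total records = " + reader.getTotalRecords());
      }
    }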
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
index 9496f01..2bcbcbd 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
@@ -36,6 +36,7 @@ import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -147,6 +148,18 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
     }
   }
 
+  @Test
+  public void testReadCounts() throws Exception {
+    String filePath = basePath + "/test.parquet";
+    List<String> rowKeys = new ArrayList<>();
+    for (int i = 0; i < 123; i++) {
+      rowKeys.add(UUID.randomUUID().toString());
+    }
+    writeParquetFile(BloomFilterTypeCode.SIMPLE.name(), filePath, rowKeys);
+
+    assertEquals(123, ParquetUtils.getRowCount(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath)));
+  }
+
   private void writeParquetFile(String typeCode, String filePath, List<String> rowKeys) throws Exception {
     writeParquetFile(typeCode, filePath, rowKeys, HoodieAvroUtils.getRecordKeySchema(), false, "");
   }