This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.5.3 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2112d0ada59cb12b3be6eb7d407ba48b494c505f Author: Bhavani Sudha Saktheeswaran <[email protected]> AuthorDate: Fri Apr 10 08:58:55 2020 -0700 [HUDI-738] Add validation to DeltaStreamer to fail fast when filterDupes is enabled on UPSERT mode. (#1505) Summary: This fix ensures that, for the UPSERT operation, '--filter-dupes' is disabled, and fails fast if it is not. Otherwise, all updates would be silently dropped and only new records would be ingested. --- .../org/apache/hudi/utilities/deltastreamer/DeltaSync.java | 5 ----- .../hudi/utilities/deltastreamer/HoodieDeltaStreamer.java | 9 ++++----- .../org/apache/hudi/utilities/TestHoodieDeltaStreamer.java | 14 ++++++++++++-- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 3073dfa..4645ade 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -180,9 +180,6 @@ public class DeltaSync implements Serializable { UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider)); this.hiveConf = hiveConf; - if (cfg.filterDupes) { - cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation; - } // If schemaRegistry already resolved, setup write-client setupWriteClient(); @@ -355,8 +352,6 @@ public class DeltaSync implements Serializable { Option<String> scheduledCompactionInstant = Option.empty(); // filter dupes if needed if (cfg.filterDupes) { - // turn upserts to insert - cfg.operation = cfg.operation == Operation.UPSERT ? 
Operation.INSERT : cfg.operation; records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig()); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 01ab1cc..60947be 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -359,16 +359,15 @@ public class HoodieDeltaStreamer implements Serializable { tableType = HoodieTableType.valueOf(cfg.tableType); } + ValidationUtils.checkArgument(!cfg.filterDupes || cfg.operation != Operation.UPSERT, + "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."); + this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); LOG.info("Creating delta streamer with configs : " + props.toString()); this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc); - if (cfg.filterDupes) { - cfg.operation = cfg.operation == Operation.UPSERT ? 
Operation.INSERT : cfg.operation; - } - deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, tableType, props, jssc, fs, hiveConf, - this::onInitializingWriteClient); + this::onInitializingWriteClient); } public DeltaSync getDeltaSync() { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java index 9d324dc..1435faa 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java @@ -593,7 +593,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { // Generate the same 1000 records + 1000 new ones for upsert cfg.filterDupes = true; cfg.sourceLimit = 2000; - cfg.operation = Operation.UPSERT; + cfg.operation = Operation.INSERT; new HoodieDeltaStreamer(cfg, jsc).sync(); TestHelpers.assertRecordCount(2000, tableBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2); @@ -606,7 +606,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { HoodieTableMetaClient mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true); HoodieInstant lastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get(); HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, Operation.UPSERT); - cfg2.filterDupes = true; + cfg2.filterDupes = false; cfg2.sourceLimit = 2000; cfg2.operation = Operation.UPSERT; cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP)); @@ -622,6 +622,16 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { .fromBytes(mClient.getActiveTimeline().getInstantDetails(newLastFinished).get(), HoodieCommitMetadata.class); System.out.println("New Commit Metadata=" + commitMetadata); 
assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty()); + + // Try UPSERT with filterDupes true. Expect exception + cfg2.filterDupes = true; + cfg2.operation = Operation.UPSERT; + try { + new HoodieDeltaStreamer(cfg2, jsc).sync(); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.")); + } + } @Test
