This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 2112d0ada59cb12b3be6eb7d407ba48b494c505f
Author: Bhavani Sudha Saktheeswaran <[email protected]>
AuthorDate: Fri Apr 10 08:58:55 2020 -0700

    [HUDI-738] Add validation to DeltaStreamer to fail fast when filterDupes 
is enabled on UPSERT mode. (#1505)
    
    Summary:
    This fix ensures that, for the UPSERT operation, '--filter-dupes' is 
disabled, and fails fast if it is not. Otherwise, all updates would be 
silently dropped and only new records taken in.
---
 .../org/apache/hudi/utilities/deltastreamer/DeltaSync.java |  5 -----
 .../hudi/utilities/deltastreamer/HoodieDeltaStreamer.java  |  9 ++++-----
 .../org/apache/hudi/utilities/TestHoodieDeltaStreamer.java | 14 ++++++++++++--
 3 files changed, 16 insertions(+), 12 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 3073dfa..4645ade 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -180,9 +180,6 @@ public class DeltaSync implements Serializable {
         UtilHelpers.createSource(cfg.sourceClassName, props, jssc, 
sparkSession, schemaProvider));
 
     this.hiveConf = hiveConf;
-    if (cfg.filterDupes) {
-      cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : 
cfg.operation;
-    }
 
     // If schemaRegistry already resolved, setup write-client
     setupWriteClient();
@@ -355,8 +352,6 @@ public class DeltaSync implements Serializable {
     Option<String> scheduledCompactionInstant = Option.empty();
     // filter dupes if needed
     if (cfg.filterDupes) {
-      // turn upserts to insert
-      cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : 
cfg.operation;
       records = DataSourceUtils.dropDuplicates(jssc, records, 
writeClient.getConfig());
     }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 01ab1cc..60947be 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -359,16 +359,15 @@ public class HoodieDeltaStreamer implements Serializable {
         tableType = HoodieTableType.valueOf(cfg.tableType);
       }
 
+      ValidationUtils.checkArgument(!cfg.filterDupes || cfg.operation != 
Operation.UPSERT,
+          "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to 
ensure updates are not missed.");
+
       this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), 
cfg.configs).getConfig();
       LOG.info("Creating delta streamer with configs : " + props.toString());
       this.schemaProvider = 
UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);
 
-      if (cfg.filterDupes) {
-        cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : 
cfg.operation;
-      }
-
       deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, tableType, 
props, jssc, fs, hiveConf,
-          this::onInitializingWriteClient);
+        this::onInitializingWriteClient);
     }
 
     public DeltaSync getDeltaSync() {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index 9d324dc..1435faa 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -593,7 +593,7 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
     // Generate the same 1000 records + 1000 new ones for upsert
     cfg.filterDupes = true;
     cfg.sourceLimit = 2000;
-    cfg.operation = Operation.UPSERT;
+    cfg.operation = Operation.INSERT;
     new HoodieDeltaStreamer(cfg, jsc).sync();
     TestHelpers.assertRecordCount(2000, tableBasePath + "/*/*.parquet", 
sqlContext);
     TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
@@ -606,7 +606,7 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
     HoodieTableMetaClient mClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true);
     HoodieInstant lastFinished = 
mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
     HoodieDeltaStreamer.Config cfg2 = 
TestHelpers.makeDropAllConfig(tableBasePath, Operation.UPSERT);
-    cfg2.filterDupes = true;
+    cfg2.filterDupes = false;
     cfg2.sourceLimit = 2000;
     cfg2.operation = Operation.UPSERT;
     cfg2.configs.add(String.format("%s=false", 
HoodieCompactionConfig.AUTO_CLEAN_PROP));
@@ -622,6 +622,16 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
         
.fromBytes(mClient.getActiveTimeline().getInstantDetails(newLastFinished).get(),
 HoodieCommitMetadata.class);
     System.out.println("New Commit Metadata=" + commitMetadata);
     assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty());
+
+    // Try UPSERT with filterDupes true. Expect exception
+    cfg2.filterDupes = true;
+    cfg2.operation = Operation.UPSERT;
+    try {
+      new HoodieDeltaStreamer(cfg2, jsc).sync();
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("'--filter-dupes' needs to be 
disabled when '--op' is 'UPSERT' to ensure updates are not missed."));
+    }
+
   }
 
   @Test

Reply via email to