linliu-code commented on code in PR #12588:
URL: https://github.com/apache/hudi/pull/12588#discussion_r1915456963


##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java:
##########
@@ -284,24 +285,47 @@ public static HoodieRecord createHoodieRecord(GenericRecord gr, HoodieKey hKey,
    * @param writeConfig HoodieWriteConfig
    */
   @SuppressWarnings("unchecked")
-  public static JavaRDD<HoodieRecord> dropDuplicates(HoodieSparkEngineContext engineContext, JavaRDD<HoodieRecord> incomingHoodieRecords,
-      HoodieWriteConfig writeConfig) {
+  public static JavaRDD<HoodieRecord> doDropDuplicates(HoodieSparkEngineContext engineContext,
+                                                       JavaRDD<HoodieRecord> incomingHoodieRecords,
+                                                       HoodieWriteConfig writeConfig,
+                                                       boolean failOnDuplicates) {
     try {
       SparkRDDReadClient client = new SparkRDDReadClient<>(engineContext, writeConfig);
       return client.tagLocation(incomingHoodieRecords)
-          .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
+          .filter(r -> shouldIncludeRecord((HoodieRecord<HoodieRecordPayload>) r, failOnDuplicates));
     } catch (TableNotFoundException e) {
-      // this will be executed when there is no hoodie table yet
-      // so no dups to drop
+      // No table exists yet, so no duplicates to drop
       return incomingHoodieRecords;
     }
   }
 
+  /**
+   * Determines if a record should be included in the result after deduplication.
+   *
+   * @param record            The Hoodie record to evaluate.
+   * @param failOnDuplicates  Whether to fail on detecting duplicates.
+   * @return true if the record should be included; false otherwise.
+   */
+  private static boolean shouldIncludeRecord(HoodieRecord<?> record, boolean failOnDuplicates) {
+    if (!record.isCurrentLocationKnown()) {
+      return true;
+    }
+    if (failOnDuplicates) {
+      // Fail if duplicates are found and the flag is set
+      throw new HoodieDuplicateKeyException(record.getRecordKey());
+    }
+    return false;
+  }
+
   @SuppressWarnings("unchecked")
-  public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
-      Map<String, String> parameters) {
-    HoodieWriteConfig writeConfig =
-        HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build();
-    return dropDuplicates(new HoodieSparkEngineContext(jssc), incomingHoodieRecords, writeConfig);
+  public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc,

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to