linliu-code commented on code in PR #12588:
URL: https://github.com/apache/hudi/pull/12588#discussion_r1915456963
##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java:
##########
@@ -284,24 +285,47 @@ public static HoodieRecord createHoodieRecord(GenericRecord gr, HoodieKey hKey,
* @param writeConfig HoodieWriteConfig
*/
@SuppressWarnings("unchecked")
- public static JavaRDD<HoodieRecord> dropDuplicates(HoodieSparkEngineContext
engineContext, JavaRDD<HoodieRecord> incomingHoodieRecords,
- HoodieWriteConfig writeConfig) {
+ public static JavaRDD<HoodieRecord> doDropDuplicates(HoodieSparkEngineContext engineContext,
+ JavaRDD<HoodieRecord> incomingHoodieRecords,
+ HoodieWriteConfig writeConfig,
+ boolean failOnDuplicates) {
try {
SparkRDDReadClient client = new SparkRDDReadClient<>(engineContext, writeConfig);
return client.tagLocation(incomingHoodieRecords)
- .filter(r -> !((HoodieRecord<HoodieRecordPayload>)
r).isCurrentLocationKnown());
+ .filter(r -> shouldIncludeRecord((HoodieRecord<HoodieRecordPayload>) r, failOnDuplicates));
} catch (TableNotFoundException e) {
- // this will be executed when there is no hoodie table yet
- // so no dups to drop
+ // No table exists yet, so no duplicates to drop
return incomingHoodieRecords;
}
}
+ /**
+ * Determines if a record should be included in the result after deduplication.
+ *
+ * @param record The Hoodie record to evaluate.
+ * @param failOnDuplicates Whether to fail on detecting duplicates.
+ * @return true if the record should be included; false otherwise.
+ */
+ private static boolean shouldIncludeRecord(HoodieRecord<?> record, boolean failOnDuplicates) {
+ if (!record.isCurrentLocationKnown()) {
+ return true;
+ }
+ if (failOnDuplicates) {
+ // Fail if duplicates are found and the flag is set
+ throw new HoodieDuplicateKeyException(record.getRecordKey());
+ }
+ return false;
+ }
+
@SuppressWarnings("unchecked")
- public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc,
JavaRDD<HoodieRecord> incomingHoodieRecords,
- Map<String, String> parameters) {
- HoodieWriteConfig writeConfig =
- HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build();
- return dropDuplicates(new HoodieSparkEngineContext(jssc),
incomingHoodieRecords, writeConfig);
+ public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc,
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]