bvaradar commented on a change in pull request #1004: [HUDI-328] Adding delete api to HoodieWriteClient
URL: https://github.com/apache/incubator-hudi/pull/1004#discussion_r346712893
##########
File path: hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
##########
@@ -325,6 +324,44 @@ public static SparkConf registerClasses(SparkConf conf) {
}
}
+ /**
+ * Deletes a list of {@link HoodieKey}s from the Hoodie table at the supplied commitTime. {@link HoodieKey}s will be
+ * deduped, and non-existent keys will be removed before deleting.
+ *
+ * @param keys {@link List} of {@link HoodieKey}s to be deleted
+ * @param commitTime Commit time handle
+ * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
+ */
+ public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, final String commitTime) {
+ HoodieTable<T> table = getTableAndInitCtx();
+ try {
+ // De-dupe/merge if needed
+ JavaRDD<HoodieKey> dedupedKeys =
+ config.shouldCombineBeforeDelete() ? deduplicateKeys(keys, config.getDeleteShuffleParallelism()) : keys;
+
+ JavaRDD<HoodieRecord<T>> dedupedRecords =
+ dedupedKeys.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
+ indexTimer = metrics.getIndexCtx();
+ // perform index lookup to get the existing location of records
+ JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, jsc, table);
+ // filter out non-existent keys/records
+ JavaRDD<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(record -> record.getCurrentLocation() != null);
+ if (!taggedValidRecords.isEmpty()) {
+ metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
+ indexTimer = null;
+ return upsertRecordsInternal(taggedValidRecords, commitTime, table, true);
+ } else {
+ // if the entire set of keys is non-existent
+ return jsc.emptyRDD();
Review comment:
> Don't you think if all keys are invalid, we should throw an exception
actually?

We should not throw an exception. For various reasons (e.g. checkpoint rewinding when checkpoints are managed outside of Hudi commit files), we might process the same record (the key, in the case of deletions) more than once. To stay idempotent, we should either commit the delete or silently ignore it.
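The idempotency argument above can be sketched without Spark. Below is a minimal, hypothetical plain-Java stand-in (not actual Hudi code): the `Set` models the index lookup (`tagLocation`), and the filter mirrors the `getCurrentLocation() != null` check, so replaying the same delete yields an empty result instead of an exception. The names `IdempotentDeleteSketch` and `deleteIdempotent` are invented for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class IdempotentDeleteSketch {

    // existingKeys is a hypothetical stand-in for the index: the keys that
    // currently exist in the table.
    static List<String> deleteIdempotent(Set<String> existingKeys, List<String> keys) {
        List<String> valid = keys.stream()
                .distinct()                      // de-dupe, as shouldCombineBeforeDelete would
                .filter(existingKeys::contains)  // drop keys with no current location
                .collect(Collectors.toList());
        existingKeys.removeAll(valid);           // apply the delete
        return valid;                            // empty on a replay, no exception thrown
    }

    public static void main(String[] args) {
        Set<String> table = new HashSet<>(Set.of("k1", "k2"));
        System.out.println(deleteIdempotent(table, List.of("k1", "k1", "k3"))); // [k1]
        // Replaying the same keys (e.g. after a checkpoint rewind) is a no-op:
        System.out.println(deleteIdempotent(table, List.of("k1", "k3")));       // []
    }
}
```

The second call returning an empty list, rather than throwing, is what makes the operation safe to retry.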
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services