vinothchandar commented on a change in pull request #1495: [HUDI-770] Organize upsert/insert API implementation under a single package
URL: https://github.com/apache/incubator-hudi/pull/1495#discussion_r405954276
##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -455,49 +454,15 @@ private void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, Hood
}
private JavaRDD<WriteStatus> upsertRecordsInternal(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime,
- HoodieTable<T> hoodieTable, final boolean isUpsert) {
-
- // Cache the tagged records, so we don't end up computing both
- // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
- if (preppedRecords.getStorageLevel() == StorageLevel.NONE()) {
- preppedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER());
- } else {
- LOG.info("RDD PreppedRecords was persisted at: " + preppedRecords.getStorageLevel());
- }
-
- WorkloadProfile profile = null;
- if (hoodieTable.isWorkloadProfileNeeded()) {
- profile = new WorkloadProfile(preppedRecords);
- LOG.info("Workload profile :" + profile);
- saveWorkloadProfileMetadataToInflight(profile, hoodieTable, instantTime);
- }
-
- // partition using the insert partitioner
- final Partitioner partitioner = getPartitioner(hoodieTable, isUpsert, profile);
- JavaRDD<HoodieRecord<T>> partitionedRecords = partition(preppedRecords, partitioner);
- JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
- if (isUpsert) {
- return hoodieTable.handleUpsertPartition(instantTime, partition, recordItr, partitioner);
- } else {
- return hoodieTable.handleInsertPartition(instantTime, partition, recordItr, partitioner);
- }
- }, true).flatMap(List::iterator);
-
- return updateIndexAndCommitIfNeeded(writeStatusRDD, hoodieTable, instantTime);
- }
-
- private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, WorkloadProfile profile) {
- if (isUpsert) {
- return table.getUpsertPartitioner(profile, jsc);
- } else {
- return table.getInsertPartitioner(profile, jsc);
+ HoodieTable<T> hoodieTable) {
+ CommitActionResult result = hoodieTable.ingest(jsc, preppedRecords, instantTime, getOperationType());
Review comment:
And can we return `HoodieCommitMetadata` instead? I guess the issue is that we have to pass this to the caller (DeltaStreamer or the datasource) so it can decide whether to commit or not?
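To make the trade-off in the question concrete, here is a minimal, dependency-free Java sketch. It assumes the table-level write action returns commit metadata without committing and leaves the commit decision to the caller. `CommitDecisionSketch`, `OperationType`, `WriteStat`, `CommitMetadata`, `ingest`, and `commitIfAccepted` are all hypothetical stand-ins, not Hudi's real `HoodieCommitMetadata`, `CommitActionResult`, or `HoodieTable.ingest`. The Spark RDD plumbing is omitted, and the "empty record counts as an error" rule is an illustrative simplification.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical sketch: the write action returns metadata; the caller decides to commit. */
public class CommitDecisionSketch {

  /** Stand-in for the operation type the write client passes down. */
  enum OperationType { UPSERT, INSERT }

  /** Stand-in for a per-file write statistic. */
  static final class WriteStat {
    final String fileId;
    final long recordsWritten;
    final long errors;

    WriteStat(String fileId, long recordsWritten, long errors) {
      this.fileId = fileId;
      this.recordsWritten = recordsWritten;
      this.errors = errors;
    }
  }

  /** Stand-in for commit metadata: write stats plus free-form extra metadata. */
  static final class CommitMetadata {
    final List<WriteStat> stats;
    final Map<String, String> extraMetadata = new HashMap<>();

    CommitMetadata(List<WriteStat> stats) {
      this.stats = Collections.unmodifiableList(new ArrayList<>(stats));
    }

    long totalWritten() {
      return stats.stream().mapToLong(s -> s.recordsWritten).sum();
    }

    long totalErrors() {
      return stats.stream().mapToLong(s -> s.errors).sum();
    }
  }

  /** Hypothetical table-level write: writes the records but does NOT commit. */
  static CommitMetadata ingest(List<String> records, String instantTime, OperationType op) {
    long written = 0;
    long errors = 0;
    for (String record : records) {
      // Simplification: an empty payload counts as a failed write.
      if (record == null || record.isEmpty()) {
        errors++;
      } else {
        written++;
      }
    }
    // In the code this diff removes, upserts and inserts use different partitioners
    // and partition handlers; here the operation type only tags the output file id.
    String fileId = op.name().toLowerCase(Locale.ROOT) + "-" + instantTime;
    CommitMetadata metadata =
        new CommitMetadata(Collections.singletonList(new WriteStat(fileId, written, errors)));
    metadata.extraMetadata.put("operationType", op.name());
    return metadata;
  }

  /** Caller-side step: the caller's policy, not the table, decides whether to commit. */
  static boolean commitIfAccepted(
      String instantTime, CommitMetadata metadata, Predicate<CommitMetadata> policy) {
    if (!policy.test(metadata)) {
      System.out.println("rollback " + instantTime + " (errors=" + metadata.totalErrors() + ")");
      return false;
    }
    System.out.println("commit " + instantTime + " (written=" + metadata.totalWritten() + ")");
    return true;
  }

  public static void main(String[] args) {
    CommitMetadata metadata = ingest(Arrays.asList("r1", "r2", ""), "001", OperationType.UPSERT);
    // Strict caller: commit only when no record failed.
    boolean strict = commitIfAccepted("001", metadata, m -> m.totalErrors() == 0);
    // Lenient caller: commit as long as something was written.
    boolean lenient = commitIfAccepted("001", metadata, m -> m.totalWritten() > 0);
    System.out.println(strict + " " + lenient);
  }
}
```

Run as-is, `main` rolls back under the strict policy and commits under the lenient one, starting from the same metadata. Returning the metadata this way keeps commit policy out of the table-level action. The trade-off is that every caller must then remember to commit.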
----------------------------------------------------------------
This is an automated message from the Apache Git Service.