vinothchandar commented on a change in pull request #1495: [HUDI-770] Organize
upsert/insert API implementation under a single package
URL: https://github.com/apache/incubator-hudi/pull/1495#discussion_r405953421
##########
File path:
hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -455,49 +454,15 @@ private void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, Hood
}
private JavaRDD<WriteStatus> upsertRecordsInternal(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime,
- HoodieTable<T> hoodieTable, final boolean isUpsert) {
-
- // Cache the tagged records, so we don't end up computing both
- // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
- if (preppedRecords.getStorageLevel() == StorageLevel.NONE()) {
- preppedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER());
- } else {
- LOG.info("RDD PreppedRecords was persisted at: " + preppedRecords.getStorageLevel());
- }
-
- WorkloadProfile profile = null;
- if (hoodieTable.isWorkloadProfileNeeded()) {
- profile = new WorkloadProfile(preppedRecords);
- LOG.info("Workload profile :" + profile);
- saveWorkloadProfileMetadataToInflight(profile, hoodieTable, instantTime);
- }
-
- // partition using the insert partitioner
- final Partitioner partitioner = getPartitioner(hoodieTable, isUpsert, profile);
- JavaRDD<HoodieRecord<T>> partitionedRecords = partition(preppedRecords, partitioner);
- JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
- if (isUpsert) {
- return hoodieTable.handleUpsertPartition(instantTime, partition, recordItr, partitioner);
- } else {
- return hoodieTable.handleInsertPartition(instantTime, partition, recordItr, partitioner);
- }
- }, true).flatMap(List::iterator);
-
- return updateIndexAndCommitIfNeeded(writeStatusRDD, hoodieTable, instantTime);
- }
-
- private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, WorkloadProfile profile) {
- if (isUpsert) {
- return table.getUpsertPartitioner(profile, jsc);
- } else {
- return table.getInsertPartitioner(profile, jsc);
+ HoodieTable<T> hoodieTable) {
+ CommitActionResult result = hoodieTable.ingest(jsc, preppedRecords, instantTime, getOperationType());
Review comment:
IMO we should just mirror the same APIs (`upsert`, `insert`) on the table.
`ingest` is confusing, since it also implies that we are reading from some
source, which we are not.
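
A rough sketch of what mirroring the client APIs on the table could look like. Everything here is hypothetical and for illustration only: `SketchTable`, `InMemorySketchTable`, and `SketchWriteClient` are made-up names, plain lists stand in for RDDs, and none of these signatures are taken from the actual Hudi code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the table abstraction; not the real HoodieTable API.
interface SketchTable<R> {
  List<String> upsert(List<R> preppedRecords, String instantTime);

  List<String> insert(List<R> preppedRecords, String instantTime);
}

// Toy implementation that only tags each record with the operation it went through.
class InMemorySketchTable implements SketchTable<String> {
  @Override
  public List<String> upsert(List<String> preppedRecords, String instantTime) {
    return tag("upsert", preppedRecords, instantTime);
  }

  @Override
  public List<String> insert(List<String> preppedRecords, String instantTime) {
    return tag("insert", preppedRecords, instantTime);
  }

  private static List<String> tag(String op, List<String> records, String instantTime) {
    List<String> statuses = new ArrayList<>();
    for (String record : records) {
      statuses.add(op + "@" + instantTime + ":" + record);
    }
    return statuses;
  }
}

// Hypothetical client: each client method delegates to the table method of the same name,
// so there is no separate "ingest" entry point and no operation-type switch.
class SketchWriteClient {
  private final SketchTable<String> table;

  SketchWriteClient(SketchTable<String> table) {
    this.table = table;
  }

  List<String> upsert(List<String> records, String instantTime) {
    return table.upsert(records, instantTime);
  }

  List<String> insert(List<String> records, String instantTime) {
    return table.insert(records, instantTime);
  }
}
```

With this shape, a reader following `client.upsert(...)` lands on `table.upsert(...)` directly, which is the naming symmetry the comment is asking for.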