danny0405 commented on a change in pull request #5087:
URL: https://github.com/apache/hudi/pull/5087#discussion_r837256589
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
##########
@@ -82,33 +78,31 @@ public static FlinkWriteHelper newInstance() {
}
@Override
- protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords,
HoodieEngineContext context, HoodieTable<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>> table) {
- return HoodieList.getList(
- table.getIndex().tagLocation(HoodieList.of(dedupedRecords), context,
table));
+ protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>>
dedupedRecords, HoodieEngineContext context,
+ HoodieTable<T,
HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>>
table) {
+ return table.getIndex().tagLocation(dedupedRecords, context, table);
}
@Override
- public List<HoodieRecord<T>> deduplicateRecords(
- List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism)
{
- // If index used is global, then records are expected to differ in their
partitionPath
- Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
- .collect(Collectors.groupingBy(record ->
record.getKey().getRecordKey()));
-
- return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1,
rec2) -> {
+ public HoodieData<HoodieRecord<T>> deduplicateRecords(
+ HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int
parallelism) {
+ return records.mapToPair(record -> {
+ String key = record.getKey().getRecordKey();
+ return Pair.of(key, record);
+ }).reduceByKey((rec1, rec2) -> {
Review comment:
Can we avoid generating the intermediate `Pair` objects?
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -136,46 +137,49 @@ public void bootstrap(Option<Map<String, String>>
extraMetadata) {
@Override
public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String
instantTime) {
- HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
table =
+ HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>,
HoodieData<WriteStatus>> table =
initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
final HoodieWriteHandle<?, ?, ?, ?> writeHandle =
getOrCreateWriteHandle(records.get(0), getConfig(),
instantTime, table, records.listIterator());
- HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>)
table).upsert(context, writeHandle, instantTime, records);
+ HoodieWriteMetadata<HoodieData<WriteStatus>> result =
((HoodieFlinkTable<T>) table).upsert(context, writeHandle, instantTime,
HoodieList.of(records));
+ HoodieWriteMetadata<List<WriteStatus>> resultList =
result.clone(HoodieList.getList(result.getWriteStatuses()));
Review comment:
Can we avoid these metadata clones?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]