xushiyan commented on code in PR #5627:
URL: https://github.com/apache/hudi/pull/5627#discussion_r901041090
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -81,9 +83,10 @@ public I combineOnCondition(
*/
public I deduplicateRecords(
I records, HoodieTable<T, I, K, O> table, int parallelism) {
- return deduplicateRecords(records, table.getIndex(), parallelism);
+ HoodieMerge hoodieMerge = ReflectionUtils.loadHoodieMerge(table.getConfig().getMergeClass());
Review Comment:
Ditto
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -103,6 +105,7 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
this.taskContextSupplier = taskContextSupplier;
this.writeToken = makeWriteToken();
schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
+ hoodieMerge = ReflectionUtils.loadHoodieMerge(config.getMergeClass());
Review Comment:
Reflection can significantly hurt performance. Can we optimize the instantiation here?
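One way to address the reflection concern is to pay the reflective cost once per class name rather than once per write handle. The following is a generic Java illustration, not Hudi code: `CachedInstanceLoader` is a hypothetical helper, and it assumes the loaded class (for example, the configured merge class) is stateless, thread-safe, and has a public no-arg constructor.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical helper (not part of Hudi): caches one instance per class name so
 * Class.forName and the constructor lookup run only on the first request.
 * Only safe if the cached class is stateless and thread-safe.
 */
public final class CachedInstanceLoader {
  // Class name -> shared instance.
  private static final Map<String, Object> CACHE = new ConcurrentHashMap<>();

  private CachedInstanceLoader() {}

  @SuppressWarnings("unchecked")
  public static <T> T load(String className) {
    // computeIfAbsent instantiates at most once per key; later calls return the cached object.
    return (T) CACHE.computeIfAbsent(className, CachedInstanceLoader::instantiate);
  }

  private static Object instantiate(String className) {
    try {
      return Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      // Unchecked so it can propagate out of computeIfAbsent; nothing is cached on failure.
      throw new IllegalArgumentException("Cannot instantiate " + className, e);
    }
  }

  public static void main(String[] args) {
    Object a = load("java.lang.Object");
    Object b = load("java.lang.Object");
    System.out.println(a == b); // true: the second call is served from the cache
  }
}
```

If the merge implementation ever holds per-task state, sharing one cached instance would be unsafe, and a per-handle instance (or a per-thread cache) would be needed instead.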
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java:
##########
@@ -49,19 +50,17 @@ protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRec
@Override
public HoodieData<HoodieRecord<T>> deduplicateRecords(
- HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
+ HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge hoodieMerge) {
boolean isIndexingGlobal = index.isGlobal();
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
- // If index used is global, then records are expected to differ in their partitionPath
+ // If index used is global,x then records are expected to differ in their partitionPath
Review Comment:
Typo?
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java:
##########
@@ -87,20 +88,21 @@ protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, Hoodie
@Override
public List<HoodieRecord<T>> deduplicateRecords(
- List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
+ List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge hoodieMerge) {
// If index used is global, then records are expected to differ in their partitionPath
Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
.collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));
return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> {
- @SuppressWarnings("unchecked") final HoodieRecord reducedRec = rec2.preCombine(rec1);
+ @SuppressWarnings("unchecked")
+ final HoodieRecord reducedData = hoodieMerge.preCombine(rec1, rec2);
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
- boolean choosePrev = rec1 == reducedRec;
+ boolean choosePrev = rec1 == reducedData;
Review Comment:
reducedRec sounds more precise. Why rename?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -154,6 +155,12 @@ public class HoodieTableConfig extends HoodieConfig {
.withDocumentation("Payload class to use for performing compactions, i.e merge delta logs with current base file and then "
+ " produce a new base file.");
+ public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
Review Comment:
Doesn't this duplicate the other config?
##########
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java:
##########
@@ -65,11 +67,12 @@ public List<HoodieRecord<T>> deduplicateRecords(
return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
@SuppressWarnings("unchecked")
- HoodieRecord reducedRec = rec2.preCombine(rec1);
+ HoodieRecord reducedRecord = hoodieMerge.preCombine(rec1,rec2);
+
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
- return (HoodieRecord<T>) reducedRec.newInstance(rec1.getKey());
+ return (HoodieRecord<T>) new HoodieAvroRecord(reducedRecord);
Review Comment:
Why not use the newInstance API?
##########
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala:
##########
@@ -181,6 +181,8 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
case b: ByteBuffer =>
val bytes = new Array[Byte](b.remaining)
b.get(bytes)
+ // Do not forget to reset the position
+ b.rewind()
Review Comment:
Is this an existing bug? If so, maybe file a separate JIRA and land the fix in master first?
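For context on the `rewind()` change: the relative `ByteBuffer.get(byte[])` call advances the buffer's position, so without a reset any later reader of the same buffer finds no remaining bytes. The following Java sketch uses the same `java.nio.ByteBuffer` API that the Scala deserializer calls (the class and method names are illustrative only) and shows the effect, plus an alternative that reads through `duplicate()` so the caller's position is never modified.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public final class ByteBufferCopyDemo {
  // Mirrors the patched code: copy the remaining bytes, then reset the position to 0.
  public static byte[] copyAndRewind(ByteBuffer b) {
    byte[] bytes = new byte[b.remaining()];
    b.get(bytes); // relative bulk get: position advances by bytes.length
    b.rewind();   // position back to 0 (not necessarily the position on entry)
    return bytes;
  }

  // Alternative: read through a duplicate, which shares content but has its own
  // position and limit, so the caller's buffer state is untouched.
  public static byte[] copyWithoutMutating(ByteBuffer b) {
    byte[] bytes = new byte[b.remaining()];
    b.duplicate().get(bytes);
    return bytes;
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.wrap("hudi".getBytes(StandardCharsets.UTF_8));
    byte[] first = new byte[buf.remaining()];
    buf.get(first);
    System.out.println(buf.remaining()); // 0: a second reader would see no bytes
    buf.rewind();
    System.out.println(copyAndRewind(buf).length);       // 4
    System.out.println(copyWithoutMutating(buf).length); // 4
  }
}
```

Note that `rewind()` sets the position to 0 rather than restoring the position the buffer had on entry; the two coincide only when the buffer started at position 0.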
--
This is an automated message from the Apache Git Service.