alexeykudinkin commented on code in PR #6745:
URL: https://github.com/apache/hudi/pull/6745#discussion_r991697426
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -267,9 +268,8 @@ protected void init(String fileId,
Iterator<HoodieRecord<T>> newRecordsItr) {
+ ((ExternalSpillableMap)
keyToNewRecords).getSizeOfFileOnDiskInBytes());
}
- protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp) throws
IOException {
+ protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp, Schema
writerSchema) throws IOException {
Review Comment:
@wzx140 this API is rather confusing: why do we need to pass all 3 records
at the same time?
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java:
##########
@@ -59,20 +61,32 @@ public
FlinkMergeAndReplaceHandleWithChangeLog(HoodieWriteConfig config, String
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
- protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+ protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp, Schema
writerSchema)
throws IOException {
- final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
combineRecordOp);
+ Option<HoodieRecord> savedCombineRecordOp =
combineRecordOp.map(HoodieRecord::newInstance);
+ HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
+ final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
combineRecordOp, writerSchema);
if (result) {
boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
- cdcLogger.put(hoodieRecord, (GenericRecord) oldRecord.getData(),
isDelete ? Option.empty() : combineRecordOp.map(rec ->
((HoodieAvroIndexedRecord) rec).getData()));
+ Option<IndexedRecord> combineRecord;
Review Comment:
@xushiyan this is an identical copy of it; combining all of that in
`HoodieWriteHandle` would allow us to avoid that duplication
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -118,76 +120,80 @@ public HoodieRecordType getRecordType() {
}
@Override
- public Object getRecordColumnValues(Schema recordSchema, String[] columns,
boolean consistentLogicalTimestampEnabled) {
- return HoodieSparkRecordUtils.getRecordColumnValues(data, columns,
getStructType(), consistentLogicalTimestampEnabled);
+ public ComparableList getComparableColumnValues(Schema recordSchema,
String[] columns, boolean consistentLogicalTimestampEnabled) {
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ return HoodieSparkRecordUtils.getRecordColumnValues(data, columns,
structType, consistentLogicalTimestampEnabled);
}
@Override
- public HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema)
throws IOException {
- StructType otherStructType = ((HoodieSparkRecord) other).getStructType();
- StructType writerStructType =
HoodieInternalRowUtils.getCachedSchema(targetSchema);
- InternalRow mergeRow = HoodieInternalRowUtils.stitchRecords(data,
getStructType(), (InternalRow) other.getData(), otherStructType,
writerStructType);
- return new HoodieSparkRecord(getKey(), mergeRow, writerStructType,
getOperation());
+ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws
IOException {
+ StructType targetStructType =
HoodieInternalRowUtils.getCachedSchema(targetSchema);
+ InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
+ return new HoodieSparkRecord(getKey(), mergeRow, targetStructType,
getOperation());
}
@Override
public HoodieRecord rewriteRecord(Schema recordSchema, Properties props,
Schema targetSchema) throws IOException {
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
StructType targetStructType =
HoodieInternalRowUtils.getCachedSchema(targetSchema);
UTF8String[] metaFields = extractMetaField(targetStructType);
Review Comment:
We can't fetch with the new schema until we've rewritten the record (we
should use the previous schema here)
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java:
##########
@@ -86,13 +87,20 @@ public ClosableIterator<InternalRow>
getInternalRowIterator(Schema readerSchema,
conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), (Boolean)
SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()));
conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), (Boolean)
SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()));
InputFile inputFile = HadoopInputFile.fromPath(path, conf);
- ParquetReader reader = new ParquetReader.Builder<InternalRow>(inputFile) {
+ ParquetReader<InternalRow> reader = new
ParquetReader.Builder<InternalRow>(inputFile) {
@Override
protected ReadSupport getReadSupport() {
return new ParquetReadSupport();
}
}.withConf(conf).build();
- ParquetReaderIterator<InternalRow> parquetReaderIterator = new
ParquetReaderIterator<>(reader, InternalRow::copy);
+ ParquetReaderIterator<InternalRow> parquetReaderIterator = new
ParquetReaderIterator<>(reader,
Review Comment:
We should not be making copies in a blanket fashion like that. Why do we
need this?
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##########
@@ -37,12 +38,14 @@
@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
public interface HoodieRecordMerger extends Serializable {
+ String DEFAULT_MERGER_STRATEGY_UUID = "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5";
+
/**
* This method converges combineAndGetUpdateValue and precombine from
HoodiePayload.
* It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we
can translate as having 3 versions A, B, C
* of the single record, both orders of operations applications have to
yield the same result)
*/
- Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema
schema, Properties props) throws IOException;
+ Pair<Option<HoodieRecord>, Schema> merge(HoodieRecord older, Schema
oldSchema, HoodieRecord newer, Schema newSchema, Properties props) throws
IOException;
Review Comment:
I think we should invert this to be `Option<Pair<HoodieRecord, Schema>>`
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java:
##########
@@ -75,24 +75,31 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig
config, String instantTi
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
- protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+ protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp, Schema
writerSchema)
throws IOException {
- final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
combineRecordOp);
+ Option<HoodieRecord> savedCombineRecordOp =
combineRecordOp.map(HoodieRecord::newInstance);
+ HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
+ final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
combineRecordOp, writerSchema);
if (result) {
boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
- cdcLogger.put(hoodieRecord, (GenericRecord) ((HoodieAvroIndexedRecord)
oldRecord).getData(), isDelete ? Option.empty() : combineRecordOp.map(rec ->
((HoodieAvroIndexedRecord) rec).getData()));
+ Option<IndexedRecord> combineRecord;
+ if (combineRecordOp.isPresent()) {
Review Comment:
We can simplify this conditional with just `map` (even though `toIndexed`
could throw)
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java:
##########
@@ -62,20 +64,32 @@ public FlinkMergeHandleWithChangeLog(HoodieWriteConfig
config, String instantTim
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
- protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+ protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp, Schema
writerSchema)
throws IOException {
- final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
combineRecordOp);
+ Option<HoodieRecord> savedCombineRecordOp =
combineRecordOp.map(HoodieRecord::newInstance);
+ HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
+ final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
combineRecordOp, writerSchema);
if (result) {
boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
- cdcLogger.put(hoodieRecord, (GenericRecord) oldRecord.getData(),
isDelete ? Option.empty() : combineRecordOp.map(rec ->
((HoodieAvroIndexedRecord) rec).getData()));
+ Option<IndexedRecord> combineRecord;
+ if (combineRecordOp.isPresent()) {
+ combineRecord =
savedCombineRecordOp.get().toIndexedRecord(writerSchema,
config.getPayloadConfig().getProps()).map(HoodieAvroIndexedRecord::getData);
+ } else {
+ combineRecord = Option.empty();
+ }
+ cdcLogger.put(hoodieRecord, (GenericRecord) savedOldRecord.getData(),
isDelete ? Option.empty() : combineRecord);
+ hoodieRecord.deflate();
}
return result;
}
- protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws
IOException {
- super.writeInsertRecord(hoodieRecord);
+ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Schema
schema) throws IOException {
Review Comment:
@xushiyan same here
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java:
##########
@@ -75,24 +75,31 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig
config, String instantTi
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
- protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+ protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp, Schema
writerSchema)
throws IOException {
- final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
combineRecordOp);
+ Option<HoodieRecord> savedCombineRecordOp =
combineRecordOp.map(HoodieRecord::newInstance);
+ HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
+ final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
combineRecordOp, writerSchema);
if (result) {
boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
- cdcLogger.put(hoodieRecord, (GenericRecord) ((HoodieAvroIndexedRecord)
oldRecord).getData(), isDelete ? Option.empty() : combineRecordOp.map(rec ->
((HoodieAvroIndexedRecord) rec).getData()));
+ Option<IndexedRecord> combineRecord;
+ if (combineRecordOp.isPresent()) {
+ combineRecord =
savedCombineRecordOp.get().toIndexedRecord(writerSchema,
config.getPayloadConfig().getProps()).map(HoodieAvroIndexedRecord::getData);
+ } else {
+ combineRecord = Option.empty();
+ }
+ cdcLogger.put(hoodieRecord, (GenericRecord) ((HoodieAvroIndexedRecord)
savedOldRecord).getData(), isDelete ? Option.empty() : combineRecord);
+ hoodieRecord.deflate();
}
return result;
}
- protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws
IOException {
- // Get the data before deflated
- Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields :
tableSchema;
- Option<IndexedRecord> recordOption = hoodieRecord.toIndexedRecord(schema,
this.config.getProps())
- .map(HoodieRecord::getData);
- super.writeInsertRecord(hoodieRecord);
+ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Schema
schema) throws IOException {
+ HoodieRecord<T> savedRecord = hoodieRecord.newInstance();
Review Comment:
Why do we need to do `newInstance`?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -118,76 +120,80 @@ public HoodieRecordType getRecordType() {
}
@Override
- public Object getRecordColumnValues(Schema recordSchema, String[] columns,
boolean consistentLogicalTimestampEnabled) {
- return HoodieSparkRecordUtils.getRecordColumnValues(data, columns,
getStructType(), consistentLogicalTimestampEnabled);
+ public ComparableList getComparableColumnValues(Schema recordSchema,
String[] columns, boolean consistentLogicalTimestampEnabled) {
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ return HoodieSparkRecordUtils.getRecordColumnValues(data, columns,
structType, consistentLogicalTimestampEnabled);
}
@Override
- public HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema)
throws IOException {
- StructType otherStructType = ((HoodieSparkRecord) other).getStructType();
- StructType writerStructType =
HoodieInternalRowUtils.getCachedSchema(targetSchema);
- InternalRow mergeRow = HoodieInternalRowUtils.stitchRecords(data,
getStructType(), (InternalRow) other.getData(), otherStructType,
writerStructType);
- return new HoodieSparkRecord(getKey(), mergeRow, writerStructType,
getOperation());
+ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws
IOException {
+ StructType targetStructType =
HoodieInternalRowUtils.getCachedSchema(targetSchema);
+ InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
+ return new HoodieSparkRecord(getKey(), mergeRow, targetStructType,
getOperation());
}
@Override
public HoodieRecord rewriteRecord(Schema recordSchema, Properties props,
Schema targetSchema) throws IOException {
Review Comment:
I'm not sure I fully understand the purpose of this method. Can you please
elaborate?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java:
##########
@@ -89,11 +92,7 @@ public void write(HoodieRecord oldRecord) {
throw new HoodieUpsertException("Insert/Update not in sorted order");
}
try {
- if (useWriterSchemaForCompaction) {
- writeRecord(hoodieRecord, Option.of(hoodieRecord),
tableSchemaWithMetaFields, config.getProps());
- } else {
- writeRecord(hoodieRecord, Option.of(hoodieRecord), tableSchema,
config.getProps());
- }
+ writeRecord(hoodieRecord, Option.of(hoodieRecord), newSchema,
config.getProps());
Review Comment:
Note to self: we need to revisit this API to avoid passing both incoming and
combined records (we should just pass in combined along w/ a record-key)
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java:
##########
@@ -41,5 +41,13 @@ HoodieRecord wrapIntoHoodieRecordPayloadWithParams(
/**
* This method used to extract HoodieKey through keyGenerator. This method
used in ClusteringExecutionStrategy.
*/
- HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Properties props,
Option<BaseKeyGenerator> keyGen);
+ HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema,
Properties props, Option<BaseKeyGenerator> keyGen);
+
+ /**
+ * This method used to overwrite record key field.
+ */
+ HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String
keyName,
Review Comment:
A few notes:
- `keyFieldName`
- We should not allow an external `value` to be passed in; instead we
should always override it (w/ null)
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java:
##########
@@ -75,24 +75,31 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig
config, String instantTi
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
- protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+ protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp, Schema
writerSchema)
throws IOException {
- final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
combineRecordOp);
+ Option<HoodieRecord> savedCombineRecordOp =
combineRecordOp.map(HoodieRecord::newInstance);
+ HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
+ final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
combineRecordOp, writerSchema);
if (result) {
boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
- cdcLogger.put(hoodieRecord, (GenericRecord) ((HoodieAvroIndexedRecord)
oldRecord).getData(), isDelete ? Option.empty() : combineRecordOp.map(rec ->
((HoodieAvroIndexedRecord) rec).getData()));
+ Option<IndexedRecord> combineRecord;
+ if (combineRecordOp.isPresent()) {
Review Comment:
@xushiyan this is what i was referring to
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java:
##########
@@ -75,24 +75,31 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig
config, String instantTi
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
- protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+ protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp, Schema
writerSchema)
throws IOException {
- final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
combineRecordOp);
+ Option<HoodieRecord> savedCombineRecordOp =
combineRecordOp.map(HoodieRecord::newInstance);
Review Comment:
Why do we need `newInstance`?
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java:
##########
@@ -170,8 +185,8 @@ public Option<Map<String, String>> getMetadata() {
}
@Override
- public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema,
Properties props) throws IOException {
- Option<IndexedRecord> avroData = getData().getInsertValue(schema, props);
+ public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema,
Properties props) throws IOException {
Review Comment:
- We should just return an `HoodieAvroIndexedRecord` here
- Make `HoodieAvroIndexedRecord` hold `Option<IndexedRecord>` (this will
make it compatible w/ HoodieAvroRecord)
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala:
##########
@@ -43,25 +43,9 @@ object HoodieInternalRowUtils {
ThreadLocal.withInitial(new Supplier[HashMap[(StructType, StructType),
UnsafeProjection]] {
Review Comment:
Why use a ThreadLocal for this Map, while using a ConcurrentHashMap for the others?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -118,76 +120,80 @@ public HoodieRecordType getRecordType() {
}
@Override
- public Object getRecordColumnValues(Schema recordSchema, String[] columns,
boolean consistentLogicalTimestampEnabled) {
- return HoodieSparkRecordUtils.getRecordColumnValues(data, columns,
getStructType(), consistentLogicalTimestampEnabled);
+ public ComparableList getComparableColumnValues(Schema recordSchema,
String[] columns, boolean consistentLogicalTimestampEnabled) {
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ return HoodieSparkRecordUtils.getRecordColumnValues(data, columns,
structType, consistentLogicalTimestampEnabled);
}
@Override
- public HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema)
throws IOException {
- StructType otherStructType = ((HoodieSparkRecord) other).getStructType();
- StructType writerStructType =
HoodieInternalRowUtils.getCachedSchema(targetSchema);
- InternalRow mergeRow = HoodieInternalRowUtils.stitchRecords(data,
getStructType(), (InternalRow) other.getData(), otherStructType,
writerStructType);
- return new HoodieSparkRecord(getKey(), mergeRow, writerStructType,
getOperation());
+ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws
IOException {
+ StructType targetStructType =
HoodieInternalRowUtils.getCachedSchema(targetSchema);
+ InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
+ return new HoodieSparkRecord(getKey(), mergeRow, targetStructType,
getOperation());
}
@Override
public HoodieRecord rewriteRecord(Schema recordSchema, Properties props,
Schema targetSchema) throws IOException {
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
StructType targetStructType =
HoodieInternalRowUtils.getCachedSchema(targetSchema);
UTF8String[] metaFields = extractMetaField(targetStructType);
if (metaFields.length == 0) {
throw new UnsupportedOperationException();
}
- InternalRow resultRow;
- if (extractMetaField(getStructType()).length == 0) {
- resultRow = new HoodieInternalRow(metaFields, data, false);
- } else {
- resultRow = new HoodieInternalRow(metaFields, data, true);
- }
-
+ boolean containMetaFields = extractMetaField(structType).length != 0;
Review Comment:
`extractMetaField` is a very expensive operation -- we should use a cheaper
check of just doing a lookup in a schema for a single meta-field
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -335,21 +334,24 @@ protected boolean writeRecord(HoodieRecord<T>
hoodieRecord, Option<HoodieRecord>
* Go through an old record. Here if we detect a newer version shows up, we
write the new one to the file.
*/
public void write(HoodieRecord<T> oldRecord) {
- Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields :
tableSchema;
+ Schema oldSchema = config.populateMetaFields() ? tableSchemaWithMetaFields
: tableSchema;
+ Schema newSchema = useWriterSchemaForCompaction ?
tableSchemaWithMetaFields : tableSchema;
boolean copyOldRecord = true;
- String key = oldRecord.getRecordKey(keyGeneratorOpt);
+ String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
TypedProperties props = config.getPayloadConfig().getProps();
if (keyToNewRecords.containsKey(key)) {
// If we have duplicate records that we are updating, then the hoodie
record will be deflated after
// writing the first record. So make a copy of the record to be merged
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key).newInstance();
try {
- Option<HoodieRecord> combinedRecord = recordMerger.merge(oldRecord,
hoodieRecord, schema, props);
+ Pair<Option<HoodieRecord>, Schema> mergeResult =
recordMerger.merge(oldRecord, oldSchema, hoodieRecord, newSchema, props);
+ Schema combineRecordSchema = mergeResult.getRight();
+ Option<HoodieRecord> combinedRecord = mergeResult.getLeft();
- if (combinedRecord.isPresent() &&
combinedRecord.get().shouldIgnore(schema, props)) {
+ if (combinedRecord.isPresent() &&
combinedRecord.get().shouldIgnore(combineRecordSchema, props)) {
// If it is an IGNORE_RECORD, just copy the old record, and do not
update the new record.
copyOldRecord = true;
- } else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedRecord))
{
+ } else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedRecord,
combineRecordSchema)) {
Review Comment:
nit: `hoodieRecord` > `newRecord`
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -288,13 +285,14 @@ public void checkState() {
/**
* Get column in record to support RDDCustomColumnsSortPartitioner
+ * @return
*/
- public abstract Object getRecordColumnValues(Schema recordSchema, String[]
columns, boolean consistentLogicalTimestampEnabled);
+ public abstract ComparableList getComparableColumnValues(Schema
recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled);
Review Comment:
Seems like my previous comment got lost somehow:
We should not do coercion to Comparable here; instead we should provide
a generic enough API to return column values `Object[] getColumnValues`.
Whether to coerce it to Comparable should be decided by whoever is using
this API
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala:
##########
@@ -219,15 +203,10 @@ object HoodieInternalRowUtils {
}
def getCachedUnsafeConvert(structType: StructType): UnsafeProjection = {
Review Comment:
Let's de-duplicate and keep just `getCachedUnsafeProjection`
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java:
##########
@@ -75,24 +75,31 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig
config, String instantTi
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
- protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+ protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp, Schema
writerSchema)
throws IOException {
- final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
combineRecordOp);
+ Option<HoodieRecord> savedCombineRecordOp =
combineRecordOp.map(HoodieRecord::newInstance);
+ HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
+ final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
combineRecordOp, writerSchema);
if (result) {
boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
- cdcLogger.put(hoodieRecord, (GenericRecord) ((HoodieAvroIndexedRecord)
oldRecord).getData(), isDelete ? Option.empty() : combineRecordOp.map(rec ->
((HoodieAvroIndexedRecord) rec).getData()));
+ Option<IndexedRecord> combineRecord;
+ if (combineRecordOp.isPresent()) {
+ combineRecord =
savedCombineRecordOp.get().toIndexedRecord(writerSchema,
config.getPayloadConfig().getProps()).map(HoodieAvroIndexedRecord::getData);
+ } else {
+ combineRecord = Option.empty();
+ }
+ cdcLogger.put(hoodieRecord, (GenericRecord) ((HoodieAvroIndexedRecord)
savedOldRecord).getData(), isDelete ? Option.empty() : combineRecord);
Review Comment:
Why not do `toIndexedRecord` (and then cast) instead?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java:
##########
@@ -91,21 +60,15 @@ private static Option<String>
getNullableValAsString(StructType structType, Inte
* @param structType {@link StructType} instance.
* @return Column value if a single column, or concatenated String values by
comma.
*/
- public static Object getRecordColumnValues(InternalRow row,
+ public static ComparableList getRecordColumnValues(InternalRow row,
Review Comment:
Same comment as above -- we should not be coercing this to Comparable and
keep it generic
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java:
##########
@@ -91,21 +60,15 @@ private static Option<String>
getNullableValAsString(StructType structType, Inte
* @param structType {@link StructType} instance.
* @return Column value if a single column, or concatenated String values by
comma.
*/
- public static Object getRecordColumnValues(InternalRow row,
+ public static ComparableList getRecordColumnValues(InternalRow row,
String[] columns,
StructType structType, boolean consistentLogicalTimestampEnabled) {
- if (columns.length == 1) {
- NestedFieldPath posList =
HoodieInternalRowUtils.getCachedPosList(structType, columns[0]);
- return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
- } else {
- // TODO this is inefficient, instead we can simply return array of
Comparable
- StringBuilder sb = new StringBuilder();
- for (String col : columns) {
- // TODO support consistentLogicalTimestampEnabled
- NestedFieldPath posList =
HoodieInternalRowUtils.getCachedPosList(structType, columns[0]);
- return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
- }
- return sb.toString();
+ List<Comparable> list = new LinkedList<>();
Review Comment:
Practically there's no good reason to use `LinkedList` nowadays, since it
almost always performs worse than `ArrayList`
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -267,9 +268,8 @@ protected void init(String fileId,
Iterator<HoodieRecord<T>> newRecordsItr) {
+ ((ExternalSpillableMap)
keyToNewRecords).getSizeOfFileOnDiskInBytes());
}
- protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp) throws
IOException {
+ protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp, Schema
writerSchema) throws IOException {
Review Comment:
Disregard. This is how it is right now; left a note to self to revisit this.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java:
##########
@@ -36,17 +36,28 @@ public class HoodieAvroRecordMerger implements
HoodieRecordMerger {
@Override
public String getMergingStrategy() {
- return StringUtils.DEFAULT_MERGER_STRATEGY_UUID;
+ return HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
}
@Override
- public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer,
Schema schema, Properties props) throws IOException {
+ public Pair<Option<HoodieRecord>, Schema> merge(HoodieRecord older, Schema
oldSchema,
+ HoodieRecord newer, Schema newSchema, Properties props) throws
IOException {
ValidationUtils.checkArgument(older.getRecordType() ==
HoodieRecordType.AVRO);
ValidationUtils.checkArgument(newer.getRecordType() ==
HoodieRecordType.AVRO);
if (older instanceof HoodieAvroRecord && newer instanceof
HoodieAvroRecord) {
- return Option.of(preCombine(older, newer));
+ HoodieRecord res = preCombine(older, newer);
Review Comment:
@wzx140 I understand what you're doing here, but we shouldn't be guessing
the use-case (either de-duping w/in a batch or merging w/ an already persisted
record) based on the type of the record we see.
- It's extremely brittle
- It's very unintuitive
Instead we can pass a special config (for ex, only when we do de-duping) and
then control whether we do pre-combine or a full-blown merge based on that
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java:
##########
@@ -75,24 +75,31 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig
config, String instantTi
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
- protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+ protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp, Schema
writerSchema)
throws IOException {
- final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
combineRecordOp);
+ Option<HoodieRecord> savedCombineRecordOp =
combineRecordOp.map(HoodieRecord::newInstance);
+ HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
+ final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
combineRecordOp, writerSchema);
if (result) {
boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
- cdcLogger.put(hoodieRecord, (GenericRecord) ((HoodieAvroIndexedRecord)
oldRecord).getData(), isDelete ? Option.empty() : combineRecordOp.map(rec ->
((HoodieAvroIndexedRecord) rec).getData()));
+ Option<IndexedRecord> combineRecord;
+ if (combineRecordOp.isPresent()) {
+ combineRecord =
savedCombineRecordOp.get().toIndexedRecord(writerSchema,
config.getPayloadConfig().getProps()).map(HoodieAvroIndexedRecord::getData);
+ } else {
+ combineRecord = Option.empty();
+ }
+ cdcLogger.put(hoodieRecord, (GenericRecord) ((HoodieAvroIndexedRecord)
savedOldRecord).getData(), isDelete ? Option.empty() : combineRecord);
+ hoodieRecord.deflate();
}
return result;
}
- protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws
IOException {
- // Get the data before deflated
- Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields :
tableSchema;
- Option<IndexedRecord> recordOption = hoodieRecord.toIndexedRecord(schema,
this.config.getProps())
- .map(HoodieRecord::getData);
- super.writeInsertRecord(hoodieRecord);
+ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Schema
schema) throws IOException {
+ HoodieRecord<T> savedRecord = hoodieRecord.newInstance();
Review Comment:
Is this because we deflate the record after insertion?
If that's the case then we instead should do the copying inside the
`CDCLogger` itself
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java:
##########
@@ -93,8 +93,8 @@ public HoodieConcatHandle(HoodieWriteConfig config, String
instantTime, HoodieTa
*/
@Override
public void write(HoodieRecord oldRecord) {
- String key = oldRecord.getRecordKey(keyGeneratorOpt);
- Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields :
tableSchema;
Review Comment:
@wzx140 would need to clarify this one
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]