alexeykudinkin commented on code in PR #6745:
URL: https://github.com/apache/hudi/pull/6745#discussion_r992844317
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -233,51 +241,61 @@ public Option<Map<String, String>> getMetadata() {
}
@Override
- public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema,
Properties prop) throws IOException {
+ public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema,
Properties prop) throws IOException {
throw new UnsupportedOperationException();
}
@Override
- public Comparable<?> getOrderingValue(Properties props) {
+ public Comparable<?> getOrderingValue(Schema recordSchema, Properties props)
{
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
String orderingField = ConfigUtils.getOrderingField(props);
- if (!HoodieCatalystExpressionUtils$.MODULE$.existField(getStructType(),
orderingField)) {
+ if (!HoodieCatalystExpressionUtils$.MODULE$.existField(structType,
orderingField)) {
return 0;
} else {
- NestedFieldPath nestedFieldPath =
HoodieInternalRowUtils.getCachedPosList(getStructType(),
- orderingField);
- Comparable<?> value = (Comparable<?>)
HoodieUnsafeRowUtils.getNestedInternalRowValue(
- data, nestedFieldPath);
+ NestedFieldPath nestedFieldPath =
HoodieInternalRowUtils.getCachedPosList(structType, orderingField);
+ Comparable<?> value = (Comparable<?>)
HoodieUnsafeRowUtils.getNestedInternalRowValue(data, nestedFieldPath);
return value;
}
}
- public StructType getStructType() {
- if (schemaFingerPrint.isPresent()) {
- return
HoodieInternalRowUtils.getCachedSchemaFromFingerPrint(schemaFingerPrint.get());
- } else {
- return structType;
- }
+ private UTF8String[] extractMetaField(StructType structType) {
+ return HOODIE_META_COLUMNS_WITH_OPERATION.stream()
+ .filter(f ->
HoodieCatalystExpressionUtils$.MODULE$.existField(structType, f))
+ .map(field ->
data.getUTF8String(HOODIE_META_COLUMNS_NAME_TO_POS.get(field)))
+ .toArray(UTF8String[]::new);
}
- private void initSchema(StructType structType) {
- if (HoodieInternalRowUtils.containsCompressedSchema(structType)) {
- HoodieInternalRowUtils.addCompressedSchema(structType);
- this.schemaFingerPrint =
Option.of(HoodieInternalRowUtils.getCachedFingerPrintFromSchema(structType));
- } else {
- this.structType = structType;
- }
+ private boolean hasMetaField(StructType structType) {
Review Comment:
nit: Can make it static
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java:
##########
@@ -75,24 +80,33 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig
config, String instantTi
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
- protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+ protected boolean writeUpdateRecord(HoodieRecord<T> newRecord,
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp, Schema
writerSchema)
Review Comment:
nit: `combinedRecordOpt`
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java:
##########
@@ -170,8 +185,8 @@ public Option<Map<String, String>> getMetadata() {
}
@Override
- public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema,
Properties props) throws IOException {
- Option<IndexedRecord> avroData = getData().getInsertValue(schema, props);
+ public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema,
Properties props) throws IOException {
Review Comment:
Sorry, my 2nd statement was confusing; let me elaborate on my comment:
- This API converts from one `HoodieRecord` to another `HoodieRecord`
- There's no reason why we'd be returning an Option (from an external
perspective it looks really strange that we return an Option, given that
we simply do a lateral conversion)
- Ideally we should be returning an `Option` when this object is
dereferenced (`getData` is called), but this would affect the API
Let's keep it as is for now; we can change it later when we get rid of
`HoodieAvroRecord`
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java:
##########
@@ -59,20 +66,34 @@ public
FlinkMergeAndReplaceHandleWithChangeLog(HoodieWriteConfig config, String
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
- protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+ protected boolean writeUpdateRecord(HoodieRecord<T> newRecord,
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp, Schema
writerSchema)
throws IOException {
- final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
combineRecordOp);
+ Option<HoodieRecord> savedCombineRecordOp =
combineRecordOp.map(HoodieRecord::newInstance);
+ HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
+ final boolean result = super.writeUpdateRecord(newRecord, oldRecord,
combineRecordOp, writerSchema);
if (result) {
- boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
- cdcLogger.put(hoodieRecord, (GenericRecord) oldRecord.getData(),
isDelete ? Option.empty() : combineRecordOp.map(rec ->
((HoodieAvroIndexedRecord) rec).getData()));
+ boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
+ savedCombineRecordOp.ifPresent(record -> {
+ try {
+ Option<IndexedRecord> avroRecord = savedCombineRecordOp.get()
+ .toIndexedRecord(writerSchema,
config.getPayloadConfig().getProps())
+ .map(HoodieAvroIndexedRecord::getData);
+ cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(),
isDelete ? Option.empty() : avroRecord);
+ } catch (IOException e) {
+ LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e);
+ }
+ });
}
return result;
}
- protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws
IOException {
- super.writeInsertRecord(hoodieRecord);
- if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
- cdcLogger.put(hoodieRecord, null, Option.of((GenericRecord)
hoodieRecord.getData()));
+ protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema)
throws IOException {
+ HoodieRecord<T> savedRecord = newRecord.newInstance();
+ // hoodieRecord deflated after writeInsertRecord
Review Comment:
nit: Please update the comment, move it on the line above
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -118,76 +120,80 @@ public HoodieRecordType getRecordType() {
}
@Override
- public Object getRecordColumnValues(Schema recordSchema, String[] columns,
boolean consistentLogicalTimestampEnabled) {
- return HoodieSparkRecordUtils.getRecordColumnValues(data, columns,
getStructType(), consistentLogicalTimestampEnabled);
+ public ComparableList getComparableColumnValues(Schema recordSchema,
String[] columns, boolean consistentLogicalTimestampEnabled) {
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ return HoodieSparkRecordUtils.getRecordColumnValues(data, columns,
structType, consistentLogicalTimestampEnabled);
}
@Override
- public HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema)
throws IOException {
- StructType otherStructType = ((HoodieSparkRecord) other).getStructType();
- StructType writerStructType =
HoodieInternalRowUtils.getCachedSchema(targetSchema);
- InternalRow mergeRow = HoodieInternalRowUtils.stitchRecords(data,
getStructType(), (InternalRow) other.getData(), otherStructType,
writerStructType);
- return new HoodieSparkRecord(getKey(), mergeRow, writerStructType,
getOperation());
+ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws
IOException {
+ StructType targetStructType =
HoodieInternalRowUtils.getCachedSchema(targetSchema);
+ InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
+ return new HoodieSparkRecord(getKey(), mergeRow, targetStructType,
getOperation());
}
@Override
public HoodieRecord rewriteRecord(Schema recordSchema, Properties props,
Schema targetSchema) throws IOException {
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
StructType targetStructType =
HoodieInternalRowUtils.getCachedSchema(targetSchema);
UTF8String[] metaFields = extractMetaField(targetStructType);
Review Comment:
Right, but you're extracting these meta-fields from the current record, and
so if it doesn't currently have them there's nothing to extract.
My point is we can't apply new schema to the record, until we rewrite the
record into new schema -- we should use `recordSchema` to check whether it has
the meta-fields
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java:
##########
@@ -69,7 +70,7 @@ public HoodieData<HoodieRecord<T>> deduplicateRecords(
HoodieRecord<T> reducedRecord;
try {
// Precombine do not need schema and do not return null
- reducedRecord = recordMerger.merge(rec1, rec2, schema.get(),
props).get();
+ reducedRecord = recordMerger.merge(rec1, schema.get(), rec2,
schema.get(), ConfigUtils.setDeDuping(props)).get().getLeft();
Review Comment:
We can create these props just once, right? Please address this for all of
its usages
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java:
##########
@@ -86,13 +87,20 @@ public ClosableIterator<InternalRow>
getInternalRowIterator(Schema readerSchema,
conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), (Boolean)
SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()));
conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), (Boolean)
SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()));
InputFile inputFile = HadoopInputFile.fromPath(path, conf);
- ParquetReader reader = new ParquetReader.Builder<InternalRow>(inputFile) {
+ ParquetReader<InternalRow> reader = new
ParquetReader.Builder<InternalRow>(inputFile) {
@Override
protected ReadSupport getReadSupport() {
return new ParquetReadSupport();
}
}.withConf(conf).build();
- ParquetReaderIterator<InternalRow> parquetReaderIterator = new
ParquetReaderIterator<>(reader, InternalRow::copy);
+ ParquetReaderIterator<InternalRow> parquetReaderIterator = new
ParquetReaderIterator<>(reader,
Review Comment:
@wzx140 let me elaborate on the context of my question:
I'm aware that Spark is using mutable buffers for reading the data, but we
shouldn't need to copy it right away, we should only *have to* copy where
**absolutely necessary**. Meaning that by default, if we'd just be iterating
over the records from one file and writing it into another one we wouldn't need
to make any copies, right? We can easily reuse the same buffer while we
continue to read and write 1 record at a time (iterating over the provided
`Iterator`).
```
val iter = parquet.read(file)
while (iter.hasNext) {
writeToFile(iter.next)
}
```
We only really need to make a copy whenever we retain a reference to the
`Row` — for example, if we do `reduceByKey` (as below), or if we hold it in
a Map, etc.
```
val iter = parquet.read(file)
iter.reduceByKey((r1, r2) => {
// NOTE: In this case we're holding refs to 2 rows simultaneously, hence
we'd need to make a copy before that
// since otherwise both r1 and r2 will be pointing to the same buffer
})
```
So, we basically need:
- Avoid blanket copying here
- Find the place where we hold the refs to the row (as described above,
this is the reason the test is failing) and allow copying in the least possible
scope
Does it make sense?
##########
hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java:
##########
@@ -52,4 +57,16 @@ public static String getPayloadClass(Properties properties) {
}
return payloadClass;
}
+
+ public static List<String> getMergerImpls(Map<String, String> optParams) {
+ return Arrays.stream(
+ optParams.getOrDefault("hoodie.datasource.write.merger.impls",
+ HoodieAvroRecordMerger.class.getName()).split(","))
+ .map(String::trim).distinct().collect(Collectors.toList());
+ }
+
+ public static Properties setDeDuping(Properties props) {
Review Comment:
Let's move this util to `HoodieAvroRecordMerger` (since it's only useful
with it)
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -118,76 +120,80 @@ public HoodieRecordType getRecordType() {
}
@Override
- public Object getRecordColumnValues(Schema recordSchema, String[] columns,
boolean consistentLogicalTimestampEnabled) {
- return HoodieSparkRecordUtils.getRecordColumnValues(data, columns,
getStructType(), consistentLogicalTimestampEnabled);
+ public ComparableList getComparableColumnValues(Schema recordSchema,
String[] columns, boolean consistentLogicalTimestampEnabled) {
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ return HoodieSparkRecordUtils.getRecordColumnValues(data, columns,
structType, consistentLogicalTimestampEnabled);
}
@Override
- public HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema)
throws IOException {
- StructType otherStructType = ((HoodieSparkRecord) other).getStructType();
- StructType writerStructType =
HoodieInternalRowUtils.getCachedSchema(targetSchema);
- InternalRow mergeRow = HoodieInternalRowUtils.stitchRecords(data,
getStructType(), (InternalRow) other.getData(), otherStructType,
writerStructType);
- return new HoodieSparkRecord(getKey(), mergeRow, writerStructType,
getOperation());
+ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws
IOException {
+ StructType targetStructType =
HoodieInternalRowUtils.getCachedSchema(targetSchema);
+ InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
+ return new HoodieSparkRecord(getKey(), mergeRow, targetStructType,
getOperation());
}
@Override
public HoodieRecord rewriteRecord(Schema recordSchema, Properties props,
Schema targetSchema) throws IOException {
Review Comment:
Please check my comment below. Let's consolidate the discussion there
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala:
##########
@@ -43,25 +43,9 @@ object HoodieInternalRowUtils {
ThreadLocal.withInitial(new Supplier[HashMap[(StructType, StructType),
UnsafeProjection]] {
Review Comment:
Totally forgot that it relies on a mutable buffer. Let's leave a comment
explaining that
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java:
##########
@@ -75,24 +75,31 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig
config, String instantTi
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
- protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+ protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp, Schema
writerSchema)
throws IOException {
- final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
combineRecordOp);
+ Option<HoodieRecord> savedCombineRecordOp =
combineRecordOp.map(HoodieRecord::newInstance);
Review Comment:
I see what you're working around now.
Let's leave a TODO and create a ticket to follow-up
- Remove record deflation: instead of deflation, we should just make sure
we avoid keeping refs to the Record instance itself (so that it'd be collected)
- Remove these unnecessary `newInstance` invocations
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]