alexeykudinkin commented on code in PR #6745:
URL: https://github.com/apache/hudi/pull/6745#discussion_r993974335
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java:
##########
@@ -86,13 +87,20 @@ public ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema,
     conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), (Boolean) SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()));
     conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), (Boolean) SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()));
     InputFile inputFile = HadoopInputFile.fromPath(path, conf);
-    ParquetReader reader = new ParquetReader.Builder<InternalRow>(inputFile) {
+    ParquetReader<InternalRow> reader = new ParquetReader.Builder<InternalRow>(inputFile) {
       @Override
       protected ReadSupport getReadSupport() {
         return new ParquetReadSupport();
       }
     }.withConf(conf).build();
-    ParquetReaderIterator<InternalRow> parquetReaderIterator = new ParquetReaderIterator<>(reader, InternalRow::copy);
+    ParquetReaderIterator<InternalRow> parquetReaderIterator = new ParquetReaderIterator<>(reader,
Review Comment:
   @wzx140 that's exactly what I'm referring to -- let's move such copying to the narrowest possible scope, to make sure we only make copies where we absolutely have to (and nowhere else)
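The pattern the reviewer is asking for can be sketched in plain Java (all names here are hypothetical, not Hudi's actual API): rather than baking a copy into the iterator so that every record is copied, the iterator reuses a mutable buffer and only the call sites that actually retain a record make an explicit copy.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: the iterator reuses one mutable buffer instead of
// copying every record; copying happens only at the retention point.
public class CopyScopeSketch {
  static final class ReusingIterator implements Iterator<StringBuilder> {
    private final Iterator<String> source;
    private final StringBuilder buffer = new StringBuilder(); // overwritten on each next()

    ReusingIterator(Iterator<String> source) { this.source = source; }

    @Override public boolean hasNext() { return source.hasNext(); }

    @Override public StringBuilder next() {
      buffer.setLength(0);
      return buffer.append(source.next()); // no per-record copy here
    }
  }

  // A caller that keeps records across next() calls copies exactly what it
  // retains -- the narrowest possible scope for the copy.
  static List<String> collectRetained(List<String> rows) {
    List<String> retained = new ArrayList<>();
    ReusingIterator it = new ReusingIterator(rows.iterator());
    while (it.hasNext()) {
      retained.add(it.next().toString()); // explicit copy at the retention point
    }
    return retained;
  }

  public static void main(String[] args) {
    System.out.println(collectRetained(List.of("a", "b", "c"))); // [a, b, c]
  }
}
```

A purely streaming consumer of `ReusingIterator` would pay no copy cost at all, which is the point of scoping the copy to the caller rather than the iterator.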
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java:
##########
@@ -75,24 +80,35 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTi
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
   }
-  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+  protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema)
       throws IOException {
-    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, combineRecordOp);
+    // TODO Remove these unnecessary newInstance invocations
Review Comment:
   @wzx140 please create a ticket under the RFC-46 epic, and link it here like `TODO(HUDI-xxx)`
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -118,76 +120,80 @@ public HoodieRecordType getRecordType() {
   }
   @Override
-  public Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
-    return HoodieSparkRecordUtils.getRecordColumnValues(data, columns, getStructType(), consistentLogicalTimestampEnabled);
+  public ComparableList getComparableColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    return HoodieSparkRecordUtils.getRecordColumnValues(data, columns, structType, consistentLogicalTimestampEnabled);
   }
   @Override
-  public HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException {
-    StructType otherStructType = ((HoodieSparkRecord) other).getStructType();
-    StructType writerStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
-    InternalRow mergeRow = HoodieInternalRowUtils.stitchRecords(data, getStructType(), (InternalRow) other.getData(), otherStructType, writerStructType);
-    return new HoodieSparkRecord(getKey(), mergeRow, writerStructType, getOperation());
+  public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws IOException {
+    StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
+    InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
+    return new HoodieSparkRecord(getKey(), mergeRow, targetStructType, getOperation());
   }
   @Override
   public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException {
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
     UTF8String[] metaFields = extractMetaField(targetStructType);
Review Comment:
   @wzx140 you extract meta-fields from the current record using `targetStructType`, which is the new schema.
   Consider the case where the current record doesn't have meta-fields (they're absent from its schema): this code would fail unpredictably, since we're using field ordinals from `targetStructType` (`targetStructType != structType`)
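The ordinal mismatch the reviewer describes can be illustrated with a minimal plain-Java model (hypothetical names; a "schema" here is just an ordered list of field names and a record is an `Object[]` laid out by its own schema):

```java
import java.util.List;

// Hypothetical sketch of the failure mode: resolving a field's ordinal
// against a different schema than the one the record was written with
// silently reads the wrong slot.
public class OrdinalSketch {
  // Safe lookup: ordinals are always resolved against the record's OWN schema.
  static Object readField(List<String> recordSchema, Object[] record, String field) {
    int ordinal = recordSchema.indexOf(field);
    return ordinal < 0 ? null : record[ordinal];
  }

  public static void main(String[] args) {
    // Current record was written WITHOUT meta-fields:
    List<String> structType = List.of("id", "name");
    Object[] record = {1, "alice"};

    // Target schema prepends a meta-field, shifting every ordinal by one:
    List<String> targetStructType = List.of("_hoodie_commit_time", "id", "name");

    // Using targetStructType's ordinal of "id" (1) reads "name" instead:
    Object wrong = record[targetStructType.indexOf("id")];
    // Resolving against the record's own schema reads the right value:
    Object right = readField(structType, record, "id");
    System.out.println(wrong + " / " + right); // alice / 1
  }
}
```

In the real code this argues for extracting meta-field ordinals from `structType` (the record's cached schema) rather than `targetStructType`, or for guarding against records whose schema lacks the meta-fields entirely.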
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -87,6 +88,11 @@ public I deduplicateRecords(
     return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
   }
-  public abstract I deduplicateRecords(
-      I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merge);
+  public I deduplicateRecords(
+      I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merger) {
+    return innerDeduplicateRecords(records, index, parallelism, schema, HoodieAvroRecordMerger.withDeDuping(props), merger);
Review Comment:
   It's actually better to do it the other way around:
   - Keep the inherited method named `deduplicateRecords`
   - Create a new _private_ method `deduplicateRecordsInternal` that modifies the props, and invoke it in this class
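The suggested shape can be sketched in self-contained plain Java (hypothetical class names and a hypothetical property key, not Hudi's actual API): the public entry point keeps the inherited name, while a private internal method owns the props modification before delegating.

```java
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

// Hypothetical sketch of the refactor: public method keeps the inherited
// name; a private *Internal method modifies the props and delegates.
public class DedupSketch {
  static abstract class WriteHelperSketch {
    // Inherited public entry point keeps the name `deduplicateRecords`.
    public List<String> deduplicateRecords(List<String> records, Properties props) {
      return deduplicateRecordsInternal(records, props);
    }

    // Private helper owns the props tweak; subclasses never see or repeat it.
    private List<String> deduplicateRecordsInternal(List<String> records, Properties props) {
      Properties withDeDuping = new Properties();
      withDeDuping.putAll(props);
      withDeDuping.setProperty("dedupe.enabled", "true"); // hypothetical flag
      return doDeduplicate(records, withDeDuping);
    }

    // Engine-specific subclasses implement only the actual dedup work.
    protected abstract List<String> doDeduplicate(List<String> records, Properties props);
  }

  static final class DistinctHelper extends WriteHelperSketch {
    @Override protected List<String> doDeduplicate(List<String> records, Properties props) {
      return records.stream().distinct().collect(Collectors.toList());
    }
  }

  public static void main(String[] args) {
    List<String> out = new DistinctHelper()
        .deduplicateRecords(List.of("a", "a", "b"), new Properties());
    System.out.println(out); // [a, b]
  }
}
```

Keeping the props modification private means callers and subclasses see the same `deduplicateRecords` contract as before, and the side effect on props cannot be bypassed or duplicated by overriding code.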
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]