alexeykudinkin commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1089880847
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java:
##########
@@ -136,24 +136,22 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props
     if (record.shouldIgnore(schema, config.getProps())) {
       return;
     }
-    // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
-    HoodieRecord rewriteRecord;
-    if (schemaOnReadEnabled) {
-      rewriteRecord = record.rewriteRecordWithNewSchema(schema, config.getProps(), writeSchemaWithMetaFields);
-    } else {
-      rewriteRecord = record.rewriteRecord(schema, config.getProps(), writeSchemaWithMetaFields);
-    }
+
     MetadataValues metadataValues = new MetadataValues().setFileName(path.getName());
-    rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, config.getProps(), metadataValues);
+    HoodieRecord populatedRecord =
Review Comment:
Change similar to
https://github.com/apache/hudi/pull/7769#discussion_r1089880703
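For readers following along, the shape of the change is: the two-branch `rewriteRecord`/`rewriteRecordWithNewSchema` sequence is dropped, and metadata population becomes a single `updateMetadataValues`-style step. A minimal, self-contained sketch of that pattern (hypothetical stand-in classes, not the actual Hudi `MetadataValues`/`HoodieRecord` implementations):

```java
import java.util.HashMap;
import java.util.Map;

public class MetadataValuesSketch {

  // Hypothetical stand-in for Hudi's MetadataValues builder: collects only
  // the metadata fields that should be (re)written on the record.
  static class MetadataValues {
    private final Map<String, String> values = new HashMap<>();

    MetadataValues setFileName(String fileName) {
      values.put("_hoodie_file_name", fileName);
      return this;
    }

    Map<String, String> asMap() {
      return values;
    }
  }

  // Single population step: overlay the collected metadata values onto the
  // record, replacing the old branch-then-rewrite-then-update sequence.
  static Map<String, String> updateMetadataValues(Map<String, String> record,
                                                  MetadataValues metadataValues) {
    Map<String, String> populated = new HashMap<>(record);
    populated.putAll(metadataValues.asMap());
    return populated;
  }

  public static void main(String[] args) {
    Map<String, String> record = new HashMap<>();
    record.put("id", "42");
    Map<String, String> populated =
        updateMetadataValues(record, new MetadataValues().setFileName("part-0001.parquet"));
    System.out.println(populated.get("_hoodie_file_name")); // part-0001.parquet
  }
}
```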
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -374,20 +374,16 @@ public void write(HoodieRecord<T> oldRecord) {
   }
   protected void writeToFile(HoodieKey key, HoodieRecord<T> record, Schema schema, Properties prop, boolean shouldPreserveRecordMetadata) throws IOException {
-    HoodieRecord rewriteRecord;
-    if (schemaOnReadEnabled) {
-      rewriteRecord = record.rewriteRecordWithNewSchema(schema, prop, writeSchemaWithMetaFields);
-    } else {
-      rewriteRecord = record.rewriteRecord(schema, prop, writeSchemaWithMetaFields);
-    }
     // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
     //       file holding this record even in cases when overall metadata is preserved
     MetadataValues metadataValues = new MetadataValues().setFileName(newFilePath.getName());
-    rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, prop, metadataValues);
+    HoodieRecord populatedRecord =
Review Comment:
Change similar to
https://github.com/apache/hudi/pull/7769#discussion_r1089880703
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -130,29 +128,27 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
             (left, right) -> left.joinWith(right, mergeHandle.getWriterSchemaWithMetaFields()));
         recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
-      } else if (schemaEvolutionTransformerOpt.isPresent()) {
-        recordIterator = new MappingIterator<>(baseFileRecordIterator,
-            schemaEvolutionTransformerOpt.get().getLeft().apply(isPureProjection ? writerSchema : readerSchema));
-        recordSchema = schemaEvolutionTransformerOpt.get().getRight();
       } else {
         recordIterator = baseFileRecordIterator;
         recordSchema = isPureProjection ? writerSchema : readerSchema;
       }
+      boolean isBufferingRecords = ExecutorFactory.isBufferingRecords(writeConfig);
+
       wrapper = ExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> {
+        HoodieRecord newRecord;
+        if (schemaEvolutionTransformerOpt.isPresent()) {
Review Comment:
The schema-evolution transformation is now applied inside the executor's record transformer, as opposed to via a wrapping `MappingIterator` previously.
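To make the placement change concrete (an illustrative sketch with hypothetical names, not the actual Hudi executor): the mapping used to be baked into a `MappingIterator` wrapping the source, and now the raw iterator is handed to the executor, which applies the transform per record on the consumer side. Both placements produce the same records; only where the work runs differs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class TransformPlacementSketch {

  // Previous shape: the mapping is baked into the iterator feeding the executor.
  static class MappingIterator<I, O> implements Iterator<O> {
    private final Iterator<I> source;
    private final Function<I, O> mapper;

    MappingIterator(Iterator<I> source, Function<I, O> mapper) {
      this.source = source;
      this.mapper = mapper;
    }

    @Override public boolean hasNext() { return source.hasNext(); }
    @Override public O next() { return mapper.apply(source.next()); }
  }

  // New shape: the executor receives the raw iterator plus a transformer and
  // applies it itself, record by record, on the consumer side.
  static <I, O> List<O> runExecutor(Iterator<I> records, Function<I, O> transformer) {
    List<O> out = new ArrayList<>();
    while (records.hasNext()) {
      out.add(transformer.apply(records.next()));
    }
    return out;
  }

  static List<String> viaIterator(List<Integer> input) {
    return runExecutor(
        new MappingIterator<>(input.iterator(), i -> "r" + i),
        Function.identity());
  }

  static List<String> viaTransformer(List<Integer> input) {
    return runExecutor(input.iterator(), i -> "r" + i);
  }

  public static void main(String[] args) {
    List<Integer> input = Arrays.asList(1, 2, 3);
    // Same output either way; only the placement of the transform differs.
    if (!viaIterator(input).equals(viaTransformer(input))) {
      throw new AssertionError("placements diverged");
    }
    System.out.println(viaTransformer(input)); // [r1, r2, r3]
  }
}
```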
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -416,7 +423,8 @@ private static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType s
         getValue(structType, recordKeyPartitionPathFieldPair.getRight(), record.data).toString());
     HoodieOperation operation = withOperationField
-        ? HoodieOperation.fromName(getNullableValAsString(structType, record.data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
+        ? HoodieOperation.fromName(record.data.getString(structType.fieldIndex(HoodieRecord.OPERATION_METADATA_FIELD)))
Review Comment:
Good point! Let me revisit
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -190,9 +194,10 @@ public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
-    // TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter
-    InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecord(this.data, structType, targetStructType);
-    UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType).apply(rewriteRecord);
+    Function1<InternalRow, UnsafeRow> unsafeRowWriter =
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, targetStructType, Collections.emptyMap());
Review Comment:
Key point here is that we don't actually need the `rewriteRecord` operation as such: historically it has been used to expand the (Avro) schema of the record to accommodate meta-fields, which is now handled differently in `HoodieSparkRecord`.
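The caching idea behind the replacement can be sketched without the Spark/Hudi machinery (hypothetical names throughout; this is not Hudi's `getCachedUnsafeRowWriter`): the projection from source schema to target schema is derived once per schema pair and memoized, and the resulting function is then reused for every record, instead of re-deriving a rewrite plan record by record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class CachedRowWriterSketch {

  // Cache of row writers keyed by (source schema -> target schema). Rows are
  // modeled as field-name -> value maps; schemas as ordered field-name lists.
  private static final Map<String, Function<Map<String, Object>, List<Object>>> CACHE =
      new ConcurrentHashMap<>();

  static Function<Map<String, Object>, List<Object>> getCachedRowWriter(
      List<String> sourceSchema, List<String> targetSchema) {
    String key = sourceSchema + "->" + targetSchema;
    return CACHE.computeIfAbsent(key, k -> {
      // The target field order is captured once per schema pair; only the
      // per-record value lookup happens inside the returned function.
      List<String> targetFields = new ArrayList<>(targetSchema);
      return row -> {
        List<Object> out = new ArrayList<>(targetFields.size());
        for (String field : targetFields) {
          out.add(row.get(field)); // fields absent from the row (e.g. meta-fields) stay null
        }
        return out;
      };
    });
  }

  public static void main(String[] args) {
    Map<String, Object> row = new HashMap<>();
    row.put("id", 1);
    row.put("name", "a");
    List<String> source = Arrays.asList("id", "name");
    List<String> target = Arrays.asList("_hoodie_file_name", "id", "name");
    Function<Map<String, Object>, List<Object>> writer = getCachedRowWriter(source, target);
    System.out.println(writer.apply(row)); // [null, 1, a]
    // A second lookup for the same schema pair returns the cached instance.
    if (writer != getCachedRowWriter(source, target)) {
      throw new AssertionError("writer was not cached");
    }
  }
}
```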
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -206,9 +211,10 @@ public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties p
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
-    // TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter
-    InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecordWithNewSchema(this.data, structType, newStructType, renameCols);
-    UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(newStructType, newStructType).apply(rewriteRecord);
+    Function1<InternalRow, UnsafeRow> unsafeRowWriter =
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, newStructType, Collections.emptyMap());
Review Comment:
Good catch!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]