alexeykudinkin commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1089880847


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java:
##########
@@ -136,24 +136,22 @@ protected void doWrite(HoodieRecord record, Schema 
schema, TypedProperties props
         if (record.shouldIgnore(schema, config.getProps())) {
           return;
         }
-        // Convert GenericRecord to GenericRecord with hoodie commit metadata 
in schema
-        HoodieRecord rewriteRecord;
-        if (schemaOnReadEnabled) {
-          rewriteRecord = record.rewriteRecordWithNewSchema(schema, 
config.getProps(), writeSchemaWithMetaFields);
-        } else {
-          rewriteRecord = record.rewriteRecord(schema, config.getProps(), 
writeSchemaWithMetaFields);
-        }
+
         MetadataValues metadataValues = new 
MetadataValues().setFileName(path.getName());
-        rewriteRecord = 
rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, 
config.getProps(), metadataValues);
+        HoodieRecord populatedRecord =

Review Comment:
   Change similar to 
https://github.com/apache/hudi/pull/7769#discussion_r1089880703



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -374,20 +374,16 @@ public void write(HoodieRecord<T> oldRecord) {
   }
 
   protected void writeToFile(HoodieKey key, HoodieRecord<T> record, Schema 
schema, Properties prop, boolean shouldPreserveRecordMetadata) throws 
IOException {
-    HoodieRecord rewriteRecord;
-    if (schemaOnReadEnabled) {
-      rewriteRecord = record.rewriteRecordWithNewSchema(schema, prop, 
writeSchemaWithMetaFields);
-    } else {
-      rewriteRecord = record.rewriteRecord(schema, prop, 
writeSchemaWithMetaFields);
-    }
     // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point 
to the
     //       file holding this record even in cases when overall metadata is 
preserved
     MetadataValues metadataValues = new 
MetadataValues().setFileName(newFilePath.getName());
-    rewriteRecord = 
rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, prop, 
metadataValues);
+    HoodieRecord populatedRecord =

Review Comment:
   Change similar to 
https://github.com/apache/hudi/pull/7769#discussion_r1089880703



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -130,29 +128,27 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
             (left, right) ->
                 left.joinWith(right, 
mergeHandle.getWriterSchemaWithMetaFields()));
         recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
-      } else if (schemaEvolutionTransformerOpt.isPresent()) {
-        recordIterator = new MappingIterator<>(baseFileRecordIterator,
-            
schemaEvolutionTransformerOpt.get().getLeft().apply(isPureProjection ? 
writerSchema : readerSchema));
-        recordSchema = schemaEvolutionTransformerOpt.get().getRight();
       } else {
         recordIterator = baseFileRecordIterator;
         recordSchema = isPureProjection ? writerSchema : readerSchema;
       }
 
+      boolean isBufferingRecords = 
ExecutorFactory.isBufferingRecords(writeConfig);
+
       wrapper = ExecutorFactory.create(writeConfig, recordIterator, new 
UpdateHandler(mergeHandle), record -> {
+        HoodieRecord newRecord;
+        if (schemaEvolutionTransformerOpt.isPresent()) {

Review Comment:
   The Schema Evolution transformer is now applied inside the record transformer, as 
opposed to via a MappingIterator as previously



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -416,7 +423,8 @@ private static HoodieRecord<InternalRow> 
convertToHoodieSparkRecord(StructType s
         getValue(structType, recordKeyPartitionPathFieldPair.getRight(), 
record.data).toString());
 
     HoodieOperation operation = withOperationField
-        ? HoodieOperation.fromName(getNullableValAsString(structType, 
record.data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
+        ? 
HoodieOperation.fromName(record.data.getString(structType.fieldIndex(HoodieRecord.OPERATION_METADATA_FIELD)))

Review Comment:
   Good point! Let me revisit



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -190,9 +194,10 @@ public HoodieRecord rewriteRecord(Schema recordSchema, 
Properties props, Schema
     StructType structType = 
HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType targetStructType = 
HoodieInternalRowUtils.getCachedSchema(targetSchema);
 
-    // TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter
-    InternalRow rewriteRecord = 
HoodieInternalRowUtils.rewriteRecord(this.data, structType, targetStructType);
-    UnsafeRow unsafeRow = 
HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, 
targetStructType).apply(rewriteRecord);
+    Function1<InternalRow, UnsafeRow> unsafeRowWriter =
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, 
targetStructType, Collections.emptyMap());

Review Comment:
   Key point here is that we actually don't need the `rewriteRecord` 
operation as such: historically it has been used to expand the (Avro) schema of the 
record to accommodate meta-fields, which is now handled differently 
in `HoodieSparkRecord`



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -206,9 +211,10 @@ public HoodieRecord rewriteRecordWithNewSchema(Schema 
recordSchema, Properties p
     StructType structType = 
HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType newStructType = 
HoodieInternalRowUtils.getCachedSchema(newSchema);
 
-    // TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter
-    InternalRow rewriteRecord = 
HoodieInternalRowUtils.rewriteRecordWithNewSchema(this.data, structType, 
newStructType, renameCols);
-    UnsafeRow unsafeRow = 
HoodieInternalRowUtils.getCachedUnsafeProjection(newStructType, 
newStructType).apply(rewriteRecord);
+    Function1<InternalRow, UnsafeRow> unsafeRowWriter =
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, 
newStructType, Collections.emptyMap());

Review Comment:
   Good catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to