yihua commented on code in PR #13171:
URL: https://github.com/apache/hudi/pull/13171#discussion_r2053082933


##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java:
##########
@@ -104,24 +151,22 @@ public Object getValue(IndexedRecord record, Schema schema, String fieldName) {
 
   @Override
   public String getRecordKey(IndexedRecord record, Schema schema) {
-    return getFieldValueFromIndexedRecord(record, schema, ROW_KEY).toString();
+    if (metaFieldsPopulated) {
+      return getFieldValueFromIndexedRecord(record, schema, RECORD_KEY_METADATA_FIELD).toString();
+    }
+    return keyGenerator.getRecordKey((GenericRecord) record);

Review Comment:
   nit: ideally this logic would be generalized in `HoodieReaderContext`.  OK to keep it as is in this PR.



##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java:
##########
@@ -104,24 +151,22 @@ public Object getValue(IndexedRecord record, Schema schema, String fieldName) {
 
   @Override
   public String getRecordKey(IndexedRecord record, Schema schema) {
-    return getFieldValueFromIndexedRecord(record, schema, ROW_KEY).toString();
+    if (metaFieldsPopulated) {
+      return getFieldValueFromIndexedRecord(record, schema, RECORD_KEY_METADATA_FIELD).toString();
+    }
+    return keyGenerator.getRecordKey((GenericRecord) record);
   }
 
   @Override
   public HoodieRecord constructHoodieRecord(
       Option<IndexedRecord> recordOpt,
-      Map<String, Object> metadataMap
-  ) {
-    String appliedPayloadClass =
-        payloadClass.isPresent()
-            ? payloadClass.get()
-            : DefaultHoodieRecordPayload.class.getName();
+      Map<String, Object> metadataMap) {
     if (!recordOpt.isPresent()) {
       return SpillableMapUtils.generateEmptyPayload(
           (String) metadataMap.get(INTERNAL_META_RECORD_KEY),
           (String) metadataMap.get(INTERNAL_META_PARTITION_PATH),
           (Comparable<?>) metadataMap.get(INTERNAL_META_ORDERING_FIELD),
-          appliedPayloadClass);
+          payloadClass);

Review Comment:
   For the `seal(IndexedRecord record)` below, there is no need to instantiate a new record builder and copy the values, assuming that the `IndexedRecord` returned by the reader iterator is not modified in place. In contrast, the Spark parquet iterator does modify the `InternalRow` it returns, so additional copying is needed there whenever the record is not consumed immediately.
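
To illustrate the distinction, here is a minimal sketch using plain Java stand-ins rather than the actual Hudi or Avro classes. `SealSketch`, `freshIterator`, `reusingIterator`, and `buffer` are hypothetical names, and rows are modeled as `int[]` instead of `IndexedRecord` or `InternalRow`. It only shows why an identity `seal` is safe when the iterator never reuses the returned object, and why a copy is needed when it does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.UnaryOperator;

// Hypothetical stand-in: rows are int[] instead of IndexedRecord / InternalRow.
final class SealSketch {

  // Reader that hands out each stored row object as-is and never touches it again.
  static Iterator<int[]> freshIterator(int[][] rows) {
    return Arrays.asList(rows).iterator();
  }

  // Reader that reuses one mutable buffer for every row it returns.
  static Iterator<int[]> reusingIterator(final int[][] rows) {
    final int[] buffer = new int[rows.length == 0 ? 0 : rows[0].length];
    return new Iterator<int[]>() {
      private int pos = 0;

      @Override
      public boolean hasNext() {
        return pos < rows.length;
      }

      @Override
      public int[] next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        // Overwrites the shared buffer, invalidating previously returned rows.
        System.arraycopy(rows[pos++], 0, buffer, 0, buffer.length);
        return buffer;
      }
    };
  }

  // Buffers every row after passing it through the given seal function.
  static List<int[]> buffer(Iterator<int[]> it, UnaryOperator<int[]> seal) {
    List<int[]> out = new ArrayList<>();
    while (it.hasNext()) {
      out.add(seal.apply(it.next()));
    }
    return out;
  }
}
```

With rows `{1, 2}` and `{3, 4}`, buffering `freshIterator` through `UnaryOperator.identity()` keeps both rows intact. Buffering `reusingIterator` through the identity leaves every buffered entry pointing at the last row, and passing `row -> row.clone()` as the seal restores the expected contents.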



##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java:
##########
@@ -141,13 +186,13 @@ public ClosableIterator<IndexedRecord> mergeBootstrapReaders(ClosableIterator<In
                                                                Schema skeletonRequiredSchema,
                                                                ClosableIterator<IndexedRecord> dataFileIterator,
                                                                Schema dataRequiredSchema) {
-    return null;
+    return new BootstrapIterator(skeletonFileIterator, skeletonRequiredSchema, dataFileIterator, dataRequiredSchema);
   }
 
   @Override
   public UnaryOperator<IndexedRecord> projectRecord(Schema from, Schema to, Map<String, String> renamedColumns) {
     if (!renamedColumns.isEmpty()) {
-      throw new UnsupportedOperationException("Schema evolution is not supported for the test reader context");
+      throw new UnsupportedOperationException("Schema evolution is not supported for the HoodieAvroReaderContext");

Review Comment:
   ```suggestion
         throw new UnsupportedOperationException("Column renaming is not supported for the HoodieAvroReaderContext");
   ```



##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java:
##########
@@ -141,13 +186,13 @@ public ClosableIterator<IndexedRecord> mergeBootstrapReaders(ClosableIterator<In
                                                                Schema skeletonRequiredSchema,
                                                                ClosableIterator<IndexedRecord> dataFileIterator,
                                                                Schema dataRequiredSchema) {
-    return null;
+    return new BootstrapIterator(skeletonFileIterator, skeletonRequiredSchema, dataFileIterator, dataRequiredSchema);
   }
 
   @Override
   public UnaryOperator<IndexedRecord> projectRecord(Schema from, Schema to, Map<String, String> renamedColumns) {
     if (!renamedColumns.isEmpty()) {
-      throw new UnsupportedOperationException("Schema evolution is not supported for the test reader context");
+      throw new UnsupportedOperationException("Schema evolution is not supported for the HoodieAvroReaderContext");
     }
     Map<String, Integer> fromFields = IntStream.range(0, from.getFields().size())

Review Comment:
   If we still need `#projectRecord`, it would be good to cache the transformation keyed on `<Schema from, Schema to, Map<String, String> renamedColumns>` instead of recomputing it for each record (`BaseSparkInternalRowReaderContext` does something similar).
   
   Another optimization is to push the projection down to the reader itself, so that the reader iterator directly returns an `IndexedRecord` in the `to` schema where possible, which avoids reinstantiating the record here.  This may require more investigation; for this PR we can focus on functional correctness and leave performance for later.
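
As a rough sketch of the caching idea, and not the way `BaseSparkInternalRowReaderContext` actually implements it, the transformation could be memoized in a map keyed on the three inputs. Everything below is a hypothetical stand-in: `ProjectionCacheSketch`, `Key`, `projectionFor`, and `BUILD_COUNT` are illustrative names, schemas are modeled as ordered lists of field names, and records as lists of values, so the example runs without Avro or Hudi on the classpath.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;

// Hypothetical stand-in: a schema is a list of field names, a record is a list of values.
final class ProjectionCacheSketch {

  // Cache key mirroring <Schema from, Schema to, Map<String, String> renamedColumns>.
  static final class Key {
    final List<String> from;
    final List<String> to;
    final Map<String, String> renamedColumns;

    Key(List<String> from, List<String> to, Map<String, String> renamedColumns) {
      // Defensive copies so that later mutation by the caller cannot corrupt the key.
      this.from = new ArrayList<>(from);
      this.to = new ArrayList<>(to);
      this.renamedColumns = new HashMap<>(renamedColumns);
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Key)) {
        return false;
      }
      Key other = (Key) o;
      return from.equals(other.from) && to.equals(other.to)
          && renamedColumns.equals(other.renamedColumns);
    }

    @Override
    public int hashCode() {
      return Objects.hash(from, to, renamedColumns);
    }
  }

  static final Map<Key, UnaryOperator<List<Object>>> CACHE = new ConcurrentHashMap<>();
  // Counts how often a projection is actually built, for illustration only.
  static final AtomicInteger BUILD_COUNT = new AtomicInteger();

  // Returns the cached projection, building it only on the first request for a key.
  static UnaryOperator<List<Object>> projectionFor(
      List<String> from, List<String> to, Map<String, String> renamedColumns) {
    return CACHE.computeIfAbsent(new Key(from, to, renamedColumns), ProjectionCacheSketch::build);
  }

  private static UnaryOperator<List<Object>> build(Key key) {
    if (!key.renamedColumns.isEmpty()) {
      throw new UnsupportedOperationException("Column renaming is not supported in this sketch");
    }
    BUILD_COUNT.incrementAndGet();
    // Resolve each target field to its position in the source schema once, up front.
    final int[] positions = new int[key.to.size()];
    for (int i = 0; i < positions.length; i++) {
      int idx = key.from.indexOf(key.to.get(i));
      if (idx < 0) {
        throw new IllegalArgumentException("Field not found in source schema: " + key.to.get(i));
      }
      positions[i] = idx;
    }
    // The per-record work is reduced to positional lookups.
    return record -> {
      List<Object> out = new ArrayList<>(positions.length);
      for (int p : positions) {
        out.add(record.get(p));
      }
      return out;
    };
  }
}
```

Calling `projectionFor` twice with equal arguments returns the same operator instance, so the field-position lookup runs only once per distinct `(from, to, renamedColumns)` combination. A real implementation would also need to consider the cost of hashing large schemas and whether the cache should be bounded.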


