yihua commented on code in PR #13171:
URL: https://github.com/apache/hudi/pull/13171#discussion_r2053082933
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java:
##########
@@ -104,24 +151,22 @@ public Object getValue(IndexedRecord record, Schema schema, String fieldName) {
@Override
public String getRecordKey(IndexedRecord record, Schema schema) {
- return getFieldValueFromIndexedRecord(record, schema, ROW_KEY).toString();
+ if (metaFieldsPopulated) {
+      return getFieldValueFromIndexedRecord(record, schema, RECORD_KEY_METADATA_FIELD).toString();
+ }
+ return keyGenerator.getRecordKey((GenericRecord) record);
Review Comment:
nit: ideally this logic can be generalized in `HoodieReaderContext`. OK to keep
it as is in this PR.
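
One way the generalization could look: hoist the meta-field check into the base class as a template method, and let each engine-specific context supply only the extraction hooks. This is a minimal standalone sketch, not the actual Hudi class hierarchy; the class names, the `Map`-backed record, and the `"gen:"` key format are illustrative stand-ins.

```java
import java.util.Map;

// Stand-in for HoodieReaderContext: the metaFieldsPopulated branch lives in the
// base class once; engine-specific subclasses only implement the two hooks.
abstract class ReaderContextSketch<T> {
  protected final boolean metaFieldsPopulated;

  protected ReaderContextSketch(boolean metaFieldsPopulated) {
    this.metaFieldsPopulated = metaFieldsPopulated;
  }

  // Engine-specific: read a field value out of the engine's record type.
  protected abstract Object getFieldValue(T record, String fieldName);

  // Engine-specific: derive the key via the key generator when meta fields are absent.
  protected abstract String generateKey(T record);

  // Shared logic, written once instead of per reader context.
  public final String getRecordKey(T record) {
    return metaFieldsPopulated
        ? getFieldValue(record, "_hoodie_record_key").toString()
        : generateKey(record);
  }
}

// Toy context backed by a Map, standing in for the Avro IndexedRecord variant.
class MapReaderContext extends ReaderContextSketch<Map<String, Object>> {
  MapReaderContext(boolean metaFieldsPopulated) {
    super(metaFieldsPopulated);
  }

  @Override
  protected Object getFieldValue(Map<String, Object> record, String fieldName) {
    return record.get(fieldName);
  }

  @Override
  protected String generateKey(Map<String, Object> record) {
    return "gen:" + record.get("id");
  }
}

public class RecordKeySketch {
  static String resolve(boolean metaFieldsPopulated) {
    Map<String, Object> record = Map.of("_hoodie_record_key", "k1", "id", "42");
    return new MapReaderContext(metaFieldsPopulated).getRecordKey(record);
  }

  public static void main(String[] args) {
    System.out.println(resolve(true));   // meta field populated: "k1"
    System.out.println(resolve(false));  // falls back to key generator: "gen:42"
  }
}
```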
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java:
##########
@@ -104,24 +151,22 @@ public Object getValue(IndexedRecord record, Schema schema, String fieldName) {
@Override
public String getRecordKey(IndexedRecord record, Schema schema) {
- return getFieldValueFromIndexedRecord(record, schema, ROW_KEY).toString();
+ if (metaFieldsPopulated) {
+      return getFieldValueFromIndexedRecord(record, schema, RECORD_KEY_METADATA_FIELD).toString();
+ }
+ return keyGenerator.getRecordKey((GenericRecord) record);
}
@Override
public HoodieRecord constructHoodieRecord(
Option<IndexedRecord> recordOpt,
- Map<String, Object> metadataMap
- ) {
-    String appliedPayloadClass =
-        payloadClass.isPresent()
-            ? payloadClass.get()
-            : DefaultHoodieRecordPayload.class.getName();
+ Map<String, Object> metadataMap) {
if (!recordOpt.isPresent()) {
return SpillableMapUtils.generateEmptyPayload(
(String) metadataMap.get(INTERNAL_META_RECORD_KEY),
(String) metadataMap.get(INTERNAL_META_PARTITION_PATH),
(Comparable<?>) metadataMap.get(INTERNAL_META_ORDERING_FIELD),
- appliedPayloadClass);
+ payloadClass);
Review Comment:
For the `seal(IndexedRecord record)` below, there is no need to instantiate
a new record builder and copy the values, assuming the `IndexedRecord`
returned by the reader iterator is not modified in place (in contrast, the Spark
Parquet iterator does modify the `InternalRow` it returns, so an additional copy
is needed in case the record is not immediately consumed).
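
The distinction the comment draws can be shown with a toy iterator contract. This is a sketch using a plain `Object[]` in place of Avro's `IndexedRecord`; the two `seal` variants are hypothetical names illustrating when a defensive copy is and is not required.

```java
import java.util.Arrays;

// Why seal() may or may not need to copy: an iterator that reuses one mutable
// buffer (as Spark's Parquet reader reuses its InternalRow) forces a defensive
// copy before buffering the record, while an iterator that allocates a fresh
// record per row lets seal() be the identity function.
public class SealSketch {

  // seal() for a buffer-reusing iterator: copy so a buffered record
  // survives the next call to next() overwriting the shared buffer.
  static Object[] sealWithCopy(Object[] record) {
    return Arrays.copyOf(record, record.length);
  }

  // seal() for a fresh-allocation iterator: no copy needed, return as-is.
  static Object[] sealNoCopy(Object[] record) {
    return record;
  }

  public static void main(String[] args) {
    Object[] buffer = {"row-1"};
    Object[] sealed = sealWithCopy(buffer);
    buffer[0] = "row-2";                 // iterator overwrites its buffer on next()
    System.out.println(sealed[0]);       // copy preserved the old value: row-1
    System.out.println(sealNoCopy(buffer) == buffer);  // identity: true
  }
}
```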
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java:
##########
@@ -141,13 +186,13 @@ public ClosableIterator<IndexedRecord> mergeBootstrapReaders(ClosableIterator<In
                                                               Schema skeletonRequiredSchema,
                                                               ClosableIterator<IndexedRecord> dataFileIterator,
                                                               Schema dataRequiredSchema) {
- return null;
+    return new BootstrapIterator(skeletonFileIterator, skeletonRequiredSchema, dataFileIterator, dataRequiredSchema);
}
@Override
  public UnaryOperator<IndexedRecord> projectRecord(Schema from, Schema to, Map<String, String> renamedColumns) {
if (!renamedColumns.isEmpty()) {
-      throw new UnsupportedOperationException("Schema evolution is not supported for the test reader context");
+      throw new UnsupportedOperationException("Schema evolution is not supported for the HoodieAvroReaderContext");
Review Comment:
```suggestion
      throw new UnsupportedOperationException("Column renaming is not supported for the HoodieAvroReaderContext");
```
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java:
##########
@@ -141,13 +186,13 @@ public ClosableIterator<IndexedRecord> mergeBootstrapReaders(ClosableIterator<In
                                                               Schema skeletonRequiredSchema,
                                                               ClosableIterator<IndexedRecord> dataFileIterator,
                                                               Schema dataRequiredSchema) {
- return null;
+    return new BootstrapIterator(skeletonFileIterator, skeletonRequiredSchema, dataFileIterator, dataRequiredSchema);
}
@Override
  public UnaryOperator<IndexedRecord> projectRecord(Schema from, Schema to, Map<String, String> renamedColumns) {
if (!renamedColumns.isEmpty()) {
-      throw new UnsupportedOperationException("Schema evolution is not supported for the test reader context");
+      throw new UnsupportedOperationException("Schema evolution is not supported for the HoodieAvroReaderContext");
}
    Map<String, Integer> fromFields = IntStream.range(0, from.getFields().size())
Review Comment:
If we still need `#projectRecord`, it would be good to cache the
transformation keyed on `<Schema from, Schema to, Map<String, String>
renamedColumns>` instead of recomputing the transformation for each record
(`BaseSparkInternalRowReaderContext` does something similar).
Another optimization is to push the projection down to the reader itself so
that the reader iterator directly returns the `IndexedRecord` in the `to` schema
where possible, avoiding re-instantiating the record here. That may require more
investigation, and we can keep the functional correctness without worrying
about the performance in this PR for now.
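
The caching idea could be sketched as below. This is a standalone illustration, not Hudi code: `List<String>` field names stand in for Avro `Schema` objects, records are `Object[]` positioned by the `from` schema, and it assumes `renamedColumns` maps a target field name to its source name.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Cache projectRecord's transformation per (from, to, renamedColumns) so the
// field-index mapping is computed once per distinct schema pair, not per record.
public class ProjectionCache {
  private final Map<List<Object>, UnaryOperator<Object[]>> cache = new ConcurrentHashMap<>();

  public UnaryOperator<Object[]> projectRecord(List<String> from, List<String> to,
                                               Map<String, String> renamedColumns) {
    // Composite cache key over all three inputs that determine the transformation.
    return cache.computeIfAbsent(List.of(from, to, renamedColumns), key -> {
      // Computed once: for each target field, its index in the source schema
      // (consulting renamedColumns when the source name differs; assumption:
      // the map goes target name -> source name).
      int[] sourceIndex = new int[to.size()];
      for (int i = 0; i < to.size(); i++) {
        String sourceName = renamedColumns.getOrDefault(to.get(i), to.get(i));
        sourceIndex[i] = from.indexOf(sourceName);
      }
      // The per-record operator only shuffles values by the precomputed indices.
      return record -> {
        Object[] projected = new Object[sourceIndex.length];
        for (int i = 0; i < sourceIndex.length; i++) {
          projected[i] = record[sourceIndex[i]];
        }
        return projected;
      };
    });
  }

  public static void main(String[] args) {
    ProjectionCache cache = new ProjectionCache();
    List<String> from = List.of("a", "b", "c");
    List<String> to = List.of("c", "a");
    UnaryOperator<Object[]> op = cache.projectRecord(from, to, Map.of());
    System.out.println(java.util.Arrays.toString(op.apply(new Object[]{1, 2, 3})));  // [3, 1]
    // A second call with equal arguments reuses the cached operator.
    System.out.println(op == cache.projectRecord(from, to, Map.of()));  // true
  }
}
```

The real version would key on the `Schema` instances themselves (or their fingerprints), and, as the comment notes, pushing the projection into the reader would avoid even this per-record array shuffle.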
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]