yihua commented on code in PR #13313:
URL: https://github.com/apache/hudi/pull/13313#discussion_r2101313666
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -762,21 +742,38 @@ private static HoodieData<HoodieRecord>
readRecordKeysFromFileSliceSnapshot(Hood
.filterCompletedInstants()
.lastInstant()
.map(HoodieInstant::requestedTime);
+ if (!instantTime.isPresent()) {
+ return engineContext.emptyHoodieData();
+ }
engineContext.setJobStatus(activeModule, "Record Index: reading record
keys from " + partitionFileSlicePairs.size() + " file slices");
final int parallelism = Math.min(partitionFileSlicePairs.size(),
recordIndexMaxParallelism);
-
+ ReaderContextFactory<T> readerContextFactory =
engineContext.getReaderContextFactory(metaClient);
return engineContext.parallelize(partitionFileSlicePairs,
parallelism).flatMap(partitionAndFileSlice -> {
-
final String partition = partitionAndFileSlice.getKey();
final FileSlice fileSlice = partitionAndFileSlice.getValue();
final String fileId = fileSlice.getFileId();
- return new HoodieMergedReadHandle(dataWriteConfig, instantTime,
hoodieTable, Pair.of(partition, fileSlice.getFileId()),
- Option.of(fileSlice)).getMergedRecords().stream().map(record -> {
- HoodieRecord record1 = (HoodieRecord) record;
- return
HoodieMetadataPayload.createRecordIndexUpdate(record1.getRecordKey(),
partition, fileId,
- record1.getCurrentLocation().getInstantTime(), 0);
- }).iterator();
+ HoodieReaderContext<T> readerContext = readerContextFactory.getContext();
+ Schema dataSchema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(dataWriteConfig.getWriteSchema()),
dataWriteConfig.allowOperationMetadataField());
+ Schema requestedSchema =
metaClient.getTableConfig().populateMetaFields() ? getRecordKeySchema() :
dataSchema;
Review Comment:
When `populateMetaFields=false`, should the record key fields be read
instead of the full `dataSchema` to trim down the data to scan?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -307,7 +309,12 @@ public Option<Map<String, String>> getMetadata() {
@Override
public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema,
Properties prop) {
- throw new UnsupportedOperationException();
+ if (data == null) {
+ return Option.empty();
+ }
+ StructType structType = schema == null ?
AvroConversionUtils.convertAvroSchemaToStructType(recordSchema) : schema;
+ GenericRecord convertedRecord =
AvroConversionUtils.createInternalRowToAvroConverter(structType, recordSchema,
false).apply(data);
+ return Option.of(new HoodieAvroIndexedRecord(key, convertedRecord));
Review Comment:
Where is this method invoked now that would throw
`UnsupportedOperationException` without these changes?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]