jonvex commented on code in PR #10957:
URL: https://github.com/apache/hudi/pull/10957#discussion_r1626164199
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##########
@@ -242,7 +250,44 @@ protected Pair<ClosableIterator<T>, Schema> getRecordsIterator(HoodieDataBlock d
} else {
blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext);
}
- return Pair.of(blockRecordsIterator, dataBlock.getSchema());
+ Option<Pair<Function<T,T>, Schema>> schemaEvolutionTransformerOpt =
+ composeEvolvedSchemaTransformer(dataBlock);
+
+ // In case when schema has been evolved original persisted records will have to be
+ // transformed to adhere to the new schema
+ Function<T,T> transformer =
+ schemaEvolutionTransformerOpt.map(Pair::getLeft)
+ .orElse(Function.identity());
+
+ Schema schema = schemaEvolutionTransformerOpt.map(Pair::getRight)
+ .orElseGet(dataBlock::getSchema);
+
+ return Pair.of(new CloseableMappingIterator<>(blockRecordsIterator, transformer), schema);
+ }
+
+ /**
+ * Get final Read Schema for support evolution.
+ * step1: find the fileSchema for current dataBlock.
+ * step2: determine whether fileSchema is compatible with the final read internalSchema.
+ * step3: merge fileSchema and read internalSchema to produce final read schema.
+ *
+ * @param dataBlock current processed block
+ * @return final read schema.
+ */
+ protected Option<Pair<Function<T,T>, Schema>> composeEvolvedSchemaTransformer(
+ HoodieDataBlock dataBlock) {
+ if (internalSchema.isEmptySchema()) {
+ return Option.empty();
+ }
+
+ long currentInstantTime = Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
+ InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime,
+ hoodieTableMetaClient, false);
Review Comment:
Maybe. I passed `false` because that is what the existing code does here:
https://github.com/apache/hudi/blob/9246fd7128bec09f4e7cd951c82f18bfda2cb905/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java#L813
I used the log record reader as the example when implementing schema evolution here.
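
For context, the pattern in the diff above (an optional transformer that falls back to identity and is applied lazily through a mapping iterator) can be sketched in isolation. This is only a minimal sketch under stated assumptions, not the Hudi implementation: it uses `java.util.Optional` instead of Hudi's `Option`, plain strings instead of engine records, and the names `EvolvedSchemaSketch`, `MappingIterator`, and `composeTransformer` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

final class EvolvedSchemaSketch {

  /** Wraps an iterator and applies a function to each element as it is read. */
  static final class MappingIterator<I, O> implements Iterator<O> {
    private final Iterator<I> source;
    private final Function<I, O> mapper;

    MappingIterator(Iterator<I> source, Function<I, O> mapper) {
      this.source = source;
      this.mapper = mapper;
    }

    @Override
    public boolean hasNext() {
      return source.hasNext();
    }

    @Override
    public O next() {
      return mapper.apply(source.next());
    }
  }

  /**
   * Hypothetical stand-in for composeEvolvedSchemaTransformer:
   * returns empty when no schema evolution applies, otherwise a
   * function that pads each record with a null for the added column.
   */
  static Optional<Function<String, String>> composeTransformer(boolean schemaEvolved) {
    if (!schemaEvolved) {
      return Optional.empty();
    }
    return Optional.of(record -> record + ",null");
  }

  /** Reads all records, transforming them only if a transformer is present. */
  static List<String> read(List<String> records, boolean schemaEvolved) {
    // Fall back to the identity function when there is nothing to evolve.
    Function<String, String> transformer =
        composeTransformer(schemaEvolved).orElse(Function.identity());

    Iterator<String> it = new MappingIterator<>(records.iterator(), transformer);
    List<String> out = new ArrayList<>();
    while (it.hasNext()) {
      out.add(it.next());
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> records = Arrays.asList("a,1", "b,2");
    System.out.println(read(records, false));
    System.out.println(read(records, true));
  }
}
```

With the identity fallback, callers always go through the same mapping iterator, so the non-evolved path needs no separate branch.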