danny0405 commented on code in PR #13572:
URL: https://github.com/apache/hudi/pull/13572#discussion_r2229852460
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -239,10 +250,53 @@ public abstract ClosableIterator<T> getFileRecordIterator(
* @param storage {@link HoodieStorage} for reading records.
* @return {@link ClosableIterator<T>} that can return all records through
iteration.
*/
- public ClosableIterator<T> getFileRecordIterator(
- StoragePathInfo storagePathInfo, long start, long length, Schema
dataSchema, Schema requiredSchema,
- HoodieStorage storage) throws IOException {
- return getFileRecordIterator(storagePathInfo.getPath(), start, length,
dataSchema, requiredSchema, storage);
+ public final ClosableIterator<T> getFileRecordIterator(
+ StoragePathInfo storagePathInfo, long start, long length, Option<Schema>
dataSchema, Schema requiredSchema, HoodieStorage storage) throws IOException {
+ return getFileRecordIterator(Either.right(storagePathInfo), start, length,
dataSchema, requiredSchema, storage);
+ }
+
+ protected abstract ClosableIterator<T> doGetFileRecordIterator(StoragePath
filePath, long start, long length, Schema dataSchema, Schema requiredSchema,
HoodieStorage storage) throws IOException;
+
+ protected ClosableIterator<T> doGetFileRecordIterator(StoragePathInfo
storagePathInfo, long start, long length, Schema dataSchema, Schema
requiredSchema, HoodieStorage storage) throws IOException {
+ return doGetFileRecordIterator(storagePathInfo.getPath(), start, length,
dataSchema, requiredSchema, storage);
+ }
+
+ /**
+ * Handle schema evolution
+ */
+ private ClosableIterator<T> getFileRecordIterator(Either<StoragePath,
StoragePathInfo> filePathEither, long start, long length,
+ Option<Schema> dataSchema,
Schema requiredSchema, HoodieStorage storage) throws IOException {
+ if (isMetadataTable() ||
getSchemaHandler().getInternalSchemaOpt().isPresent()) {
+ return getFileRecordIteratorInternal(filePathEither, start, length,
dataSchema.get(), requiredSchema, storage);
+ }
+ Schema actualDataSchema = getActualDataSchema(dataSchema, filePathEither,
storage);
+ Schema actualRequriredSchema =
AvroSchemaUtils.pruneDataSchemaResolveNullable(actualDataSchema,
requiredSchema, getSchemaHandler().getPruneExcludeFields());
+ if (AvroSchemaUtils.areSchemasPrettyMuchEqual(actualRequriredSchema,
requiredSchema)) {
+ return getFileRecordIteratorInternal(filePathEither, start, length,
actualDataSchema, requiredSchema, storage);
+ }
+ UnaryOperator<T> projection = projectRecord(actualRequriredSchema,
requiredSchema);
+ return new
CloseableMappingIterator<>(getFileRecordIteratorInternal(filePathEither, start,
length, actualDataSchema, actualRequriredSchema, storage), projection);
+ }
+
+ private Schema getActualDataSchema(Option<Schema> dataSchema,
Either<StoragePath, StoragePathInfo> filePathEither, HoodieStorage storage)
throws IOException {
Review Comment:
+1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org