hudi-agent commented on code in PR #19050:
URL: https://github.com/apache/hudi/pull/19050#discussion_r3459413805
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java:
##########
@@ -103,28 +102,18 @@ public ClosableIterator<RowData> getFileRecordIterator(
// disable schema evolution in fileReader if it's log file, since schema evolution for log file is handled in `FileGroupRecordBuffer`
InternalSchemaManager schemaManager = isLogFile ? InternalSchemaManager.DISABLED : internalSchemaManager.get();
- if (filePath.getName().endsWith(HoodieFileFormat.LANCE.getFileExtension())) {
- if (schemaManager != InternalSchemaManager.DISABLED
- && !schemaManager.getMergeSchema(filePath.getName()).isEmptySchema()) {
- throw new HoodieValidationException("Flink Lance base-file support does not support schema evolution.");
- }
- HoodieRowDataLanceReader rowDataLanceReader =
- (HoodieRowDataLanceReader) HoodieIOFactory.getIOFactory(storage)
- .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
- .getFileReader(tableConfig, filePath, HoodieFileFormat.LANCE, Option.empty());
- try {
- return rowDataLanceReader.getRowDataIterator(RowDataQueryContexts.fromSchema(requiredSchema).getRowType(), requiredSchema);
- } catch (RuntimeException e) {
- rowDataLanceReader.close();
- throw new HoodieException("Failed to get iterator from lance reader", e);
- }
+ // Log files only reach this method for parquet data blocks; base files are resolved by their extension.
+ // Format-specific handling lives in the readers themselves, so this method stays format-agnostic.
+ HoodieFileFormat format = isLogFile ? HoodieFileFormat.PARQUET : HoodieFileFormat.fromFileExtension(filePath.getFileExtension());
+ HoodieRowDataFileReader reader = (HoodieRowDataFileReader) HoodieIOFactory.getIOFactory(storage)
+ .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
+ .getFileReader(tableConfig, filePath, format, Option.empty());
+ try {
+ return reader.getRowDataIterator(dataSchema, requiredSchema, schemaManager, getSafePredicates(requiredSchema));
Review Comment:
🤖 The dispatch now resolves the base-file format from the file extension
instead of defaulting every non-Lance base file to PARQUET. As a result, a base
file with an ORC or HFILE extension would resolve to a format for which the
Flink reader factory does not return a HoodieRowDataFileReader, and the cast
would throw a ClassCastException. In practice Flink only writes Parquet and
Lance base files, so this isn't a regression (the old code would also fail to
read such a file). Still, is it worth restricting the dispatch to PARQUET/LANCE
so that any other format fails with a clearer error message?
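A minimal sketch of the guard suggested above, written in plain Java so it runs on its own. `FileFormat`, `resolveReadableFormat`, and the dotted extension strings are hypothetical stand-ins, not Hudi's `HoodieFileFormat` or `StoragePath` APIs. The sketch only shows the shape of the check: first resolve the format as the diff does, then reject anything other than PARQUET or LANCE with an explicit message before a reader is looked up and cast.

```java
/**
 * Hypothetical sketch: resolve the format for a Flink row-data read and fail fast,
 * with a clear message, for formats that have no row-data reader. FileFormat is a
 * stand-in for HoodieFileFormat, NOT the Hudi enum.
 */
final class FormatDispatchSketch {

  // Stand-in enum; assumption: extensions are compared with their leading dot.
  enum FileFormat {
    PARQUET(".parquet"), LANCE(".lance"), ORC(".orc"), HFILE(".hfile");

    private final String extension;

    FileFormat(String extension) {
      this.extension = extension;
    }

    // Resolves a format from an extension; unknown extensions are rejected.
    static FileFormat fromFileExtension(String ext) {
      for (FileFormat f : values()) {
        if (f.extension.equalsIgnoreCase(ext)) {
          return f;
        }
      }
      throw new IllegalArgumentException("Unknown file extension: " + ext);
    }
  }

  // Mirrors the diff: log files are treated as PARQUET, base files resolve by extension.
  // The extra check turns a later ClassCastException into an explicit error.
  static FileFormat resolveReadableFormat(boolean isLogFile, String extension) {
    FileFormat format = isLogFile ? FileFormat.PARQUET : FileFormat.fromFileExtension(extension);
    if (format != FileFormat.PARQUET && format != FileFormat.LANCE) {
      throw new UnsupportedOperationException(
          "Flink row-data reading supports only PARQUET and LANCE base files, got " + format);
    }
    return format;
  }

  public static void main(String[] args) {
    System.out.println(resolveReadableFormat(false, ".lance")); // prints LANCE
    System.out.println(resolveReadableFormat(true, ".log"));    // prints PARQUET
    try {
      resolveReadableFormat(false, ".orc");
    } catch (UnsupportedOperationException e) {
      // prints "Flink row-data reading supports only PARQUET and LANCE base files, got ORC"
      System.out.println(e.getMessage());
    }
  }
}
```

Putting the check right after format resolution keeps the reader-factory call and the cast unchanged. It only narrows which formats are allowed to reach them.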
⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java:
##########
@@ -98,6 +100,19 @@ public ClosableIterator<String> getRecordKeyIterator() throws IOException {
return new CloseableMappingIterator<>(rowDataItr, rowData -> Objects.toString(rowData.getString(0)));
}
+ @Override
+ public ClosableIterator<RowData> getRowDataIterator(
+ HoodieSchema dataSchema,
Review Comment:
🤖 nit: could you use `List<Predicate>` here instead of
`List<ExpressionPredicates.Predicate>`? `Predicate` is already imported in this
file (line 36), and the existing `getRowDataIterator` below uses the short
form. The new import of `ExpressionPredicates` on line 35 would then become
redundant and can be dropped.
⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.
--
This is an automated message from the Apache Git Service.