hudi-agent commented on code in PR #19050:
URL: https://github.com/apache/hudi/pull/19050#discussion_r3457392117
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java:
##########
@@ -103,28 +102,18 @@ public ClosableIterator<RowData> getFileRecordIterator(
// disable schema evolution in fileReader if it's log file, since schema evolution for log file is handled in `FileGroupRecordBuffer`
InternalSchemaManager schemaManager = isLogFile ? InternalSchemaManager.DISABLED : internalSchemaManager.get();
- if (filePath.getName().endsWith(HoodieFileFormat.LANCE.getFileExtension())) {
- if (schemaManager != InternalSchemaManager.DISABLED
- && !schemaManager.getMergeSchema(filePath.getName()).isEmptySchema()) {
- throw new HoodieValidationException("Flink Lance base-file support does not support schema evolution.");
- }
- HoodieRowDataLanceReader rowDataLanceReader =
- (HoodieRowDataLanceReader) HoodieIOFactory.getIOFactory(storage)
- .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
- .getFileReader(tableConfig, filePath, HoodieFileFormat.LANCE, Option.empty());
- try {
- return rowDataLanceReader.getRowDataIterator(RowDataQueryContexts.fromSchema(requiredSchema).getRowType(), requiredSchema);
- } catch (RuntimeException e) {
- rowDataLanceReader.close();
- throw new HoodieException("Failed to get iterator from lance reader", e);
- }
+ // Log files only reach this method for parquet data blocks; base files are resolved by their extension.
+ // Format-specific handling lives in the readers themselves, so this method stays format-agnostic.
+ HoodieFileFormat format = isLogFile ? HoodieFileFormat.PARQUET : HoodieFileFormat.fromFileExtension(filePath.getFileExtension());
+ HoodieRowDataFileReader reader = (HoodieRowDataFileReader) HoodieIOFactory.getIOFactory(storage)
+ .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
+ .getFileReader(tableConfig, filePath, format, Option.empty());
+ try {
+ return reader.getRowDataIterator(dataSchema, requiredSchema, schemaManager, getSafePredicates(requiredSchema));
Review Comment:
🤖 The unified entry point now declares `throws IOException`, and the parquet
path (`getRowDataIterator` -> `RecordIterators.getParquetRecordIterator`) can
genuinely throw `IOException` while opening the file. Because this catch only
handles `RuntimeException`, an `IOException` would skip both `reader.close()`
and the wrapped error message. Was it intentional to leave the checked-exception
path out of the close-on-failure handling? Would catching `Exception` (or also
`IOException`) better match the PR's stated goal here?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
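For illustration, the close-on-failure handling the comment asks about could look roughly like this sketch. It uses hypothetical stand-in types (`RowReader`, `openIterator`, `ReaderSupport.iteratorOrClose`) rather than Hudi's real `HoodieRowDataFileReader`, and `IllegalStateException` stands in for `HoodieException`. A multi-catch covers both the checked `IOException` path and the unchecked `RuntimeException` path, so the reader is closed either way.

```java
import java.io.IOException;
import java.util.Iterator;

// Hypothetical stand-in for a row reader whose iterator factory declares a
// checked IOException (as the parquet path does when opening the file).
interface RowReader extends AutoCloseable {
  Iterator<String> openIterator() throws IOException;

  // Narrowed to throw nothing checked, so callers need no extra try/catch.
  @Override
  void close();
}

final class ReaderSupport {
  private ReaderSupport() {
  }

  // Returns the reader's iterator; if opening fails with either a checked
  // IOException or an unchecked RuntimeException, the reader is closed before
  // a wrapped exception is rethrown.
  static Iterator<String> iteratorOrClose(RowReader reader) {
    try {
      return reader.openIterator();
    } catch (IOException | RuntimeException e) {
      // IllegalStateException stands in for HoodieException here.
      IllegalStateException failure =
          new IllegalStateException("Failed to get iterator from reader", e);
      try {
        reader.close();
      } catch (RuntimeException closeError) {
        // Keep the original failure primary; record the close error as suppressed.
        failure.addSuppressed(closeError);
      }
      throw failure;
    }
  }
}
```

Catching `Exception` instead would behave the same here; the multi-catch just keeps the set of handled failures explicit.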
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReader.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.source.ExpressionPredicates;
+
+import org.apache.flink.table.data.RowData;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A {@link HoodieFileReader} that reads {@link RowData}s and exposes a uniform, format-agnostic
+ * entry point for getting a record iterator.
+ */
+public interface HoodieRowDataFileReader extends HoodieFileReader<RowData> {
+
+ /**
+ * Returns an iterator over {@link RowData}s for the given schemas. Each format only uses the
+ * arguments relevant to it: e.g. the Lance reader ignores {@code dataSchema} and {@code predicates}
+ * and instead validates that no schema evolution is requested.
+ *
+ * @param dataSchema schema of the records stored in the file.
+ * @param requiredSchema schema containing the fields to project.
+ * @param internalSchemaManager schema evolution manager; {@link InternalSchemaManager#DISABLED} to skip.
+ * @param predicates filters to push down to the reader.
+ */
+ ClosableIterator<RowData> getRowDataIterator(
+ HoodieSchema dataSchema,
+ HoodieSchema requiredSchema,
+ InternalSchemaManager internalSchemaManager,
Review Comment:
🤖 nit: `internalSchemaManager` just repeats the type name — `schemaManager`
would be a bit more concise and matches the local variable name used at the
call site in `FlinkRowDataReaderContext`.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReader.java:
##########
@@ -0,0 +1,52 @@
+/**
+ * A {@link HoodieFileReader} that reads {@link RowData}s and exposes a uniform, format-agnostic
+ * entry point for getting a record iterator.
+ */
+public interface HoodieRowDataFileReader extends HoodieFileReader<RowData> {
+
+ /**
+ * Returns an iterator over {@link RowData}s for the given schemas. Each format only uses the
+ * arguments relevant to it: e.g. the Lance reader ignores {@code dataSchema} and {@code predicates}
Review Comment:
🤖 nit: the interface Javadoc names a specific implementor (Lance) inside the
general contract. Could you drop the `e.g.` clause and just say "each format
uses only the arguments relevant to it"? Implementation-specific behaviour is
better documented in each implementor's own override Javadoc.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
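A minimal sketch of the split this nit suggests, using hypothetical simplified types (`FormatRowReader`, `LanceLikeRowReader`, `String` schemas, and a boolean schema-evolution flag) rather than Hudi's real classes. The interface Javadoc states only the general contract, while the implementor's override inherits it via `{@inheritDoc}` and then adds its own format-specific behaviour. `UnsupportedOperationException` stands in for `HoodieValidationException`.

```java
import java.util.Collections;
import java.util.Iterator;

/**
 * Hypothetical, simplified analogue of a format-agnostic row reader.
 */
interface FormatRowReader {
  /**
   * Returns an iterator over rows for the given schemas. Each format uses only
   * the arguments relevant to it.
   *
   * @param dataSchema      schema of the records stored in the file
   * @param requiredSchema  schema containing the fields to project
   * @param schemaEvolution whether schema evolution is requested
   */
  Iterator<String> getRowIterator(String dataSchema, String requiredSchema, boolean schemaEvolution);
}

/** Hypothetical Lance-like implementor that documents its own format-specific behaviour. */
final class LanceLikeRowReader implements FormatRowReader {
  /**
   * {@inheritDoc}
   *
   * <p>This reader ignores {@code dataSchema} and rejects any request for
   * schema evolution.
   */
  @Override
  public Iterator<String> getRowIterator(String dataSchema, String requiredSchema, boolean schemaEvolution) {
    if (schemaEvolution) {
      // UnsupportedOperationException stands in for HoodieValidationException.
      throw new UnsupportedOperationException(
          "Flink Lance base-file support does not support schema evolution.");
    }
    // dataSchema is deliberately unused; the toy "row" is just the projected schema.
    return Collections.singletonList(requiredSchema).iterator();
  }
}
```

Readers of the interface then see a format-neutral contract, and the Lance-specific caveats appear only on the Lance override.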
--
This is an automated message from the Apache Git Service.