This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b7da18e322d9 refactor(flink): Unify file reader creation in FlinkRowDataReaderContext (#19050)
b7da18e322d9 is described below
commit b7da18e322d9de3810eb3c32222aa7bb6971efa3
Author: Shuo Cheng <[email protected]>
AuthorDate: Wed Jun 24 09:48:47 2026 +0800
refactor(flink): Unify file reader creation in FlinkRowDataReaderContext (#19050)
---
.../table/format/FlinkRowDataReaderContext.java | 33 +++++---------
.../hudi/table/format/HoodieRowDataFileReader.java | 50 ++++++++++++++++++++++
.../table/format/HoodieRowDataLanceReader.java | 19 +++++++-
.../table/format/HoodieRowDataParquetReader.java | 17 +++++++-
4 files changed, 94 insertions(+), 25 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 617dda2079ba..5cc3f85c82e0 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -52,7 +52,6 @@ import org.apache.hudi.util.RowDataQueryContexts;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
-import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import java.io.IOException;
@@ -103,28 +102,18 @@ public class FlinkRowDataReaderContext extends
HoodieReaderContext<RowData> {
// disable schema evolution in fileReader if it's log file, since schema evolution for log file is handled in `FileGroupRecordBuffer`
InternalSchemaManager schemaManager = isLogFile ?
InternalSchemaManager.DISABLED : internalSchemaManager.get();
- if
(filePath.getName().endsWith(HoodieFileFormat.LANCE.getFileExtension())) {
- if (schemaManager != InternalSchemaManager.DISABLED
- &&
!schemaManager.getMergeSchema(filePath.getName()).isEmptySchema()) {
- throw new HoodieValidationException("Flink Lance base-file support
does not support schema evolution.");
- }
- HoodieRowDataLanceReader rowDataLanceReader =
- (HoodieRowDataLanceReader) HoodieIOFactory.getIOFactory(storage)
- .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
- .getFileReader(tableConfig, filePath, HoodieFileFormat.LANCE,
Option.empty());
- try {
- return
rowDataLanceReader.getRowDataIterator(RowDataQueryContexts.fromSchema(requiredSchema).getRowType(),
requiredSchema);
- } catch (RuntimeException e) {
- rowDataLanceReader.close();
- throw new HoodieException("Failed to get iterator from lance reader",
e);
- }
+ // Log files only reach this method for parquet data blocks; base files are resolved by their extension.
+ // Format-specific handling lives in the readers themselves, so this method stays format-agnostic.
+ HoodieFileFormat format = isLogFile ? HoodieFileFormat.PARQUET :
HoodieFileFormat.fromFileExtension(filePath.getFileExtension());
+ HoodieRowDataFileReader reader = (HoodieRowDataFileReader)
HoodieIOFactory.getIOFactory(storage)
+ .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
+ .getFileReader(tableConfig, filePath, format, Option.empty());
+ try {
+ return reader.getRowDataIterator(dataSchema, requiredSchema,
schemaManager, getSafePredicates(requiredSchema));
+ } catch (Throwable e) {
+ reader.close();
+ throw new HoodieException("Failed to get record iterator for: " +
filePath, e);
}
- DataType rowType =
RowDataQueryContexts.fromSchema(dataSchema).getRowType();
- HoodieRowDataParquetReader rowDataParquetReader =
- (HoodieRowDataParquetReader) HoodieIOFactory.getIOFactory(storage)
- .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
- .getFileReader(tableConfig, filePath, HoodieFileFormat.PARQUET,
Option.empty());
- return rowDataParquetReader.getRowDataIterator(schemaManager, rowType,
requiredSchema, getSafePredicates(requiredSchema));
}
@Override
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReader.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReader.java
new file mode 100644
index 000000000000..9339d3d041cc
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReader.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.source.ExpressionPredicates;
+
+import org.apache.flink.table.data.RowData;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A {@link HoodieFileReader} that reads {@link RowData}s and exposes a uniform, format-agnostic
+ * entry point for getting a record iterator.
+ */
+public interface HoodieRowDataFileReader extends HoodieFileReader<RowData> {
+
+ /**
+ * Returns an iterator over {@link RowData}s for the given schemas.
+ *
+ * @param dataSchema schema of the records stored in the file.
+ * @param requiredSchema schema containing the fields to project.
+ * @param internalSchemaManager schema evolution manager; {@link InternalSchemaManager#DISABLED} to skip.
+ * @param predicates filters to push down to the reader.
+ */
+ ClosableIterator<RowData> getRowDataIterator(
+ HoodieSchema dataSchema,
+ HoodieSchema requiredSchema,
+ InternalSchemaManager internalSchemaManager,
+ List<ExpressionPredicates.Predicate> predicates) throws IOException;
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
index eba397093ec5..5a4597d27710 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
@@ -32,9 +32,10 @@ import
org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.io.memory.HoodieArrowAllocator;
-import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.row.HoodieFlinkLanceArrowUtils;
+import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.RowDataQueryContexts;
@@ -65,7 +66,7 @@ import static
org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_MIN_RECO
/**
* Lance reader for Flink RowData base files.
*/
-public class HoodieRowDataLanceReader implements HoodieFileReader<RowData> {
+public class HoodieRowDataLanceReader implements HoodieRowDataFileReader {
private static final int DEFAULT_BATCH_SIZE = 512;
@@ -152,6 +153,20 @@ public class HoodieRowDataLanceReader implements
HoodieFileReader<RowData> {
return new CloseableMappingIterator<>(rowDataItr, rowData ->
rowData.getString(0).toString());
}
+ @Override
+ public ClosableIterator<RowData> getRowDataIterator(
+ HoodieSchema dataSchema,
+ HoodieSchema requiredSchema,
+ InternalSchemaManager internalSchemaManager,
+ List<ExpressionPredicates.Predicate> predicates) {
+ // Lance base-file reading does not support schema evolution.
+ if (internalSchemaManager != InternalSchemaManager.DISABLED
+ &&
!internalSchemaManager.getMergeSchema(path.getName()).isEmptySchema()) {
+ throw new HoodieValidationException("Flink Lance base-file support does
not support schema evolution.");
+ }
+ return
getRowDataIterator(RowDataQueryContexts.fromSchema(requiredSchema).getRowType(),
requiredSchema);
+ }
+
public ClosableIterator<RowData> getRowDataIterator(DataType dataType,
HoodieSchema requestedSchema) {
RowType rowType = (RowType) dataType.getLogicalType();
List<String> columnNames = new ArrayList<>(rowType.getFieldCount());
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
index e0f5faf7ed26..09ebc52a036c 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
@@ -32,10 +32,12 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.io.storage.row.parquet.ParquetSchemaConverter;
+import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.source.ExpressionPredicates.Predicate;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.RowDataQueryContexts;
import org.apache.hudi.util.VectorConversionUtils;
import org.apache.flink.table.api.DataTypes;
@@ -55,7 +57,7 @@ import java.util.Set;
/**
* Implementation of {@link HoodieFileReader} to read {@link RowData}s from base file.
*/
-public class HoodieRowDataParquetReader implements HoodieFileReader<RowData> {
+public class HoodieRowDataParquetReader implements HoodieRowDataFileReader {
private final HoodieStorage storage;
private final ParquetUtils parquetUtils;
private final StoragePath path;
@@ -98,6 +100,19 @@ public class HoodieRowDataParquetReader implements
HoodieFileReader<RowData> {
return new CloseableMappingIterator<>(rowDataItr, rowData ->
Objects.toString(rowData.getString(0)));
}
+ @Override
+ public ClosableIterator<RowData> getRowDataIterator(
+ HoodieSchema dataSchema,
+ HoodieSchema requiredSchema,
+ InternalSchemaManager internalSchemaManager,
+ List<ExpressionPredicates.Predicate> predicates) throws IOException {
+ return getRowDataIterator(
+ internalSchemaManager,
+ RowDataQueryContexts.fromSchema(dataSchema).getRowType(),
+ requiredSchema,
+ predicates);
+ }
+
public ClosableIterator<RowData> getRowDataIterator(
InternalSchemaManager internalSchemaManager,
DataType dataType,