This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 114f308280c05ef5e67247684d884faacdb1af4f Author: Shuo Cheng <[email protected]> AuthorDate: Tue Nov 4 09:48:23 2025 +0800 fix: Fix predicates for base file reader in Flink FileGroup reader (#14197) --- .../apache/hudi/source/ExpressionPredicates.java | 31 ++++ .../table/format/FlinkRowDataReaderContext.java | 49 +++++- .../hudi/source/TestExpressionPredicates.java | 165 +++++++++++++++++++++ .../apache/hudi/table/ITTestHoodieDataSource.java | 42 ++++++ 4 files changed, 281 insertions(+), 6 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java index 641196a66b64..32ae005f0566 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -177,6 +178,11 @@ public class ExpressionPredicates { * @return A filter predicate of parquet file. */ FilterPredicate filter(); + + /** + * List of columns that are referenced by this filter. + */ + List<String> references(); } /** @@ -231,6 +237,11 @@ public class ExpressionPredicates { return toParquetPredicate(getFunctionDefinition(), literalType, columnName, convertedLiteral); } + @Override + public List<String> references() { + return Collections.singletonList(columnName); + } + /** * Returns function definition of predicate. 
* @@ -485,6 +496,11 @@ public class ExpressionPredicates { public FilterPredicate filter() { return null; } + + @Override + public List<String> references() { + return Collections.emptyList(); + } } /** @@ -523,6 +539,11 @@ public class ExpressionPredicates { return not(filterPredicate); } + @Override + public List<String> references() { + return predicate.references(); + } + @Override public String toString() { return "NOT(" + predicate.toString() + ")"; @@ -566,6 +587,11 @@ public class ExpressionPredicates { return and(filterPredicate0, filterPredicate1); } + @Override + public List<String> references() { + return Arrays.stream(predicates).map(Predicate::references).flatMap(List::stream).distinct().collect(Collectors.toList()); + } + @Override public String toString() { return "AND(" + Arrays.toString(predicates) + ")"; @@ -609,6 +635,11 @@ public class ExpressionPredicates { return or(filterPredicate0, filterPredicate1); } + @Override + public List<String> references() { + return Arrays.stream(predicates).map(Predicate::references).flatMap(List::stream).distinct().collect(Collectors.toList()); + } + @Override public String toString() { return "OR(" + Arrays.toString(predicates) + ")"; diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java index 3d2272d9d9fc..5e1f646104b7 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.format; +import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.client.model.BootstrapRowData; import org.apache.hudi.client.model.CommitTimeFlinkRecordMerger; import org.apache.hudi.client.model.EventTimeFlinkRecordMerger; @@ -43,6 
+44,7 @@ import org.apache.hudi.source.ExpressionPredicates; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.util.Lazy; import org.apache.hudi.util.RowDataAvroQueryContexts; import org.apache.hudi.util.RecordKeyToRowDataConverter; @@ -54,32 +56,38 @@ import org.apache.flink.table.types.logical.RowType; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY; +import static org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME; /** * Implementation of {@link HoodieReaderContext} to read {@link RowData}s from base files or * log files with Flink parquet reader. */ public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> { - private final List<ExpressionPredicates.Predicate> predicates; + private final List<ExpressionPredicates.Predicate> allPredicates; + private final Lazy<List<ExpressionPredicates.Predicate>> lazyBootstrapSafeFilters; + private final Lazy<List<ExpressionPredicates.Predicate>> lazyMorSafeFilters; + private final Lazy<List<String>> lazyRecordKeys; private final Supplier<InternalSchemaManager> internalSchemaManager; - private final HoodieTableConfig tableConfig; public FlinkRowDataReaderContext( StorageConfiguration<?> storageConfiguration, Supplier<InternalSchemaManager> internalSchemaManager, - List<ExpressionPredicates.Predicate> predicates, + List<ExpressionPredicates.Predicate> allPredicates, HoodieTableConfig tableConfig, Option<InstantRange> instantRangeOpt) { super(storageConfiguration, tableConfig, instantRangeOpt, Option.empty(), new FlinkRecordContext(tableConfig, storageConfiguration)); - this.tableConfig 
= tableConfig; this.internalSchemaManager = internalSchemaManager; - this.predicates = predicates; + this.allPredicates = allPredicates; + this.lazyBootstrapSafeFilters = Lazy.lazily(() -> allPredicates.stream().filter(this::filterIsSafeForBootstrap).collect(Collectors.toList())); + this.lazyMorSafeFilters = Lazy.lazily(() -> allPredicates.stream().filter(this::filterIsSafeForMorMerging).collect(Collectors.toList())); + this.lazyRecordKeys = Lazy.lazily(() -> tableConfig.getRecordKeyFields().map(keys -> Arrays.stream(keys).collect(Collectors.toList())).orElse(Collections.emptyList())); } @Override @@ -99,7 +107,7 @@ public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> { .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK) .getFileReader(tableConfig, filePath, HoodieFileFormat.PARQUET, Option.empty()); DataType rowType = RowDataAvroQueryContexts.fromAvroSchema(dataSchema).getRowType(); - return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, requiredSchema, predicates); + return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, requiredSchema, getSafePredicates(requiredSchema)); } @Override @@ -198,4 +206,33 @@ public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> { pkFieldsPos, (RowType) RowDataAvroQueryContexts.fromAvroSchema(requiredSchema).getRowType().getLogicalType()); ((FlinkRecordContext) recordContext).setRecordKeyRowConverter(recordKeyRowConverter); } + + private List<ExpressionPredicates.Predicate> getSafePredicates(Schema requiredSchema) { + boolean hasRowIndexField = AvroSchemaUtils.containsFieldInSchema(requiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME); + if (!getHasLogFiles() && !getNeedsBootstrapMerge()) { + return allPredicates; + } else if (!getHasLogFiles() && hasRowIndexField) { + return lazyBootstrapSafeFilters.get(); + } else { + return lazyMorSafeFilters.get(); + } + } + + /** + * Only valid if there is support for RowIndexField and no log files + * Filters are safe 
for bootstrap if meta col filters are independent from data col filters. + */ + private boolean filterIsSafeForBootstrap(ExpressionPredicates.Predicate predicate) { + long metaRefCount = predicate.references().stream().filter(c -> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(c.toLowerCase())).count(); + return metaRefCount == predicate.references().size() || metaRefCount == 0; + } + + /** + * Only valid if the filter's references only include primary key columns or {@link HoodieRecord#RECORD_KEY_METADATA_FIELD}, + * because it's necessary to ensure both records with the same record key in the base file and log file are either filtered out + * or retained, to make the later merging process correct. + */ + private boolean filterIsSafeForMorMerging(ExpressionPredicates.Predicate predicate) { + return predicate.references().stream().allMatch(c -> lazyRecordKeys.get().contains(c.toLowerCase()) || c.equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java index 1ea00104b4d3..55698ae8c048 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java @@ -69,6 +69,7 @@ import static org.apache.parquet.filter2.predicate.FilterApi.or; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Test cases for {@link ExpressionPredicates}. 
@@ -243,4 +244,168 @@ public class TestExpressionPredicates { ExpressionPredicates.ColumnPredicate predicate = Equals.getInstance().bindFieldReference(fieldReference).bindValueLiteral(valueLiteral); assertDoesNotThrow(predicate::filter, () -> String.format("Convert from %s to %s failed", literalValue.getClass().getName(), dataType)); } + + @Test + public void testUnaryPredicateReferences() { + // Test Equals predicate references method + FieldReferenceExpression fieldReference = new FieldReferenceExpression("field1", DataTypes.STRING(), 0, 0); + ValueLiteralExpression valueLiteral = new ValueLiteralExpression("value1"); + + ExpressionPredicates.ColumnPredicate equalsPredicate = Equals.getInstance() + .bindFieldReference(fieldReference) + .bindValueLiteral(valueLiteral); + + assertEquals(Collections.singletonList("field1"), equalsPredicate.references()); + + // Test GreaterThan predicate references method + valueLiteral = new ValueLiteralExpression(10); + ExpressionPredicates.ColumnPredicate gtPredicate = GreaterThan.getInstance() + .bindFieldReference(fieldReference) + .bindValueLiteral(valueLiteral); + assertEquals(Collections.singletonList("field1"), gtPredicate.references()); + + ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression("value1"); + ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression("value2"); + + ExpressionPredicates.ColumnPredicate inPredicate = In.getInstance() + .bindValueLiterals(Arrays.asList(valueLiteral1, valueLiteral2)) + .bindFieldReference(fieldReference); + + assertEquals(Collections.singletonList("field1"), inPredicate.references()); + } + + @Test + public void testNotPredicateReferences() { + // Test Not predicate references method - should delegate to underlying predicate + FieldReferenceExpression fieldReference = new FieldReferenceExpression("field4", DataTypes.STRING(), 0, 0); + ValueLiteralExpression valueLiteral = new ValueLiteralExpression("value1"); + + ExpressionPredicates.ColumnPredicate 
innerPredicate = Equals.getInstance() + .bindFieldReference(fieldReference) + .bindValueLiteral(valueLiteral); + + Predicate notPredicate = Not.getInstance().bindPredicate(innerPredicate); + + List<String> references = notPredicate.references(); + assertEquals(1, references.size()); + assertEquals("field4", references.get(0)); + } + + @Test + public void testAndPredicateReferencesWithSameColumn() { + // Test And predicate references method with same column + FieldReferenceExpression fieldReference1 = new FieldReferenceExpression("field1", DataTypes.INT(), 0, 0); + FieldReferenceExpression fieldReference2 = new FieldReferenceExpression("field1", DataTypes.INT(), 0, 0); + ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression(10); + ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression(20); + + ExpressionPredicates.ColumnPredicate predicate1 = GreaterThan.getInstance() + .bindFieldReference(fieldReference1) + .bindValueLiteral(valueLiteral1); + + ExpressionPredicates.ColumnPredicate predicate2 = Equals.getInstance() + .bindFieldReference(fieldReference2) + .bindValueLiteral(valueLiteral2); + + Predicate andPredicate = And.getInstance().bindPredicates(predicate1, predicate2); + + List<String> references = andPredicate.references(); + assertEquals(1, references.size()); + assertEquals("field1", references.get(0)); + } + + @Test + public void testAndPredicateReferencesWithDifferentColumns() { + // Test And predicate references method with different columns + FieldReferenceExpression fieldReference1 = new FieldReferenceExpression("field1", DataTypes.INT(), 0, 0); + FieldReferenceExpression fieldReference2 = new FieldReferenceExpression("field2", DataTypes.STRING(), 0, 0); + ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression(10); + ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression("test"); + + ExpressionPredicates.ColumnPredicate predicate1 = GreaterThan.getInstance() + .bindFieldReference(fieldReference1) + 
.bindValueLiteral(valueLiteral1); + + ExpressionPredicates.ColumnPredicate predicate2 = Equals.getInstance() + .bindFieldReference(fieldReference2) + .bindValueLiteral(valueLiteral2); + + Predicate andPredicate = And.getInstance().bindPredicates(predicate1, predicate2); + + assertEquals(Arrays.asList("field1", "field2"), andPredicate.references()); + } + + @Test + public void testOrPredicateReferencesWithSameColumn() { + // Test Or predicate references method with same column + FieldReferenceExpression fieldReference1 = new FieldReferenceExpression("field1", DataTypes.INT(), 0, 0); + FieldReferenceExpression fieldReference2 = new FieldReferenceExpression("field1", DataTypes.INT(), 0, 0); + ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression(10); + ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression(20); + + ExpressionPredicates.ColumnPredicate predicate1 = GreaterThan.getInstance() + .bindFieldReference(fieldReference1) + .bindValueLiteral(valueLiteral1); + + ExpressionPredicates.ColumnPredicate predicate2 = Equals.getInstance() + .bindFieldReference(fieldReference2) + .bindValueLiteral(valueLiteral2); + + Predicate orPredicate = Or.getInstance().bindPredicates(predicate1, predicate2); + + List<String> references = orPredicate.references(); + assertEquals(1, references.size()); + assertEquals("field1", references.get(0)); + } + + @Test + public void testOrPredicateReferencesWithDifferentColumns() { + // Test Or predicate references method with different columns + FieldReferenceExpression fieldReference1 = new FieldReferenceExpression("field1", DataTypes.INT(), 0, 0); + FieldReferenceExpression fieldReference2 = new FieldReferenceExpression("field2", DataTypes.STRING(), 0, 0); + ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression(10); + ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression("test"); + + ExpressionPredicates.ColumnPredicate predicate1 = GreaterThan.getInstance() + 
.bindFieldReference(fieldReference1) + .bindValueLiteral(valueLiteral1); + + ExpressionPredicates.ColumnPredicate predicate2 = Equals.getInstance() + .bindFieldReference(fieldReference2) + .bindValueLiteral(valueLiteral2); + + Predicate orPredicate = Or.getInstance().bindPredicates(predicate1, predicate2); + assertEquals(Arrays.asList("field1", "field2"), orPredicate.references()); + } + + @Test + public void testAlwaysNullPredicateReferences() { + // Test AlwaysNull predicate references method - should return empty list + Predicate alwaysNullPredicate = ExpressionPredicates.AlwaysNull.getInstance(); + + List<String> references = alwaysNullPredicate.references(); + assertEquals(0, references.size()); + assertTrue(references.isEmpty()); + } + + @Test + public void testNestedPredicateReferences() { + // Test nested predicates (AND inside NOT) + FieldReferenceExpression fieldReference1 = new FieldReferenceExpression("field1", DataTypes.INT(), 0, 0); + FieldReferenceExpression fieldReference2 = new FieldReferenceExpression("field2", DataTypes.STRING(), 0, 0); + ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression(10); + ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression("test"); + + ExpressionPredicates.ColumnPredicate predicate1 = GreaterThan.getInstance() + .bindFieldReference(fieldReference1) + .bindValueLiteral(valueLiteral1); + + ExpressionPredicates.ColumnPredicate predicate2 = Equals.getInstance() + .bindFieldReference(fieldReference2) + .bindValueLiteral(valueLiteral2); + + Predicate andPredicate = And.getInstance().bindPredicates(predicate1, predicate2); + Predicate notPredicate = Not.getInstance().bindPredicate(andPredicate); + assertEquals(Arrays.asList("field1", "field2"), notPredicate.references()); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 
b59b51c035c0..d8f14601ed85 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -1975,6 +1975,48 @@ public class ITTestHoodieDataSource { assertRowsEquals(result1, "[+I[id2, Stephen, 33, 1970-01-01T00:00:02, par1]]"); } + @Test + void testPredicateForBaseFileWithMor() { + // Case: + // * records in base file can not survive from the predicate + // * records in log file can survive from the predicate + // * records in base file have higher ordering value + // E.g., base file: (uuid:'k1', age: 23, ts: 1003) + // log file: (uuid: 'k1', age: 25, ts: 1001) + // query filter: age = 25; + // Then the expected result should be empty, but if predicate age = 25 is pushed down + // into the parquet reader, the result would be wrong as (uuid: 'k1', age: 25, ts: 1001) + TableEnvironment tableEnv = batchTableEnv; + String path = tempFile.getAbsolutePath(); + String hoodieTableDDL = sql("t1") + .option(FlinkOptions.PATH, path) + .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ) + .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1) + .option(FlinkOptions.COMPACTION_TASKS, 1) + .end(); + tableEnv.executeSql(hoodieTableDDL); + + final String INSERT_T1 = "insert into t1 values\n" + + "('id1','Danny',23,TIMESTAMP '1970-01-01 01:00:01','par1')\n"; + execInsertSql(tableEnv, INSERT_T1); + + batchTableEnv.executeSql("drop table t1"); + + hoodieTableDDL = sql("t1") + .option(FlinkOptions.PATH, path) + .option(FlinkOptions.METADATA_ENABLED, true) + .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ) + .end(); + tableEnv.executeSql(hoodieTableDDL); + + final String INSERT_T2 = "insert into t1 values\n" + + "('id1','Danny',25,TIMESTAMP '1970-01-01 00:00:01','par1')\n"; + execInsertSql(tableEnv, INSERT_T2); + List<Row> result1 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1 where age = 25 and `partition` = 
'par1'").execute().collect()); + assertRowsEquals(result1, "[]"); + } + @Test void testParquetLogBlockDataSkipping() { TableEnvironment tableEnv = batchTableEnv;
