This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 114f308280c05ef5e67247684d884faacdb1af4f
Author: Shuo Cheng <[email protected]>
AuthorDate: Tue Nov 4 09:48:23 2025 +0800

    fix: Fix predicates for base file reader in Flink FileGroup reader (#14197)
---
 .../apache/hudi/source/ExpressionPredicates.java   |  31 ++++
 .../table/format/FlinkRowDataReaderContext.java    |  49 +++++-
 .../hudi/source/TestExpressionPredicates.java      | 165 +++++++++++++++++++++
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  42 ++++++
 4 files changed, 281 insertions(+), 6 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java
index 641196a66b64..32ae005f0566 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
@@ -177,6 +178,11 @@ public class ExpressionPredicates {
      * @return A filter predicate of parquet file.
      */
     FilterPredicate filter();
+
+    /**
+     * List of columns that are referenced by this filter.
+     */
+    List<String> references();
   }
 
   /**
@@ -231,6 +237,11 @@ public class ExpressionPredicates {
       return toParquetPredicate(getFunctionDefinition(), literalType, 
columnName, convertedLiteral);
     }
 
+    @Override
+    public List<String> references() {
+      return Collections.singletonList(columnName);
+    }
+
     /**
      * Returns function definition of predicate.
      *
@@ -485,6 +496,11 @@ public class ExpressionPredicates {
     public FilterPredicate filter() {
       return null;
     }
+
+    @Override
+    public List<String> references() {
+      return Collections.emptyList();
+    }
   }
 
   /**
@@ -523,6 +539,11 @@ public class ExpressionPredicates {
       return not(filterPredicate);
     }
 
+    @Override
+    public List<String> references() {
+      return predicate.references();
+    }
+
     @Override
     public String toString() {
       return "NOT(" + predicate.toString() + ")";
@@ -566,6 +587,11 @@ public class ExpressionPredicates {
       return and(filterPredicate0, filterPredicate1);
     }
 
+    @Override
+    public List<String> references() {
+      return 
Arrays.stream(predicates).map(Predicate::references).flatMap(List::stream).distinct().collect(Collectors.toList());
+    }
+
     @Override
     public String toString() {
       return "AND(" + Arrays.toString(predicates) + ")";
@@ -609,6 +635,11 @@ public class ExpressionPredicates {
       return or(filterPredicate0, filterPredicate1);
     }
 
+    @Override
+    public List<String> references() {
+      return 
Arrays.stream(predicates).map(Predicate::references).flatMap(List::stream).distinct().collect(Collectors.toList());
+    }
+
     @Override
     public String toString() {
       return "OR(" + Arrays.toString(predicates) + ")";
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 3d2272d9d9fc..5e1f646104b7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.format;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.client.model.BootstrapRowData;
 import org.apache.hudi.client.model.CommitTimeFlinkRecordMerger;
 import org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
@@ -43,6 +44,7 @@ import org.apache.hudi.source.ExpressionPredicates;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.util.Lazy;
 import org.apache.hudi.util.RowDataAvroQueryContexts;
 import org.apache.hudi.util.RecordKeyToRowDataConverter;
 
@@ -54,32 +56,38 @@ import org.apache.flink.table.types.logical.RowType;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
+import static 
org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME;
 
 /**
  * Implementation of {@link HoodieReaderContext} to read {@link RowData}s from 
base files or
  * log files with Flink parquet reader.
  */
 public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
-  private final List<ExpressionPredicates.Predicate> predicates;
+  private final List<ExpressionPredicates.Predicate> allPredicates;
+  private final Lazy<List<ExpressionPredicates.Predicate>> 
lazyBootstrapSafeFilters;
+  private final Lazy<List<ExpressionPredicates.Predicate>> lazyMorSafeFilters;
+  private final Lazy<List<String>> lazyRecordKeys;
   private final Supplier<InternalSchemaManager> internalSchemaManager;
-  private final HoodieTableConfig tableConfig;
 
   public FlinkRowDataReaderContext(
       StorageConfiguration<?> storageConfiguration,
       Supplier<InternalSchemaManager> internalSchemaManager,
-      List<ExpressionPredicates.Predicate> predicates,
+      List<ExpressionPredicates.Predicate> allPredicates,
       HoodieTableConfig tableConfig,
       Option<InstantRange> instantRangeOpt) {
     super(storageConfiguration, tableConfig, instantRangeOpt, Option.empty(), 
new FlinkRecordContext(tableConfig, storageConfiguration));
-    this.tableConfig = tableConfig;
     this.internalSchemaManager = internalSchemaManager;
-    this.predicates = predicates;
+    this.allPredicates = allPredicates;
+    this.lazyBootstrapSafeFilters = Lazy.lazily(() -> 
allPredicates.stream().filter(this::filterIsSafeForBootstrap).collect(Collectors.toList()));
+    this.lazyMorSafeFilters = Lazy.lazily(() -> 
allPredicates.stream().filter(this::filterIsSafeForMorMerging).collect(Collectors.toList()));
+    this.lazyRecordKeys = Lazy.lazily(() -> 
tableConfig.getRecordKeyFields().map(keys -> 
Arrays.stream(keys).collect(Collectors.toList())).orElse(Collections.emptyList()));
   }
 
   @Override
@@ -99,7 +107,7 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
             .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
             .getFileReader(tableConfig, filePath, HoodieFileFormat.PARQUET, 
Option.empty());
     DataType rowType = 
RowDataAvroQueryContexts.fromAvroSchema(dataSchema).getRowType();
-    return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, 
requiredSchema, predicates);
+    return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, 
requiredSchema, getSafePredicates(requiredSchema));
   }
 
   @Override
@@ -198,4 +206,33 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
         pkFieldsPos, (RowType) 
RowDataAvroQueryContexts.fromAvroSchema(requiredSchema).getRowType().getLogicalType());
     ((FlinkRecordContext) 
recordContext).setRecordKeyRowConverter(recordKeyRowConverter);
   }
+
+  private List<ExpressionPredicates.Predicate> getSafePredicates(Schema 
requiredSchema) {
+    boolean hasRowIndexField = 
AvroSchemaUtils.containsFieldInSchema(requiredSchema, 
ROW_INDEX_TEMPORARY_COLUMN_NAME);
+    if (!getHasLogFiles() && !getNeedsBootstrapMerge()) {
+      return allPredicates;
+    } else if (!getHasLogFiles() && hasRowIndexField) {
+      return lazyBootstrapSafeFilters.get();
+    } else {
+      return lazyMorSafeFilters.get();
+    }
+  }
+
+  /**
+   * Only valid if there is support for RowIndexField and no log files
+   * Filters are safe for bootstrap if meta col filters are independent from 
data col filters.
+   */
+  private boolean filterIsSafeForBootstrap(ExpressionPredicates.Predicate 
predicate) {
+    long metaRefCount = predicate.references().stream().filter(c -> 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(c.toLowerCase())).count();
+    return metaRefCount == predicate.references().size() || metaRefCount == 0;
+  }
+
+  /**
+   * Only valid if the filter's references only include primary key columns or 
{@link HoodieRecord#RECORD_KEY_METADATA_FIELD},
+   * because it's necessary to ensure both records with the same record key in 
the base file and log file are either filtered out
+   * or retained, to make the later merging process correct.
+   */
+  private boolean filterIsSafeForMorMerging(ExpressionPredicates.Predicate 
predicate) {
+    return predicate.references().stream().allMatch(c -> 
lazyRecordKeys.get().contains(c.toLowerCase()) || 
c.equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java
index 1ea00104b4d3..55698ae8c048 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java
@@ -69,6 +69,7 @@ import static 
org.apache.parquet.filter2.predicate.FilterApi.or;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test cases for {@link ExpressionPredicates}.
@@ -243,4 +244,168 @@ public class TestExpressionPredicates {
     ExpressionPredicates.ColumnPredicate predicate = 
Equals.getInstance().bindFieldReference(fieldReference).bindValueLiteral(valueLiteral);
     assertDoesNotThrow(predicate::filter, () -> String.format("Convert from %s 
to %s failed", literalValue.getClass().getName(), dataType));
   }
+
+  @Test
+  public void testUnaryPredicateReferences() {
+    // Test Equals predicate references method
+    FieldReferenceExpression fieldReference = new 
FieldReferenceExpression("field1", DataTypes.STRING(), 0, 0);
+    ValueLiteralExpression valueLiteral = new ValueLiteralExpression("value1");
+    
+    ExpressionPredicates.ColumnPredicate equalsPredicate = Equals.getInstance()
+        .bindFieldReference(fieldReference)
+        .bindValueLiteral(valueLiteral);
+    
+    assertEquals(Collections.singletonList("field1"), 
equalsPredicate.references());
+
+    // Test GreaterThan predicate references method
+    valueLiteral = new ValueLiteralExpression(10);
+    ExpressionPredicates.ColumnPredicate gtPredicate = 
GreaterThan.getInstance()
+        .bindFieldReference(fieldReference)
+        .bindValueLiteral(valueLiteral);
+    assertEquals(Collections.singletonList("field1"), 
gtPredicate.references());
+
+    ValueLiteralExpression valueLiteral1 = new 
ValueLiteralExpression("value1");
+    ValueLiteralExpression valueLiteral2 = new 
ValueLiteralExpression("value2");
+
+    ExpressionPredicates.ColumnPredicate inPredicate = In.getInstance()
+        .bindValueLiterals(Arrays.asList(valueLiteral1, valueLiteral2))
+        .bindFieldReference(fieldReference);
+
+    assertEquals(Collections.singletonList("field1"), 
inPredicate.references());
+  }
+
+  @Test
+  public void testNotPredicateReferences() {
+    // Test Not predicate references method - should delegate to underlying 
predicate
+    FieldReferenceExpression fieldReference = new 
FieldReferenceExpression("field4", DataTypes.STRING(), 0, 0);
+    ValueLiteralExpression valueLiteral = new ValueLiteralExpression("value1");
+    
+    ExpressionPredicates.ColumnPredicate innerPredicate = Equals.getInstance()
+        .bindFieldReference(fieldReference)
+        .bindValueLiteral(valueLiteral);
+    
+    Predicate notPredicate = Not.getInstance().bindPredicate(innerPredicate);
+    
+    List<String> references = notPredicate.references();
+    assertEquals(1, references.size());
+    assertEquals("field4", references.get(0));
+  }
+
+  @Test
+  public void testAndPredicateReferencesWithSameColumn() {
+    // Test And predicate references method with same column
+    FieldReferenceExpression fieldReference1 = new 
FieldReferenceExpression("field1", DataTypes.INT(), 0, 0);
+    FieldReferenceExpression fieldReference2 = new 
FieldReferenceExpression("field1", DataTypes.INT(), 0, 0);
+    ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression(10);
+    ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression(20);
+    
+    ExpressionPredicates.ColumnPredicate predicate1 = GreaterThan.getInstance()
+        .bindFieldReference(fieldReference1)
+        .bindValueLiteral(valueLiteral1);
+    
+    ExpressionPredicates.ColumnPredicate predicate2 = Equals.getInstance()
+        .bindFieldReference(fieldReference2)
+        .bindValueLiteral(valueLiteral2);
+    
+    Predicate andPredicate = And.getInstance().bindPredicates(predicate1, 
predicate2);
+    
+    List<String> references = andPredicate.references();
+    assertEquals(1, references.size());
+    assertEquals("field1", references.get(0));
+  }
+
+  @Test
+  public void testAndPredicateReferencesWithDifferentColumns() {
+    // Test And predicate references method with different columns
+    FieldReferenceExpression fieldReference1 = new 
FieldReferenceExpression("field1", DataTypes.INT(), 0, 0);
+    FieldReferenceExpression fieldReference2 = new 
FieldReferenceExpression("field2", DataTypes.STRING(), 0, 0);
+    ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression(10);
+    ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression("test");
+    
+    ExpressionPredicates.ColumnPredicate predicate1 = GreaterThan.getInstance()
+        .bindFieldReference(fieldReference1)
+        .bindValueLiteral(valueLiteral1);
+    
+    ExpressionPredicates.ColumnPredicate predicate2 = Equals.getInstance()
+        .bindFieldReference(fieldReference2)
+        .bindValueLiteral(valueLiteral2);
+    
+    Predicate andPredicate = And.getInstance().bindPredicates(predicate1, 
predicate2);
+    
+    assertEquals(Arrays.asList("field1", "field2"), andPredicate.references());
+  }
+
+  @Test
+  public void testOrPredicateReferencesWithSameColumn() {
+    // Test Or predicate references method with same column
+    FieldReferenceExpression fieldReference1 = new 
FieldReferenceExpression("field1", DataTypes.INT(), 0, 0);
+    FieldReferenceExpression fieldReference2 = new 
FieldReferenceExpression("field1", DataTypes.INT(), 0, 0);
+    ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression(10);
+    ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression(20);
+    
+    ExpressionPredicates.ColumnPredicate predicate1 = GreaterThan.getInstance()
+        .bindFieldReference(fieldReference1)
+        .bindValueLiteral(valueLiteral1);
+    
+    ExpressionPredicates.ColumnPredicate predicate2 = Equals.getInstance()
+        .bindFieldReference(fieldReference2)
+        .bindValueLiteral(valueLiteral2);
+    
+    Predicate orPredicate = Or.getInstance().bindPredicates(predicate1, 
predicate2);
+    
+    List<String> references = orPredicate.references();
+    assertEquals(1, references.size());
+    assertEquals("field1", references.get(0));
+  }
+
+  @Test
+  public void testOrPredicateReferencesWithDifferentColumns() {
+    // Test Or predicate references method with different columns
+    FieldReferenceExpression fieldReference1 = new 
FieldReferenceExpression("field1", DataTypes.INT(), 0, 0);
+    FieldReferenceExpression fieldReference2 = new 
FieldReferenceExpression("field2", DataTypes.STRING(), 0, 0);
+    ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression(10);
+    ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression("test");
+    
+    ExpressionPredicates.ColumnPredicate predicate1 = GreaterThan.getInstance()
+        .bindFieldReference(fieldReference1)
+        .bindValueLiteral(valueLiteral1);
+    
+    ExpressionPredicates.ColumnPredicate predicate2 = Equals.getInstance()
+        .bindFieldReference(fieldReference2)
+        .bindValueLiteral(valueLiteral2);
+    
+    Predicate orPredicate = Or.getInstance().bindPredicates(predicate1, 
predicate2);
+    assertEquals(Arrays.asList("field1", "field2"), orPredicate.references());
+  }
+
+  @Test
+  public void testAlwaysNullPredicateReferences() {
+    // Test AlwaysNull predicate references method - should return empty list
+    Predicate alwaysNullPredicate = 
ExpressionPredicates.AlwaysNull.getInstance();
+    
+    List<String> references = alwaysNullPredicate.references();
+    assertEquals(0, references.size());
+    assertTrue(references.isEmpty());
+  }
+
+  @Test
+  public void testNestedPredicateReferences() {
+    // Test nested predicates (AND inside NOT)
+    FieldReferenceExpression fieldReference1 = new 
FieldReferenceExpression("field1", DataTypes.INT(), 0, 0);
+    FieldReferenceExpression fieldReference2 = new 
FieldReferenceExpression("field2", DataTypes.STRING(), 0, 0);
+    ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression(10);
+    ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression("test");
+    
+    ExpressionPredicates.ColumnPredicate predicate1 = GreaterThan.getInstance()
+        .bindFieldReference(fieldReference1)
+        .bindValueLiteral(valueLiteral1);
+    
+    ExpressionPredicates.ColumnPredicate predicate2 = Equals.getInstance()
+        .bindFieldReference(fieldReference2)
+        .bindValueLiteral(valueLiteral2);
+    
+    Predicate andPredicate = And.getInstance().bindPredicates(predicate1, 
predicate2);
+    Predicate notPredicate = Not.getInstance().bindPredicate(andPredicate);
+    assertEquals(Arrays.asList("field1", "field2"), notPredicate.references());
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index b59b51c035c0..d8f14601ed85 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1975,6 +1975,48 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result1, "[+I[id2, Stephen, 33, 1970-01-01T00:00:02, 
par1]]");
   }
 
+  @Test
+  void testPredicateForBaseFileWithMor() {
+    // Case:
+    // * records in base file can not survive from the predicate
+    // * records in log file can survive from the predicate
+    // * records in base file have higher ordering value
+    // E.g., base file: (uuid:'k1', age: 23, ts: 1003)
+    // log file: (uuid: 'k1', age: 25, ts: 1001)
+    // query filter: age = 25;
+    // Then the expected result should be empty, but if predicate age = 25 is 
pushed down
+    // into the parquet reader, the result would be wrong as (uuid: 'k1', age: 
25, ts: 1001)
+    TableEnvironment tableEnv = batchTableEnv;
+    String path = tempFile.getAbsolutePath();
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, path)
+        .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+        .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
+        .option(FlinkOptions.COMPACTION_TASKS, 1)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    final String INSERT_T1 = "insert into t1 values\n"
+        + "('id1','Danny',23,TIMESTAMP '1970-01-01 01:00:01','par1')\n";
+    execInsertSql(tableEnv, INSERT_T1);
+
+    batchTableEnv.executeSql("drop table t1");
+
+    hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, path)
+        .option(FlinkOptions.METADATA_ENABLED, true)
+        .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    final String INSERT_T2 = "insert into t1 values\n"
+        + "('id1','Danny',25,TIMESTAMP '1970-01-01 00:00:01','par1')\n";
+    execInsertSql(tableEnv, INSERT_T2);
+    List<Row> result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1 where age = 25 and 
`partition` = 'par1'").execute().collect());
+    assertRowsEquals(result1, "[]");
+  }
+
   @Test
   void testParquetLogBlockDataSkipping() {
     TableEnvironment tableEnv = batchTableEnv;

Reply via email to