This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a62a144a5e9 [FLINK-39421][table] Fix metadata filter contract and add 
coverage
a62a144a5e9 is described below

commit a62a144a5e99ef1d66007f2caf409a62fdbcb5ad
Author: Jim Hughes <[email protected]>
AuthorDate: Fri Jun 5 11:28:50 2026 -0400

    [FLINK-39421][table] Fix metadata filter contract and add coverage
    
    PushFilterIntoTableSourceScanRule now classifies each input predicate
    by identity-Set membership in the source's accepted/remaining lists,
    allowing arbitrary non-contiguous subsets. An input absent from both
    raises a TableException.
    
    Metadata-only predicates the rule cannot push (source does not support
    it or the spec is already attached) now stay as a runtime Calc; the
    prior physical route created a FilterPushDownSpec that crashed
    compiled-plan restore once ProjectPushDownSpec narrowed the row type.
    
    MetadataFilterPushDownSpec enforces the same identity round-trip on
    restore; needAdjustFieldReferenceAfterProjection() returns false so
    ScanReuser can share a scan across queries with the same metadata
    filter but different projections.
    
    Tests cover the four MetadataFilterResult shapes end-to-end, the
    coverage invariant, and JSON serde of the spec.
---
 .../plan/abilities/source/FilterPushDownSpec.java  |  21 +-
 .../source/MetadataFilterPushDownSpec.java         |  59 +++-
 .../logical/PushFilterIntoSourceScanRuleBase.java  |  79 ++++--
 .../logical/PushFilterIntoTableSourceScanRule.java |  52 ++--
 .../planner/factories/TestValuesTableFactory.java  |   1 +
 .../MetadataFilterInReadingMetadataTest.java       | 264 +++++++++++-------
 .../logical/MetadataFilterResultShapesITCase.java  | 305 +++++++++++++++++++++
 .../MetadataFilterInReadingMetadataTest.xml        |  52 +++-
 8 files changed, 660 insertions(+), 173 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
index 7a4e1481e04..108329cea04 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
@@ -63,7 +63,7 @@ public final class FilterPushDownSpec extends 
SourceAbilitySpecBase {
     /**
      * A flag which indicates all predicates are retained in the outer Filter 
operator.
      *
-     * <p>This flog is only used for optimization phase, and should not be 
serialized.
+     * <p>This flag is only used for optimization phase, and should not be 
serialized.
      */
     @JsonIgnore private final boolean allPredicatesRetained;
 
@@ -111,8 +111,9 @@ public final class FilterPushDownSpec extends 
SourceAbilitySpecBase {
     }
 
     /**
-     * Converts {@link RexNode} predicates to {@link ResolvedExpression}s 
using the given row type.
-     * Shared between physical and metadata filter push-down paths.
+     * Converts {@link RexNode} predicates to {@link ResolvedExpression}s, 
preserving 1:1 input
+     * order so callers can correlate by position. Shared between physical and 
metadata filter
+     * push-down paths.
      */
     static List<ResolvedExpression> resolvePredicates(
             List<RexNode> predicates,
@@ -137,9 +138,9 @@ public final class FilterPushDownSpec extends 
SourceAbilitySpecBase {
                                         throw new TableException(
                                                 String.format(
                                                         "%s can not be 
converted to Expression, please make sure %s can accept %s.",
-                                                        p.toString(),
+                                                        p,
                                                         
tableSource.getClass().getSimpleName(),
-                                                        p.toString()));
+                                                        p));
                                     }
                                 })
                         .collect(Collectors.toList());
@@ -160,7 +161,15 @@ public final class FilterPushDownSpec extends 
SourceAbilitySpecBase {
                                             "SQL expression parsing is not 
supported at this location.");
                                 })
                         .build();
-        return resolver.resolve(filters);
+        List<ResolvedExpression> resolved = resolver.resolve(filters);
+        if (resolved.size() != predicates.size()) {
+            throw new TableException(
+                    String.format(
+                            "Internal error: ExpressionResolver returned %d 
resolved expressions "
+                                    + "for %d input predicates.",
+                            resolved.size(), predicates.size()));
+        }
+        return resolved;
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java
index 090ab45fe27..d1cb3303b80 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.guava33.com.google.common.collect.Sets;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -37,6 +38,7 @@ import org.apache.calcite.rex.RexNode;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -78,23 +80,57 @@ public final class MetadataFilterPushDownSpec extends 
SourceAbilitySpecBase {
 
     @Override
     public void apply(DynamicTableSource tableSource, SourceAbilityContext 
context) {
-        // Use stored predicateRowType; context's row type may be narrowed by 
ProjectPushDownSpec.
-        MetadataFilterResult result =
-                applyMetadataFilters(predicates, predicateRowType, 
tableSource, context);
-        if (result.getAcceptedFilters().size() != predicates.size()) {
-            throw new TableException("All metadata predicates should be 
accepted here.");
+        List<ResolvedExpression> resolved =
+                resolvedExpressions(predicates, predicateRowType, tableSource, 
context);
+        MetadataFilterResult result = 
applyMetadataFiltersOnSource(tableSource, resolved);
+        // On restore every predicate must round-trip back via instance 
identity. `remaining`
+        // is not validated: the spec only stores already-accepted predicates, 
so the source
+        // should re-accept them all.
+        Set<ResolvedExpression> accepted = Sets.newIdentityHashSet();
+        accepted.addAll(result.getAcceptedFilters());
+        Set<ResolvedExpression> inputs = Sets.newIdentityHashSet();
+        inputs.addAll(resolved);
+        for (ResolvedExpression r : result.getAcceptedFilters()) {
+            if (!inputs.contains(r)) {
+                throw new TableException(
+                        "Source returned an accepted metadata filter not 
produced by the "
+                                + "planner. Sources must return back the same 
ResolvedExpression "
+                                + "instances they received.");
+            }
+        }
+        for (ResolvedExpression r : resolved) {
+            if (!accepted.contains(r)) {
+                throw new TableException(
+                        "All metadata predicates should be accepted on 
compiled-plan restore. "
+                                + "Source dropped a predicate that was 
accepted at optimization "
+                                + "time.");
+            }
         }
     }
 
     /**
-     * Converts RexNode predicates to ResolvedExpressions using the given row 
type and calls
-     * applyMetadataFilters on the source. The row type must already use 
metadata key names.
+     * Resolves predicates to {@link ResolvedExpression}s; the returned list 
preserves input order,
+     * so callers may correlate against the input list by position.
      */
-    public static MetadataFilterResult applyMetadataFilters(
+    public static List<ResolvedExpression> resolvedExpressions(
             List<RexNode> predicates,
             RowType metadataKeyRowType,
             DynamicTableSource tableSource,
             SourceAbilityContext context) {
+        ensureMetadataFilterPushDown(tableSource);
+        return FilterPushDownSpec.resolvePredicates(
+                predicates, metadataKeyRowType, tableSource, context);
+    }
+
+    /** Pushes already-resolved expressions to the source. */
+    public static MetadataFilterResult applyMetadataFiltersOnSource(
+            DynamicTableSource tableSource, List<ResolvedExpression> resolved) 
{
+        SupportsReadingMetadata readingMetadata = 
ensureMetadataFilterPushDown(tableSource);
+        return readingMetadata.applyMetadataFilters(resolved);
+    }
+
+    private static SupportsReadingMetadata ensureMetadataFilterPushDown(
+            DynamicTableSource tableSource) {
         if (!(tableSource instanceof SupportsReadingMetadata)) {
             throw new TableException(
                     String.format(
@@ -108,15 +144,12 @@ public final class MetadataFilterPushDownSpec extends 
SourceAbilitySpecBase {
                             "%s no longer supports metadata filter push-down.",
                             tableSource.getClass().getName()));
         }
-        List<ResolvedExpression> resolved =
-                FilterPushDownSpec.resolvePredicates(
-                        predicates, metadataKeyRowType, tableSource, context);
-        return readingMetadata.applyMetadataFilters(resolved);
+        return readingMetadata;
     }
 
     @Override
     public boolean needAdjustFieldReferenceAfterProjection() {
-        return true;
+        return false;
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
index eb5521ef782..ef8bc092f98 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.table.planner.plan.rules.logical;
 
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -33,6 +34,9 @@ import 
org.apache.flink.table.planner.plan.schema.TableSourceTable;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.RowType.RowField;
 
+import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava33.com.google.common.collect.Sets;
+
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleOperand;
 import org.apache.calcite.rel.core.TableScan;
@@ -47,6 +51,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import scala.Tuple2;
@@ -176,36 +181,78 @@ public abstract class PushFilterIntoSourceScanRuleBase 
extends RelOptRule {
         return mapping;
     }
 
+    /** Outcome of metadata filter push-down: updated table and runtime-Calc 
predicates. */
+    protected static final class MetadataPushDownOutcome {
+        final TableSourceTable newTableSourceTable;
+        final List<RexNode> remainingInputRexNodes;
+
+        MetadataPushDownOutcome(
+                TableSourceTable newTableSourceTable, List<RexNode> 
remainingInputRexNodes) {
+            this.newTableSourceTable = newTableSourceTable;
+            this.remainingInputRexNodes = remainingInputRexNodes;
+        }
+    }
+
     /** Resolves metadata filters and creates a new {@link TableSourceTable}. 
*/
-    protected Tuple2<MetadataFilterResult, TableSourceTable>
-            resolveMetadataFiltersAndCreateTableSourceTable(
-                    RexNode[] metadataPredicates,
-                    TableSourceTable oldTableSourceTable,
-                    TableScan scan,
-                    RelBuilder relBuilder) {
+    protected MetadataPushDownOutcome 
resolveMetadataFiltersAndCreateTableSourceTable(
+            RexNode[] metadataPredicates, TableSourceTable 
oldTableSourceTable, TableScan scan) {
         DynamicTableSource newTableSource = 
oldTableSourceTable.tableSource().copy();
         SourceAbilityContext abilityContext = SourceAbilityContext.from(scan);
 
-        // Build a metadata-only row type (field names are metadata keys, not 
SQL aliases) and
-        // an old->new index mapping. Storing only metadata columns avoids 
name collisions with
-        // physical columns (e.g. `offset INT, msg_offset INT METADATA FROM 
'offset'`).
+        // Field names are metadata keys, not SQL aliases, to avoid 
physical/metadata collisions
+        // (e.g. `offset INT, msg_offset INT METADATA FROM 'offset'`).
         MetadataRowInfo metadataRowInfo =
                 buildMetadataRowInfo(oldTableSourceTable, 
abilityContext.getSourceRowType());
         RexNode[] remappedPredicates =
                 remapPredicates(metadataPredicates, 
metadataRowInfo.oldIndexToNewIndex);
 
-        MetadataFilterResult result =
-                MetadataFilterPushDownSpec.applyMetadataFilters(
+        List<ResolvedExpression> resolved =
+                MetadataFilterPushDownSpec.resolvedExpressions(
                         Arrays.asList(remappedPredicates),
                         metadataRowInfo.metadataRowType,
                         newTableSource,
                         abilityContext);
+        MetadataFilterResult result =
+                
MetadataFilterPushDownSpec.applyMetadataFiltersOnSource(newTableSource, 
resolved);
+
+        // Source must return back the same instances it received.
+        Set<ResolvedExpression> acceptedSet = Sets.newIdentityHashSet();
+        acceptedSet.addAll(result.getAcceptedFilters());
+        Set<ResolvedExpression> remainingSet = Sets.newIdentityHashSet();
+        remainingSet.addAll(result.getRemainingFilters());
+        Set<ResolvedExpression> inputSet = Sets.newIdentityHashSet();
+        inputSet.addAll(resolved);
+
+        for (ResolvedExpression r :
+                Iterables.concat(result.getAcceptedFilters(), 
result.getRemainingFilters())) {
+            if (!inputSet.contains(r)) {
+                throw new TableException(
+                        "Source returned a metadata filter not in the input 
list. Sources must "
+                                + "return back the same ResolvedExpression 
instances they "
+                                + "received from applyMetadataFilters.");
+            }
+        }
 
-        int acceptedCount = result.getAcceptedFilters().size();
         List<RexNode> acceptedRemappedPredicates = new ArrayList<>();
-        for (int i = 0; i < acceptedCount; i++) {
-            acceptedRemappedPredicates.add(remappedPredicates[i]);
+        List<RexNode> remainingInputRexNodes = new ArrayList<>();
+        for (int i = 0; i < resolved.size(); i++) {
+            ResolvedExpression r = resolved.get(i);
+            boolean inAccepted = acceptedSet.contains(r);
+            boolean inRemaining = remainingSet.contains(r);
+            if (!inAccepted && !inRemaining) {
+                throw new TableException(
+                        "Source dropped a metadata filter that was passed to "
+                                + "applyMetadataFilters. Every input predicate 
must appear in the "
+                                + "result's accepted list, remaining list, or 
both.");
+            }
+            if (inAccepted) {
+                acceptedRemappedPredicates.add(remappedPredicates[i]);
+            }
+            if (inRemaining) {
+                remainingInputRexNodes.add(metadataPredicates[i]);
+            }
         }
+
         MetadataFilterPushDownSpec metadataSpec =
                 new MetadataFilterPushDownSpec(
                         acceptedRemappedPredicates, 
metadataRowInfo.metadataRowType);
@@ -216,7 +263,7 @@ public abstract class PushFilterIntoSourceScanRuleBase 
extends RelOptRule {
                         oldTableSourceTable.getStatistic(),
                         new SourceAbilitySpec[] {metadataSpec});
 
-        return new Tuple2<>(result, newTableSourceTable);
+        return new MetadataPushDownOutcome(newTableSourceTable, 
remainingInputRexNodes);
     }
 
     /**
@@ -253,7 +300,7 @@ public abstract class PushFilterIntoSourceScanRuleBase 
extends RelOptRule {
                     public RexNode visitInputRef(RexInputRef inputRef) {
                         Integer newIdx = 
oldIndexToNewIndex.get(inputRef.getIndex());
                         if (newIdx == null) {
-                            throw new IllegalStateException(
+                            throw new TableException(
                                     "Metadata predicate references 
non-metadata column index "
                                             + inputRef.getIndex());
                         }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
index 57d80b1b2f9..4e896a30701 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.plan.rules.logical;
 
 import 
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
-import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
 import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
 
@@ -31,6 +30,7 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.tools.RelBuilder;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import scala.Tuple2;
@@ -99,23 +99,31 @@ public class PushFilterIntoTableSourceScanRule extends 
PushFilterIntoSourceScanR
         boolean supportsMetadataFilter = 
canPushdownMetadataFilter(tableSourceTable);
         int physicalColumnCount = getPhysicalColumnCount(tableSourceTable);
 
-        // Classify predicates: only separate metadata predicates when the 
source
-        // actually supports metadata filter push-down. Otherwise, all 
predicates
-        // go through the physical path to preserve the FilterPushDownSpec 
guard
-        // that prevents rule re-firing and maintains scan reuse invariants.
+        List<RexNode> allRemainingRexNodes = new ArrayList<>();
+        TableSourceTable currentTable = tableSourceTable;
+
+        // Unpushable metadata predicates stay as a runtime Calc, not the 
physical path —
+        // physical routing produces a FilterPushDownSpec that crashes 
compiled-plan restore
+        // once ProjectPushDownSpec narrows the scan row type.
         List<RexNode> physicalPredicates = new ArrayList<>();
         List<RexNode> metadataPredicates = new ArrayList<>();
         for (RexNode predicate : convertiblePredicates) {
-            if (supportsMetadataFilter
-                    && referencesOnlyMetadataColumns(predicate, 
physicalColumnCount)) {
-                metadataPredicates.add(predicate);
+            if (referencesOnlyMetadataColumns(predicate, physicalColumnCount)) 
{
+                if (supportsMetadataFilter) {
+                    metadataPredicates.add(predicate);
+                } else {
+                    allRemainingRexNodes.add(predicate);
+                }
             } else {
                 physicalPredicates.add(predicate);
             }
         }
 
-        List<RexNode> allRemainingRexNodes = new ArrayList<>();
-        TableSourceTable currentTable = tableSourceTable;
+        // Avoid re-firing on shapes we can't transform — saves wasted Hep 
iterations.
+        boolean nothingToPushPhysically = physicalPredicates.isEmpty() || 
!supportsPhysicalFilter;
+        if (nothingToPushPhysically && metadataPredicates.isEmpty()) {
+            return;
+        }
 
         if (!physicalPredicates.isEmpty() && supportsPhysicalFilter) {
             Tuple2<SupportsFilterPushDown.Result, TableSourceTable> 
physicalResult =
@@ -133,28 +141,14 @@ public class PushFilterIntoTableSourceScanRule extends 
PushFilterIntoSourceScanR
         }
 
         if (!metadataPredicates.isEmpty()) {
-            Tuple2<MetadataFilterResult, TableSourceTable> metadataResult =
+            MetadataPushDownOutcome metadataResult =
                     resolveMetadataFiltersAndCreateTableSourceTable(
-                            metadataPredicates.toArray(new RexNode[0]),
-                            currentTable,
-                            scan,
-                            relBuilder);
-            currentTable = metadataResult._2;
-            // Remaining (rejected) metadata predicates stay as a 
LogicalFilter above
-            // the scan so they are still evaluated at runtime. We use the 
original
-            // RexNodes (suffix) because the remaining ResolvedExpressions use 
metadata
-            // key names, not SQL aliases needed by the Filter's row type. The
-            // validation in resolveMetadataFiltersAndCreateTableSourceTable 
ensures
-            // the partition invariant (accepted prefix + remaining suffix = 
input).
-            int acceptedCount = metadataResult._1.getAcceptedFilters().size();
-            for (int i = acceptedCount; i < metadataPredicates.size(); i++) {
-                allRemainingRexNodes.add(metadataPredicates.get(i));
-            }
+                            metadataPredicates.toArray(new RexNode[0]), 
currentTable, scan);
+            currentTable = metadataResult.newTableSourceTable;
+            allRemainingRexNodes.addAll(metadataResult.remainingInputRexNodes);
         }
 
-        for (RexNode unconverted : unconvertedPredicates) {
-            allRemainingRexNodes.add(unconverted);
-        }
+        allRemainingRexNodes.addAll(Arrays.asList(unconvertedPredicates));
 
         LogicalTableScan newScan =
                 LogicalTableScan.create(scan.getCluster(), currentTable, 
scan.getHints());
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index d011687d82e..3387f522232 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -1847,6 +1847,7 @@ public final class TestValuesTableFactory
                             projectedMetadataFields,
                             enableAggregatePushDown);
             newSource.watermarkStrategy = watermarkStrategy;
+            
newSource.setEnableMetadataFilterPushDown(enableMetadataFilterPushDown);
             return newSource;
         }
     }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java
index ffd6aa0ea89..97245fbcbf1 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.plan.rules.logical;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.TableException;
 import 
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
 import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
@@ -35,33 +36,28 @@ import 
org.apache.flink.table.planner.utils.BatchTableTestUtil;
 import org.apache.flink.table.planner.utils.TableConfigUtils;
 import org.apache.flink.table.planner.utils.TableTestBase;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.testutils.junit.SharedObjectsExtension;
-import org.apache.flink.testutils.junit.SharedReference;
 
 import org.apache.calcite.plan.hep.HepMatchOrder;
 import org.apache.calcite.rel.rules.CoreRules;
 import org.apache.calcite.tools.RuleSets;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.table.api.DataTypes.INT;
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for metadata filter push-down through {@link 
SupportsReadingMetadata}. */
 class MetadataFilterInReadingMetadataTest extends TableTestBase {
 
-    @RegisterExtension
-    private final SharedObjectsExtension sharedObjects = 
SharedObjectsExtension.create();
-
     private BatchTableTestUtil util;
 
     @BeforeEach
@@ -88,139 +84,144 @@ class MetadataFilterInReadingMetadataTest extends 
TableTestBase {
 
     @Test
     void testMetadataFilterPushDown() {
-        SharedReference<List<ResolvedExpression>> receivedFilters =
-                sharedObjects.add(new ArrayList<>());
         TableDescriptor descriptor =
                 TableFactoryHarness.newBuilder()
                         .schema(MetadataFilterSource.SCHEMA)
-                        .source(new MetadataFilterSource(true, 
receivedFilters))
+                        .source(new MetadataFilterSource(true))
                         .build();
         util.tableEnv().createTable("T1", descriptor);
 
         util.verifyRelPlan("SELECT id FROM T1 WHERE event_time > TIMESTAMP 
'2024-01-01 00:00:00'");
-
-        assertThat(receivedFilters.get().toString())
-                .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00)]");
     }
 
     @Test
     void testMetadataFilterNotPushedWhenNotSupported() {
-        SharedReference<List<ResolvedExpression>> receivedFilters =
-                sharedObjects.add(new ArrayList<>());
         TableDescriptor descriptor =
                 TableFactoryHarness.newBuilder()
                         .schema(MetadataFilterSource.SCHEMA)
-                        .source(new MetadataFilterSource(false, 
receivedFilters))
+                        .source(new MetadataFilterSource(false))
                         .build();
         util.tableEnv().createTable("T2", descriptor);
 
         util.verifyRelPlan("SELECT id FROM T2 WHERE event_time > TIMESTAMP 
'2024-01-01 00:00:00'");
-
-        // No metadata filters should have been pushed
-        assertThat(receivedFilters.get()).isEmpty();
     }
 
     @Test
     void testAliasedMetadataColumnFilter() {
-        SharedReference<List<ResolvedExpression>> receivedFilters =
-                sharedObjects.add(new ArrayList<>());
         TableDescriptor descriptor =
                 TableFactoryHarness.newBuilder()
                         .schema(RenamedMetadataFilterSource.SCHEMA)
-                        .source(new 
RenamedMetadataFilterSource(receivedFilters))
+                        .source(new RenamedMetadataFilterSource())
                         .build();
         util.tableEnv().createTable("T3", descriptor);
 
-        // 'event_ts' is the SQL alias for metadata key 'timestamp'
+        // 'event_ts' is the SQL alias for metadata key 'timestamp'; the spec 
stores the metadata
+        // key so the plan shows `metadataFilter=[>(timestamp, ...)]` not the 
alias.
         util.verifyRelPlan("SELECT id FROM T3 WHERE event_ts > TIMESTAMP 
'2024-01-01 00:00:00'");
-
-        // The source should receive the filter with metadata key 'timestamp', 
not 'event_ts'.
-        assertThat(receivedFilters.get().toString())
-                .isEqualTo("[greaterThan(timestamp, 2024-01-01T00:00)]");
     }
 
     @Test
     void testMixedPhysicalAndMetadataFilters() {
-        SharedReference<List<ResolvedExpression>> metadataFilters =
-                sharedObjects.add(new ArrayList<>());
-        SharedReference<List<ResolvedExpression>> physicalFilters =
-                sharedObjects.add(new ArrayList<>());
         TableDescriptor descriptor =
                 TableFactoryHarness.newBuilder()
                         .schema(MixedFilterSource.SCHEMA)
-                        .source(new MixedFilterSource(metadataFilters, 
physicalFilters))
+                        .source(new MixedFilterSource())
                         .build();
         util.tableEnv().createTable("T4", descriptor);
 
+        // id > 10 → physical path, event_time > ... → metadata path.
         util.verifyRelPlan(
                 "SELECT id FROM T4 WHERE id > 10 AND event_time > TIMESTAMP 
'2024-01-01 00:00:00'");
-
-        // Verify routing: id > 10 → physical path, event_time > ... → 
metadata path.
-        
assertThat(physicalFilters.get().toString()).isEqualTo("[greaterThan(id, 10)]");
-        assertThat(metadataFilters.get().toString())
-                .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00)]");
     }
 
     @Test
     void testPartialMetadataFilterAcceptance() {
-        SharedReference<List<ResolvedExpression>> receivedFilters =
-                sharedObjects.add(new ArrayList<>());
         TableDescriptor descriptor =
                 TableFactoryHarness.newBuilder()
                         .schema(PartialMetadataFilterSource.SCHEMA)
-                        .source(new 
PartialMetadataFilterSource(receivedFilters))
+                        .source(new PartialMetadataFilterSource())
                         .build();
         util.tableEnv().createTable("T6", descriptor);
 
-        // Two metadata filters: the source accepts only the first one
+        // Source accepts the first filter and rejects the second.
         util.verifyRelPlan(
                 "SELECT id FROM T6 WHERE event_time > TIMESTAMP '2024-01-01 
00:00:00'"
                         + " AND priority > 5");
-
-        // Source receives both filters; the XML reference verifies only the 
first is accepted
-        // (the second remains as a LogicalFilter above the scan).
-        assertThat(receivedFilters.get().toString())
-                .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00), 
greaterThan(priority, 5)]");
     }
 
     @Test
     void testPhysicalAndMetadataNameCollision() {
-        // Physical column 'offset' shares a name with the metadata key 
'offset'
-        // (aliased in SQL as 'msg_offset'). The predicate on the metadata 
column
-        // must be pushed down using the metadata key, not confused with the
-        // physical column of the same name.
-        SharedReference<List<ResolvedExpression>> receivedFilters =
-                sharedObjects.add(new ArrayList<>());
+        // Physical column 'offset' shares a name with metadata key 'offset' 
(aliased to
+        // 'msg_offset'). The predicate must push down using the metadata key, 
not the alias.
         TableDescriptor descriptor =
                 TableFactoryHarness.newBuilder()
                         .schema(CollidingNameSource.SCHEMA)
-                        .source(new CollidingNameSource(receivedFilters))
+                        .source(new CollidingNameSource())
                         .build();
         util.tableEnv().createTable("T7", descriptor);
 
         util.verifyRelPlan("SELECT id FROM T7 WHERE msg_offset > 5");
+    }
+
+    @Test
+    void testBestEffortMetadataPruning() {
+        // Source puts every predicate in both accepted and remaining; plan 
shows both paths.
+        TableDescriptor descriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(MetadataFilterSource.SCHEMA)
+                        .source(new BestEffortPruningSource())
+                        .build();
+        util.tableEnv().createTable("T8", descriptor);
+
+        util.verifyRelPlan("SELECT id FROM T8 WHERE event_time > TIMESTAMP 
'2024-01-01 00:00:00'");
+    }
+
+    @Test
+    void testNonContiguousSubsetAcceptance() {
+        TableDescriptor descriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(NonContiguousAcceptingSource.SCHEMA)
+                        .source(new NonContiguousAcceptingSource())
+                        .build();
+        util.tableEnv().createTable("T9", descriptor);
+
+        util.verifyRelPlan("SELECT id FROM T9 WHERE m0 > 0 AND m1 > 1 AND m2 > 
2");
+    }
 
-        // Must reference the metadata key 'offset', NOT the SQL alias 
'msg_offset'.
-        
assertThat(receivedFilters.get().toString()).isEqualTo("[greaterThan(offset, 
5)]");
+    @Test
+    void testCoverageInvariantWhenSourceDropsPredicate() {
+        TableDescriptor descriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(MetadataFilterSource.SCHEMA)
+                        .source(new DroppingAllSource())
+                        .build();
+        util.tableEnv().createTable("TDrop", descriptor);
+
+        assertThatThrownBy(
+                        () ->
+                                util.tableEnv()
+                                        .explainSql(
+                                                "SELECT id FROM TDrop "
+                                                        + "WHERE event_time > 
TIMESTAMP '2024-01-01 00:00:00'"))
+                .isInstanceOf(TableException.class)
+                .hasMessage(
+                        "Source dropped a metadata filter that was passed to "
+                                + "applyMetadataFilters. Every input predicate 
must appear in the "
+                                + "result's accepted list, remaining list, or 
both.");
     }
 
     @Test
     void testMetadataFilterWithProjection() {
-        SharedReference<List<ResolvedExpression>> receivedFilters =
-                sharedObjects.add(new ArrayList<>());
         TableDescriptor descriptor =
                 TableFactoryHarness.newBuilder()
                         .schema(MetadataFilterSource.SCHEMA)
-                        .source(new MetadataFilterSource(true, 
receivedFilters))
+                        .source(new MetadataFilterSource(true))
                         .build();
         util.tableEnv().createTable("T5", descriptor);
 
+        // Projection push-down must not perturb the metadata filter.
         util.verifyRelPlan(
                 "SELECT id, name FROM T5 WHERE event_time > TIMESTAMP 
'2024-01-01 00:00:00'");
-
-        // Projection push-down must not perturb the metadata filter.
-        assertThat(receivedFilters.get().toString())
-                .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00)]");
     }
 
     // 
-----------------------------------------------------------------------------------------
@@ -239,13 +240,9 @@ class MetadataFilterInReadingMetadataTest extends 
TableTestBase {
                         .build();
 
         private final boolean supportsMetadataFilter;
-        private final SharedReference<List<ResolvedExpression>> 
receivedMetadataFilters;
 
-        MetadataFilterSource(
-                boolean supportsMetadataFilter,
-                SharedReference<List<ResolvedExpression>> 
receivedMetadataFilters) {
+        MetadataFilterSource(boolean supportsMetadataFilter) {
             this.supportsMetadataFilter = supportsMetadataFilter;
-            this.receivedMetadataFilters = receivedMetadataFilters;
         }
 
         @Override
@@ -265,11 +262,36 @@ class MetadataFilterInReadingMetadataTest extends 
TableTestBase {
 
         @Override
         public MetadataFilterResult 
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
-            receivedMetadataFilters.get().addAll(metadataFilters);
             return MetadataFilterResult.of(metadataFilters, 
Collections.emptyList());
         }
     }
 
+    /** Returns each input in both accepted and remaining (best-effort pruning 
shape). */
+    private static class BestEffortPruningSource extends 
TableFactoryHarness.ScanSourceBase
+            implements SupportsReadingMetadata {
+
+        @Override
+        public Map<String, DataType> listReadableMetadata() {
+            Map<String, DataType> metadata = new HashMap<>();
+            metadata.put("event_time", TIMESTAMP(3));
+            return metadata;
+        }
+
+        @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType 
producedDataType) {}
+
+        @Override
+        public boolean supportsMetadataFilterPushDown() {
+            return true;
+        }
+
+        @Override
+        public MetadataFilterResult 
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+            // Best-effort: accepted = remaining = all.
+            return MetadataFilterResult.of(metadataFilters, metadataFilters);
+        }
+    }
+
     /** Tests key translation when SQL alias differs from metadata key. */
     private static class RenamedMetadataFilterSource extends 
TableFactoryHarness.ScanSourceBase
             implements SupportsReadingMetadata {
@@ -280,13 +302,6 @@ class MetadataFilterInReadingMetadataTest extends 
TableTestBase {
                         .columnByMetadata("event_ts", TIMESTAMP(3), 
"timestamp")
                         .build();
 
-        private final SharedReference<List<ResolvedExpression>> 
receivedMetadataFilters;
-
-        RenamedMetadataFilterSource(
-                SharedReference<List<ResolvedExpression>> 
receivedMetadataFilters) {
-            this.receivedMetadataFilters = receivedMetadataFilters;
-        }
-
         @Override
         public Map<String, DataType> listReadableMetadata() {
             Map<String, DataType> metadata = new HashMap<>();
@@ -304,7 +319,6 @@ class MetadataFilterInReadingMetadataTest extends 
TableTestBase {
 
         @Override
         public MetadataFilterResult 
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
-            receivedMetadataFilters.get().addAll(metadataFilters);
             return MetadataFilterResult.of(metadataFilters, 
Collections.emptyList());
         }
     }
@@ -320,16 +334,9 @@ class MetadataFilterInReadingMetadataTest extends 
TableTestBase {
                         .columnByMetadata("priority", INT())
                         .build();
 
-        private final SharedReference<List<ResolvedExpression>> 
receivedMetadataFilters;
-
-        PartialMetadataFilterSource(
-                SharedReference<List<ResolvedExpression>> 
receivedMetadataFilters) {
-            this.receivedMetadataFilters = receivedMetadataFilters;
-        }
-
         @Override
         public Map<String, DataType> listReadableMetadata() {
-            Map<String, DataType> metadata = new HashMap<>();
+            Map<String, DataType> metadata = new LinkedHashMap<>();
             metadata.put("event_time", TIMESTAMP(3));
             metadata.put("priority", INT());
             return metadata;
@@ -345,8 +352,6 @@ class MetadataFilterInReadingMetadataTest extends 
TableTestBase {
 
         @Override
         public MetadataFilterResult 
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
-            receivedMetadataFilters.get().addAll(metadataFilters);
-            // Accept only the first filter
             List<ResolvedExpression> accepted =
                     metadataFilters.isEmpty()
                             ? Collections.emptyList()
@@ -370,16 +375,6 @@ class MetadataFilterInReadingMetadataTest extends 
TableTestBase {
                         .columnByMetadata("event_time", TIMESTAMP(3))
                         .build();
 
-        private final SharedReference<List<ResolvedExpression>> 
receivedMetadataFilters;
-        private final SharedReference<List<ResolvedExpression>> 
receivedPhysicalFilters;
-
-        MixedFilterSource(
-                SharedReference<List<ResolvedExpression>> 
receivedMetadataFilters,
-                SharedReference<List<ResolvedExpression>> 
receivedPhysicalFilters) {
-            this.receivedMetadataFilters = receivedMetadataFilters;
-            this.receivedPhysicalFilters = receivedPhysicalFilters;
-        }
-
         @Override
         public Map<String, DataType> listReadableMetadata() {
             Map<String, DataType> metadata = new HashMap<>();
@@ -397,13 +392,11 @@ class MetadataFilterInReadingMetadataTest extends 
TableTestBase {
 
         @Override
         public MetadataFilterResult 
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
-            receivedMetadataFilters.get().addAll(metadataFilters);
             return MetadataFilterResult.of(metadataFilters, 
Collections.emptyList());
         }
 
         @Override
         public Result applyFilters(List<ResolvedExpression> filters) {
-            receivedPhysicalFilters.get().addAll(filters);
             return Result.of(filters, Collections.emptyList());
         }
     }
@@ -422,16 +415,80 @@ class MetadataFilterInReadingMetadataTest extends 
TableTestBase {
                         .columnByMetadata("msg_offset", INT(), "offset")
                         .build();
 
-        private final SharedReference<List<ResolvedExpression>> 
receivedMetadataFilters;
+        @Override
+        public Map<String, DataType> listReadableMetadata() {
+            Map<String, DataType> metadata = new HashMap<>();
+            metadata.put("offset", INT());
+            return metadata;
+        }
+
+        @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType 
producedDataType) {}
 
-        CollidingNameSource(SharedReference<List<ResolvedExpression>> 
receivedMetadataFilters) {
-            this.receivedMetadataFilters = receivedMetadataFilters;
+        @Override
+        public boolean supportsMetadataFilterPushDown() {
+            return true;
         }
 
+        @Override
+        public MetadataFilterResult 
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+            return MetadataFilterResult.of(metadataFilters, 
Collections.emptyList());
+        }
+    }
+
+    /** Accepts inputs at positions 0 and 2, rejects position 1. */
+    private static class NonContiguousAcceptingSource extends 
TableFactoryHarness.ScanSourceBase
+            implements SupportsReadingMetadata {
+
+        public static final Schema SCHEMA =
+                Schema.newBuilder()
+                        .column("id", INT())
+                        .columnByMetadata("m0", INT())
+                        .columnByMetadata("m1", INT())
+                        .columnByMetadata("m2", INT())
+                        .build();
+
+        @Override
+        public Map<String, DataType> listReadableMetadata() {
+            Map<String, DataType> metadata = new LinkedHashMap<>();
+            metadata.put("m0", INT());
+            metadata.put("m1", INT());
+            metadata.put("m2", INT());
+            return metadata;
+        }
+
+        @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType 
producedDataType) {}
+
+        @Override
+        public boolean supportsMetadataFilterPushDown() {
+            return true;
+        }
+
+        @Override
+        public MetadataFilterResult 
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+            // Accept inputs at positions 0 and 2; reject position 1.
+            List<ResolvedExpression> accepted = new ArrayList<>();
+            List<ResolvedExpression> remaining = new ArrayList<>();
+            for (int i = 0; i < metadataFilters.size(); i++) {
+                if (i == 1) {
+                    remaining.add(metadataFilters.get(i));
+                } else {
+                    accepted.add(metadataFilters.get(i));
+                }
+            }
+            return MetadataFilterResult.of(accepted, remaining);
+        }
+    }
+
+    /** Drops every input by returning empty accepted and empty remaining. */
+    private static class DroppingAllSource extends 
TableFactoryHarness.ScanSourceBase
+            implements SupportsReadingMetadata {
+
         @Override
         public Map<String, DataType> listReadableMetadata() {
             Map<String, DataType> metadata = new HashMap<>();
-            metadata.put("offset", INT());
+            metadata.put("event_time", TIMESTAMP(3));
             return metadata;
         }
 
@@ -445,8 +502,7 @@ class MetadataFilterInReadingMetadataTest extends 
TableTestBase {
 
         @Override
         public MetadataFilterResult 
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
-            receivedMetadataFilters.get().addAll(metadataFilters);
-            return MetadataFilterResult.of(metadataFilters, 
Collections.emptyList());
+            return MetadataFilterResult.of(Collections.emptyList(), 
Collections.emptyList());
         }
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterResultShapesITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterResultShapesITCase.java
new file mode 100644
index 00000000000..47a987d485c
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterResultShapesITCase.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.factories.TableFactoryHarness;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests covering the four shapes a connector can return from 
{@link
+ * SupportsReadingMetadata#applyMetadataFilters} (accept-all, accept-none, 
partial accept,
+ * best-effort overlap).
+ *
+ * <p>For each shape this test asserts both the runtime result (the Calc above 
the scan must
+ * evaluate any {@code remaining} predicates) and the optimized plan (the scan 
carries the {@code
+ * accepted} set, the {@code LogicalFilter} above carries the {@code 
remaining} set).
+ */
+class MetadataFilterResultShapesITCase {
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(2)
+                            .build());
+
+    /** Schema: {@code id INT, m0 INT METADATA, m1 INT METADATA}. */
+    private static final Schema SCHEMA =
+            Schema.newBuilder()
+                    .column("id", INT())
+                    .columnByMetadata("m0", INT())
+                    .columnByMetadata("m1", INT())
+                    .build();
+
+    /**
+     * Six rows that exercise both predicates {@code m0 > 0} and {@code m1 > 
0}. The data covers all
+     * four cells of the Cartesian product (both pass / only m0 passes / only 
m1 passes / neither).
+     */
+    private static final List<Row> ROWS =
+            Arrays.asList(
+                    Row.of(1, 5, 5), // both pass
+                    Row.of(2, 5, -1), // only m0 passes
+                    Row.of(3, -1, 5), // only m1 passes
+                    Row.of(4, -1, -1), // neither passes
+                    Row.of(5, 7, 9), // both pass
+                    Row.of(6, 0, 0)); // neither passes (predicates are strict 
>)
+
+    private static final String SQL = "SELECT id FROM %s WHERE m0 > 0 AND m1 > 
0 ORDER BY id";
+
+    /** Rows that satisfy {@code m0 > 0 AND m1 > 0}: only id=1 and id=5. */
+    private static final List<Integer> EXPECTED_FILTERED_IDS = 
Arrays.asList(1, 5);
+
+    private TableEnvironment tableEnv;
+
+    @BeforeEach
+    void setup() {
+        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
+        tableEnv = TableEnvironment.create(settings);
+    }
+
+    /** Row passes both predicates {@code m0 > 0 AND m1 > 0}. */
+    private static final Predicate<Row> PASS_BOTH =
+            row -> ((Integer) row.getField(1)) > 0 && ((Integer) 
row.getField(2)) > 0;
+
+    /** Row passes only the first predicate {@code m0 > 0}. */
+    private static final Predicate<Row> PASS_M0 = row -> ((Integer) 
row.getField(1)) > 0;
+
+    /** Row passes no predicate (source emits everything). */
+    private static final Predicate<Row> PASS_NONE = row -> true;
+
+    // Plan-string assertions below use two renderers: `metadataFilter=[...]` 
(lowercase `and`,
+    // from RexNode toString) and `where=[...]` (uppercase `AND`, from 
RelExplainable).
+
+    @Test
+    void testAcceptAll() throws Exception {
+        // Source claims it can apply both predicates and does so by emitting 
only matching rows.
+        registerTable("T_ACCEPT_ALL", ConfigurableMetadataSource.ACCEPT_ALL, 
PASS_BOTH);
+
+        assertThat(collectIds(String.format(SQL, "T_ACCEPT_ALL")))
+                .containsExactlyElementsOf(EXPECTED_FILTERED_IDS);
+
+        String explain = tableEnv.explainSql(String.format(SQL, 
"T_ACCEPT_ALL"));
+        // Both predicates pushed onto the scan as the conjunction.
+        assertThat(explain).contains("metadataFilter=[and(>(m0, 0), >(m1, 
0))]");
+        // No runtime Calc with a where on metadata: accepted = inputs, 
remaining = empty.
+        assertThat(explain).doesNotContain("where=[((m0 > 0) AND (m1 > 0))]");
+        assertThat(explain).doesNotContain("where=[(m0 > 0)]");
+        assertThat(explain).doesNotContain("where=[(m1 > 0)]");
+    }
+
+    @Test
+    void testAcceptNone() throws Exception {
+        // Source rejects everything: emit all rows; runtime Calc must do all 
the filtering.
+        registerTable("T_ACCEPT_NONE", ConfigurableMetadataSource.ACCEPT_NONE, 
PASS_NONE);
+
+        assertThat(collectIds(String.format(SQL, "T_ACCEPT_NONE")))
+                .containsExactlyElementsOf(EXPECTED_FILTERED_IDS);
+
+        String explain = tableEnv.explainSql(String.format(SQL, 
"T_ACCEPT_NONE"));
+        // Empty accepted set; full conjunction retained on the runtime Calc.
+        assertThat(explain).contains("metadataFilter=[]");
+        assertThat(explain).contains("where=[AND(>(m0, 0), >(m1, 0))]");
+        // Nothing pushed besides the empty marker.
+        assertThat(explain).doesNotContain("metadataFilter=[>(");
+        assertThat(explain).doesNotContain("metadataFilter=[and(");
+    }
+
+    @Test
+    void testAcceptFirstOnly() throws Exception {
+        // Source applies only m0 > 0 itself; runtime Calc must apply m1 > 0.
+        registerTable("T_ACCEPT_FIRST", 
ConfigurableMetadataSource.ACCEPT_FIRST_ONLY, PASS_M0);
+
+        assertThat(collectIds(String.format(SQL, "T_ACCEPT_FIRST")))
+                .containsExactlyElementsOf(EXPECTED_FILTERED_IDS);
+
+        String explain = tableEnv.explainSql(String.format(SQL, 
"T_ACCEPT_FIRST"));
+        // First predicate (m0 > 0) is pushed; second (m1 > 0) stays on the 
runtime Calc.
+        assertThat(explain).contains("metadataFilter=[>(m0, 0)]");
+        assertThat(explain).contains("where=[>(m1, 0)]");
+        assertThat(explain).doesNotContain("metadataFilter=[>(m1, 0)]");
+        assertThat(explain).doesNotContain("metadataFilter=[and(");
+    }
+
+    @Test
+    void testBestEffortOverlap() throws Exception {
+        // Source claims both for storage-side pruning AND remains them: 
runtime must re-apply.
+        // Source emits all rows so the runtime Calc is the load-bearing 
filter; correctness must
+        // not depend on the source's claim.
+        registerTable(
+                "T_BEST_EFFORT", 
ConfigurableMetadataSource.BEST_EFFORT_BOTH_AND_REMAIN, PASS_NONE);
+
+        assertThat(collectIds(String.format(SQL, "T_BEST_EFFORT")))
+                .containsExactlyElementsOf(EXPECTED_FILTERED_IDS);
+
+        String explain = tableEnv.explainSql(String.format(SQL, 
"T_BEST_EFFORT"));
+        // Both predicates accepted (for storage-side pruning) AND both also 
retained as a runtime
+        // Calc so the source's pruning need only be best-effort.
+        assertThat(explain).contains("metadataFilter=[and(>(m0, 0), >(m1, 
0))]");
+        assertThat(explain).contains("where=[AND(>(m0, 0), >(m1, 0))]");
+    }
+
+    // 
-----------------------------------------------------------------------------------------
+    // Helpers
+    // 
-----------------------------------------------------------------------------------------
+
+    private void registerTable(
+            String name,
+            Function<List<ResolvedExpression>, MetadataFilterResult> splitter,
+            Predicate<Row> sourceSideFilter) {
+        TableDescriptor descriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(SCHEMA)
+                        .source(new ConfigurableMetadataSource(splitter, 
sourceSideFilter))
+                        .build();
+        tableEnv.createTable(name, descriptor);
+    }
+
+    private List<Integer> collectIds(String sql) {
+        TableResult result = tableEnv.executeSql(sql);
+        try (CloseableIterator<Row> iterator = result.collect()) {
+            List<Row> rows = CollectionUtil.iteratorToList(iterator);
+            return rows.stream().map(r -> (Integer) 
r.getField(0)).collect(Collectors.toList());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    // 
-----------------------------------------------------------------------------------------
+    // Test source
+    // 
-----------------------------------------------------------------------------------------
+
+    /**
+     * Bounded harness source. Supports {@link SupportsReadingMetadata} with 
two metadata columns
+     * ({@code m0}, {@code m1}). The {@code MetadataFilterResult} returned 
from {@link
+     * #applyMetadataFilters(List)} is controlled by the supplied splitter 
strategy so each test can
+     * exercise a different shape.
+     *
+     * <p>The runtime emits all rows; correctness for {@code remaining} 
predicates depends on the
+     * Calc above the scan to drop non-matching rows.
+     */
+    static class ConfigurableMetadataSource extends 
TableFactoryHarness.ScanSourceBase
+            implements SupportsReadingMetadata {
+
+        static final Function<List<ResolvedExpression>, MetadataFilterResult> 
ACCEPT_ALL =
+                in -> MetadataFilterResult.of(in, Collections.emptyList());
+
+        static final Function<List<ResolvedExpression>, MetadataFilterResult> 
ACCEPT_NONE =
+                in -> MetadataFilterResult.of(Collections.emptyList(), in);
+
+        static final Function<List<ResolvedExpression>, MetadataFilterResult> 
ACCEPT_FIRST_ONLY =
+                in ->
+                        MetadataFilterResult.of(
+                                Collections.singletonList(in.get(0)),
+                                Collections.singletonList(in.get(1)));
+
+        static final Function<List<ResolvedExpression>, MetadataFilterResult>
+                BEST_EFFORT_BOTH_AND_REMAIN = in -> 
MetadataFilterResult.of(in, in);
+
+        private final Function<List<ResolvedExpression>, MetadataFilterResult> 
splitter;
+        private final Predicate<Row> sourceSideFilter;
+
+        private DataType producedDataType;
+
+        ConfigurableMetadataSource(
+                Function<List<ResolvedExpression>, MetadataFilterResult> 
splitter,
+                Predicate<Row> sourceSideFilter) {
+            super(true);
+            this.splitter = splitter;
+            this.sourceSideFilter = sourceSideFilter;
+        }
+
+        @Override
+        public Map<String, DataType> listReadableMetadata() {
+            Map<String, DataType> metadata = new LinkedHashMap<>();
+            metadata.put("m0", INT());
+            metadata.put("m1", INT());
+            return metadata;
+        }
+
+        @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType 
producedDataType) {
+            this.producedDataType = producedDataType;
+        }
+
+        @Override
+        public boolean supportsMetadataFilterPushDown() {
+            return true;
+        }
+
+        @Override
+        public MetadataFilterResult 
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+            return splitter.apply(metadataFilters);
+        }
+
+        @Override
+        public ScanRuntimeProvider getScanRuntimeProvider(ScanContext 
runtimeProviderContext) {
+            DataType emitted =
+                    producedDataType != null
+                            ? producedDataType
+                            : getFactoryContext().getPhysicalRowDataType();
+            DynamicTableSource.DataStructureConverter converter =
+                    
runtimeProviderContext.createDataStructureConverter(emitted);
+            // Apply the source-side filter once; the generator just hands out 
rows by index.
+            List<Row> emittedRows =
+                    
ROWS.stream().filter(sourceSideFilter).collect(Collectors.toList());
+            GeneratorFunction<Long, RowData> generator =
+                    index -> (RowData) 
converter.toInternal(emittedRows.get(index.intValue()));
+            DataGeneratorSource<RowData> source =
+                    new DataGeneratorSource<>(
+                            generator, emittedRows.size(), 
TypeInformation.of(RowData.class));
+            return SourceProvider.of(source);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml
index cef00b21d69..899a8640cf9 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml
@@ -33,6 +33,27 @@ LogicalProject(id=[$0])
 LogicalProject(id=[$0])
 +- LogicalProject(id=[$0], event_ts=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, T3, 
metadata=[timestamp], metadataFilter=[>(timestamp, 2024-01-01 00:00:00)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testBestEffortMetadataPruning">
+    <Resource name="sql">
+      <![CDATA[SELECT id FROM T8 WHERE event_time > TIMESTAMP '2024-01-01 
00:00:00']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
+   +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T8, 
metadata=[event_time]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+   +- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T8, 
metadata=[event_time], metadataFilter=[>(event_time, 2024-01-01 00:00:00)]]])
 ]]>
     </Resource>
   </TestCase>
@@ -114,6 +135,27 @@ LogicalProject(id=[$0])
 LogicalProject(id=[$0])
 +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
    +- LogicalTableScan(table=[[default_catalog, default_database, T4, 
metadata=[event_time], filter=[>(id, 10)], metadataFilter=[>(event_time, 
2024-01-01 00:00:00)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNonContiguousSubsetAcceptance">
+    <Resource name="sql">
+      <![CDATA[SELECT id FROM T9 WHERE m0 > 0 AND m1 > 1 AND m2 > 2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[AND(>($1, 0), >($2, 1), >($3, 2))])
+   +- LogicalProject(id=[$0], m0=[$1], m1=[$2], m2=[$3])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T9, 
metadata=[m0, m1, m2]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalProject(id=[$0], m0=[$1], m1=[$2], m2=[$3])
+   +- LogicalFilter(condition=[>($2, 1)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T9, 
metadata=[m0, m1, m2], metadataFilter=[and(>(m0, 0), >(m2, 2))]]])
 ]]>
     </Resource>
   </TestCase>
@@ -125,16 +167,16 @@ LogicalProject(id=[$0])
       <![CDATA[
 LogicalProject(id=[$0])
 +- LogicalFilter(condition=[AND(>($1, 2024-01-01 00:00:00), >($2, 5))])
-   +- LogicalProject(id=[$0], event_time=[$2], priority=[$1])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T6, 
metadata=[priority, event_time]]])
+   +- LogicalProject(id=[$0], event_time=[$1], priority=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T6, 
metadata=[event_time, priority]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
 LogicalProject(id=[$0])
-+- LogicalProject(id=[$0], event_time=[$2], priority=[$1])
-   +- LogicalFilter(condition=[>($1, 5)])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T6, 
metadata=[priority, event_time], metadataFilter=[>(event_time, 2024-01-01 
00:00:00)]]])
++- LogicalProject(id=[$0], event_time=[$1], priority=[$2])
+   +- LogicalFilter(condition=[>($2, 5)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T6, 
metadata=[event_time, priority], metadataFilter=[>(event_time, 2024-01-01 
00:00:00)]]])
 ]]>
     </Resource>
   </TestCase>

Reply via email to