jnh5y commented on code in PR #28162:
URL: https://github.com/apache/flink/pull/28162#discussion_r3307149703


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java:
##########
@@ -194,18 +230,77 @@ protected Map<String, String> 
buildColumnToMetadataKeyMap(TableSourceTable table
         RexNode[] remappedPredicates =
                 remapPredicates(metadataPredicates, 
metadataRowInfo.oldIndexToNewIndex);
 
-        MetadataFilterResult result =
-                MetadataFilterPushDownSpec.applyMetadataFilters(
+        MetadataFilterApplyResult applyResult =
+                MetadataFilterPushDownSpec.applyMetadataFiltersWithLookup(
                         Arrays.asList(remappedPredicates),
                         metadataRowInfo.metadataRowType,
                         newTableSource,
                         abilityContext);
 
-        int acceptedCount = result.getAcceptedFilters().size();
+        // Build a parallel identity-keyed map from the resolved expression 
handed to the source
+        // back to the *original* input-space RexNode (one of 
metadataPredicates). The spec needs
+        // remapped (metadata-key-space) RexNodes; the runtime Calc above the 
scan needs the
+        // original input-space RexNodes (which reference the scan's full row 
type by index).
+        // resolved/remapped/original are 1:1 by construction 
(resolvePredicates checks size).
+        Map<RexNode, RexNode> remappedToOriginal = new IdentityHashMap<>();
+        for (int i = 0; i < remappedPredicates.length; i++) {
+            remappedToOriginal.put(remappedPredicates[i], 
metadataPredicates[i]);
+        }
+        Map<ResolvedExpression, RexNode> resolvedToRemapped = 
applyResult.resolvedToInputRexNode;
+
+        // Spec storage: walk the source's accepted list. For each accepted 
ResolvedExpression,
+        // recover the remapped RexNode by identity and store it in the spec. 
The source must
+        // return back the same ResolvedExpression instances it received; 
rebuilt or substituted
+        // instances would miss the lookup and raise a TableException.
+        IdentityHashMap<RexNode, Boolean> covered = new IdentityHashMap<>();
         List<RexNode> acceptedRemappedPredicates = new ArrayList<>();
-        for (int i = 0; i < acceptedCount; i++) {
-            acceptedRemappedPredicates.add(remappedPredicates[i]);
+        for (ResolvedExpression accepted : 
applyResult.result.getAcceptedFilters()) {
+            RexNode remapped = resolvedToRemapped.get(accepted);
+            if (remapped == null) {
+                throw new TableException(
+                        "Source returned an accepted metadata filter not in 
the input list. "
+                                + "Sources must return back the same 
ResolvedExpression instances "
+                                + "they received from applyMetadataFilters.");
+            }
+            acceptedRemappedPredicates.add(remapped);
+            covered.put(remapped, Boolean.TRUE);
+        }
+
+        // Runtime Calc: walk the source's remaining list. For each remaining 
ResolvedExpression,
+        // recover the *original* input-space RexNode by identity.
+        List<RexNode> remainingInputRexNodes = new ArrayList<>();
+        for (ResolvedExpression remaining : 
applyResult.result.getRemainingFilters()) {
+            RexNode remapped = resolvedToRemapped.get(remaining);
+            if (remapped == null) {
+                throw new TableException(
+                        "Source returned a remaining metadata filter not in 
the input list. "
+                                + "Sources must return back the same 
ResolvedExpression instances "
+                                + "they received from applyMetadataFilters.");
+            }
+            RexNode original = remappedToOriginal.get(remapped);
+            // remapped is a value from resolvedToRemapped, which we built 
from the same
+            // remappedPredicates array used as keys for remappedToOriginal — 
so the lookup
+            // must succeed by construction.
+            Preconditions.checkState(
+                    original != null,
+                    "Internal invariant violated: remapped RexNode is not 
registered in "
+                            + "remappedToOriginal even though 
resolvedToRemapped returned it.");

Review Comment:
   Outdated now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to