jnh5y commented on code in PR #28162:
URL: https://github.com/apache/flink/pull/28162#discussion_r3307136714
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java:
##########
@@ -194,18 +230,77 @@ protected Map<String, String>
buildColumnToMetadataKeyMap(TableSourceTable table
RexNode[] remappedPredicates =
remapPredicates(metadataPredicates,
metadataRowInfo.oldIndexToNewIndex);
- MetadataFilterResult result =
- MetadataFilterPushDownSpec.applyMetadataFilters(
+ MetadataFilterApplyResult applyResult =
+ MetadataFilterPushDownSpec.applyMetadataFiltersWithLookup(
Arrays.asList(remappedPredicates),
metadataRowInfo.metadataRowType,
newTableSource,
abilityContext);
- int acceptedCount = result.getAcceptedFilters().size();
+ // Build a parallel identity-keyed map from the resolved expression
handed to the source
+ // back to the *original* input-space RexNode (one of
metadataPredicates). The spec needs
+ // remapped (metadata-key-space) RexNodes; the runtime Calc above the
scan needs the
+ // original input-space RexNodes (which reference the scan's full row
type by index).
+ // resolved/remapped/original are 1:1 by construction
(resolvePredicates checks size).
+ Map<RexNode, RexNode> remappedToOriginal = new IdentityHashMap<>();
+ for (int i = 0; i < remappedPredicates.length; i++) {
+ remappedToOriginal.put(remappedPredicates[i],
metadataPredicates[i]);
+ }
+ Map<ResolvedExpression, RexNode> resolvedToRemapped =
applyResult.resolvedToInputRexNode;
+
+ // Spec storage: walk the source's accepted list. For each accepted
ResolvedExpression,
+ // recover the remapped RexNode by identity and store it in the spec.
The source must
+ // return back the same ResolvedExpression instances it received;
rebuilt or substituted
+ // instances would miss the lookup and raise a TableException.
+ IdentityHashMap<RexNode, Boolean> covered = new IdentityHashMap<>();
List<RexNode> acceptedRemappedPredicates = new ArrayList<>();
- for (int i = 0; i < acceptedCount; i++) {
- acceptedRemappedPredicates.add(remappedPredicates[i]);
+ for (ResolvedExpression accepted :
applyResult.result.getAcceptedFilters()) {
+ RexNode remapped = resolvedToRemapped.get(accepted);
+ if (remapped == null) {
+ throw new TableException(
+ "Source returned an accepted metadata filter not in
the input list. "
+ + "Sources must return back the same
ResolvedExpression instances "
+ + "they received from applyMetadataFilters.");
+ }
+ acceptedRemappedPredicates.add(remapped);
+ covered.put(remapped, Boolean.TRUE);
+ }
+
+ // Runtime Calc: walk the source's remaining list. For each remaining
ResolvedExpression,
+ // recover the *original* input-space RexNode by identity.
+ List<RexNode> remainingInputRexNodes = new ArrayList<>();
+ for (ResolvedExpression remaining :
applyResult.result.getRemainingFilters()) {
+ RexNode remapped = resolvedToRemapped.get(remaining);
+ if (remapped == null) {
+ throw new TableException(
+ "Source returned a remaining metadata filter not in
the input list. "
+ + "Sources must return back the same
ResolvedExpression instances "
+ + "they received from applyMetadataFilters.");
+ }
+ RexNode original = remappedToOriginal.get(remapped);
+ // remapped is a value from resolvedToRemapped, which we built
from the same
+ // remappedPredicates array used as keys for remappedToOriginal —
so the lookup
+ // must succeed by construction.
+ Preconditions.checkState(
Review Comment:
I'm simplifying these comments. (And telling my Claude to be terse. Again.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]