snuyanzin commented on code in PR #28162:
URL: https://github.com/apache/flink/pull/28162#discussion_r3304447885
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java:
##########
@@ -194,18 +230,77 @@ protected Map<String, String> buildColumnToMetadataKeyMap(TableSourceTable table
RexNode[] remappedPredicates =
remapPredicates(metadataPredicates, metadataRowInfo.oldIndexToNewIndex);
- MetadataFilterResult result =
- MetadataFilterPushDownSpec.applyMetadataFilters(
+ MetadataFilterApplyResult applyResult =
+ MetadataFilterPushDownSpec.applyMetadataFiltersWithLookup(
Arrays.asList(remappedPredicates),
metadataRowInfo.metadataRowType,
newTableSource,
abilityContext);
- int acceptedCount = result.getAcceptedFilters().size();
+ // Build a parallel identity-keyed map from the resolved expression handed to the source
+ // back to the *original* input-space RexNode (one of metadataPredicates). The spec needs
+ // remapped (metadata-key-space) RexNodes; the runtime Calc above the scan needs the
+ // original input-space RexNodes (which reference the scan's full row type by index).
+ // resolved/remapped/original are 1:1 by construction (resolvePredicates checks size).
+ Map<RexNode, RexNode> remappedToOriginal = new IdentityHashMap<>();
+ for (int i = 0; i < remappedPredicates.length; i++) {
+ remappedToOriginal.put(remappedPredicates[i], metadataPredicates[i]);
+ }
+ Map<ResolvedExpression, RexNode> resolvedToRemapped = applyResult.resolvedToInputRexNode;
+
+ // Spec storage: walk the source's accepted list. For each accepted ResolvedExpression,
+ // recover the remapped RexNode by identity and store it in the spec. The source must
+ // return back the same ResolvedExpression instances it received; rebuilt or substituted
+ // instances would miss the lookup and raise a TableException.
+ IdentityHashMap<RexNode, Boolean> covered = new IdentityHashMap<>();
List<RexNode> acceptedRemappedPredicates = new ArrayList<>();
- for (int i = 0; i < acceptedCount; i++) {
- acceptedRemappedPredicates.add(remappedPredicates[i]);
+ for (ResolvedExpression accepted : applyResult.result.getAcceptedFilters()) {
+ RexNode remapped = resolvedToRemapped.get(accepted);
+ if (remapped == null) {
+ throw new TableException(
+ "Source returned an accepted metadata filter not in the input list. "
+ + "Sources must return back the same ResolvedExpression instances "
+ + "they received from applyMetadataFilters.");
+ }
+ acceptedRemappedPredicates.add(remapped);
+ covered.put(remapped, Boolean.TRUE);
+ }
+
+ // Runtime Calc: walk the source's remaining list. For each remaining ResolvedExpression,
+ // recover the *original* input-space RexNode by identity.
+ List<RexNode> remainingInputRexNodes = new ArrayList<>();
+ for (ResolvedExpression remaining : applyResult.result.getRemainingFilters()) {
+ RexNode remapped = resolvedToRemapped.get(remaining);
+ if (remapped == null) {
+ throw new TableException(
+ "Source returned a remaining metadata filter not in the input list. "
+ + "Sources must return back the same ResolvedExpression instances "
+ + "they received from applyMetadataFilters.");
+ }
+ RexNode original = remappedToOriginal.get(remapped);
+ // remapped is a value from resolvedToRemapped, which we built from the same
+ // remappedPredicates array used as keys for remappedToOriginal — so the lookup
+ // must succeed by construction.
+ Preconditions.checkState(
Review Comment:
Can we make the comments more succinct, and word them so that they don't need to change just because some variables are renamed?
It may be better to keep higher-level info in the comments; the low-level details can be read from the code.
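The lookup pattern in the hunk can be illustrated outside Flink. What follows is a minimal sketch under stated assumptions, not the PR's code. `Node` stands in for both `RexNode` and `ResolvedExpression`. `SourceResult` stands in for the source's accepted/remaining split. `IllegalStateException` stands in for `TableException`. Every class and method name in it (`IdentityLookupSketch`, `Node`, `SourceResult`, `Split`, `split`, `lookup`) is hypothetical. The comments describe intent (which space each list lives in, what the source must guarantee) rather than naming variables, so they would survive a rename.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins, not Flink API: Node plays RexNode / ResolvedExpression,
// SourceResult plays the source's accepted/remaining split.
final class IdentityLookupSketch {

    /** Predicate stand-in; keeps Object's identity-based equals/hashCode on purpose. */
    static final class Node {
        final String text;

        Node(String text) {
            this.text = text;
        }

        @Override
        public String toString() {
            return text;
        }
    }

    /** What the source reports back: filters it can apply and filters it leaves over. */
    record SourceResult(List<Node> accepted, List<Node> remaining) {}

    /** Filters to store in the pushdown spec, and filters to evaluate at runtime. */
    record Split(List<Node> pushedRemapped, List<Node> runtimeOriginal) {}

    /**
     * Index i of the three arrays describes the same predicate in three spaces:
     * input row, metadata keys, and the form handed to the source.
     */
    static Split split(Node[] original, Node[] remapped, Node[] resolved, SourceResult result) {
        if (original.length != remapped.length || remapped.length != resolved.length) {
            throw new IllegalArgumentException("predicate arrays must be parallel");
        }
        Map<Node, Node> resolvedToRemapped = new IdentityHashMap<>();
        Map<Node, Node> remappedToOriginal = new IdentityHashMap<>();
        for (int i = 0; i < original.length; i++) {
            resolvedToRemapped.put(resolved[i], remapped[i]);
            remappedToOriginal.put(remapped[i], original[i]);
        }
        // Accepted filters are stored in metadata-key space.
        List<Node> pushed = new ArrayList<>();
        for (Node accepted : result.accepted()) {
            pushed.add(lookup(resolvedToRemapped, accepted));
        }
        // Remaining filters are translated back to input space for the runtime filter.
        List<Node> runtime = new ArrayList<>();
        for (Node remaining : result.remaining()) {
            runtime.add(remappedToOriginal.get(lookup(resolvedToRemapped, remaining)));
        }
        return new Split(pushed, runtime);
    }

    /** The source must return the exact instances it was given; anything else is a bug. */
    private static Node lookup(Map<Node, Node> map, Node key) {
        Node value = map.get(key);
        if (value == null) {
            throw new IllegalStateException("source returned a filter it was not given: " + key);
        }
        return value;
    }
}
```

Suppose the parallel arrays are `{a, b}`, `{a', b'}`, `{ra, rb}` and the source accepts `rb` and leaves `ra`. In that case `split` stores `[b']` for the spec and keeps `[a]` for the runtime filter. A source that hands back a rebuilt `new Node("ra")` is rejected, because an identity lookup misses even when the text matches.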