fsk119 commented on code in PR #18920:
URL: https://github.com/apache/flink/pull/18920#discussion_r906783415

##########
flink-connectors/flink-connector-hive/src/test/resources/query-test/sub_query.q:
##########
@@ -0,0 +1,9 @@
+-- SORT_QUERY_RESULTS

Review Comment:
After reading the Hive doc [1], could you add two more cases to the test?
```
select * from (select x.key from src x);
```
[1] https://cwiki.apache.org/confluence/display/hive/languagemanual+subqueries

##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java:
##########
@@ -1070,7 +1073,11 @@ private RelNode genFilterRelNode(
                     .convert(subQueryExpr)
                     .accept(funcConverter);
-            RelNode filterRel = LogicalFilter.create(srcRel, convertedFilterLHS);
+            RelNode filterRel =

Review Comment:
Please check whether this is enough for the fix.
```
RelNode filterRel = LogicalFilter.create(srcRel, convertedFilterLHS);
if (!RelOptUtil.getVariablesUsed(filterRel).isEmpty()) {
    filterRel =
            LogicalFilter.create(
                    srcRel,
                    convertedFilterLHS,
                    ImmutableSet.copyOf(RelOptUtil.getVariablesSet(filterRel)));
}
```

##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java:
##########
@@ -1862,6 +1867,65 @@ public static RelNode genValues(
                 rows);
     }
+    // traverse the given node to find all correlated variables
+    public static Set<CorrelationId> getVariablesSet(RexNode rexNode) {

Review Comment:
After reading `RelOptUtil#getVariablesSet` and `RelOptUtil#getVariablesUsed`, I am fine with the name.

##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java:
##########
@@ -1862,6 +1867,65 @@ public static RelNode genValues(
                 rows);
     }
+    // traverse the given node to find all correlated variables
+    public static Set<CorrelationId> getVariablesSet(RexNode rexNode) {
+        Set<CorrelationId> correlationVariables = new HashSet<>();
+        if (rexNode instanceof RexSubQuery) {
+            RexSubQuery rexSubQuery = (RexSubQuery) rexNode;
+            // we expect correlated variables in Filter only for now.
+            // also check case where operator has o inputs .e.g TableScan
+            if (rexSubQuery.rel.getInputs().isEmpty()) {
+                return correlationVariables;
+            }
+            RelNode input = rexSubQuery.rel.getInput(0);
+            while (input != null
+                    && !(input instanceof LogicalFilter)
+                    && input.getInputs().size() >= 1) {
+                // we don't expect corr vars within UNION for now
+                if (input.getInputs().size() > 1) {
+                    if (input instanceof LogicalJoin) {

Review Comment:
Add a case like
```
select * from src x join src y on x.key = y.key
where exists (select * from src z where z.value = x.value and z.value = y.value);
```
to verify the correctness.

##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java:
##########
@@ -1862,6 +1867,65 @@ public static RelNode genValues(
                 rows);
     }
+    // traverse the given node to find all correlated variables
+    public static Set<CorrelationId> getVariablesSet(RexNode rexNode) {
+        Set<CorrelationId> correlationVariables = new HashSet<>();
+        if (rexNode instanceof RexSubQuery) {
+            RexSubQuery rexSubQuery = (RexSubQuery) rexNode;
+            // we expect correlated variables in Filter only for now.
+            // also check case where operator has o inputs .e.g TableScan
+            if (rexSubQuery.rel.getInputs().isEmpty()) {
+                return correlationVariables;
+            }
+            RelNode input = rexSubQuery.rel.getInput(0);
+            while (input != null
+                    && !(input instanceof LogicalFilter)
+                    && input.getInputs().size() >= 1) {
+                // we don't expect corr vars within UNION for now
+                if (input.getInputs().size() > 1) {
+                    if (input instanceof LogicalJoin) {
+                        correlationVariables.addAll(
+                                findCorrelatedVar(((LogicalJoin) input).getCondition()));
+                    }
+                    return correlationVariables;

Review Comment:
I think we'd better throw an exception here if the node is not a `LogicalJoin`, to notify users that this is an unsupported feature for now. Otherwise it's difficult for others to track down why the planner failed to decorrelate the subquery.
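To illustrate the last suggestion, here is a minimal sketch of the traversal with the fail-fast branch. It deliberately uses a hypothetical toy `Node` class and string correlation ids instead of the Calcite `RelNode`/`CorrelationId` types, so the names `CorrVarSketch` and `Node`, the exception message, and the handling of the `Filter` after the loop are all assumptions, not the code in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the planner code; NOT the real Calcite or Flink API.
class CorrVarSketch {

    /** Toy plan node: an operator kind, the correlation ids its condition uses, and its inputs. */
    static final class Node {
        final String kind; // e.g. "Project", "Filter", "Join", "Union", "Scan"
        final List<String> conditionCorrIds;
        final List<Node> inputs;

        Node(String kind, List<String> conditionCorrIds, Node... inputs) {
            this.kind = kind;
            this.conditionCorrIds = new ArrayList<>(conditionCorrIds);
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** Walks down from the subquery root and collects correlation ids. */
    static Set<String> getVariablesSet(Node subQueryRoot) {
        Set<String> correlationVariables = new LinkedHashSet<>();
        // Operators without inputs (e.g. a scan) cannot carry correlated variables.
        if (subQueryRoot.inputs.isEmpty()) {
            return correlationVariables;
        }
        Node input = subQueryRoot.inputs.get(0);
        while (!input.kind.equals("Filter") && !input.inputs.isEmpty()) {
            if (input.inputs.size() > 1) {
                if (!input.kind.equals("Join")) {
                    // Fail fast instead of silently returning an incomplete set.
                    throw new UnsupportedOperationException(
                            "Correlated variables under " + input.kind + " are not supported yet.");
                }
                correlationVariables.addAll(input.conditionCorrIds);
                return correlationVariables;
            }
            input = input.inputs.get(0);
        }
        // Assumption: once a Filter is reached, its condition is inspected.
        if (input.kind.equals("Filter")) {
            correlationVariables.addAll(input.conditionCorrIds);
        }
        return correlationVariables;
    }

    public static void main(String[] args) {
        Node scan = new Node("Scan", Collections.emptyList());
        Node join = new Node("Join", Arrays.asList("$cor0"), scan, scan);
        Node root = new Node("Project", Collections.emptyList(),
                new Node("Project", Collections.emptyList(), join));
        System.out.println(getVariablesSet(root));
    }
}
```

With this shape, a subquery whose first multi-input operator is a union surfaces an explicit "not supported" error at the point of traversal rather than a later decorrelation failure.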
