Yun Gao created FLINK-23159: ------------------------------- Summary: Correlated sql subquery on the source created via fromValues() failed to compile Key: FLINK-23159 URL: https://issues.apache.org/jira/browse/FLINK-23159 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.13.0 Reporter: Yun Gao
A correlated subquery like {code:java} import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; import java.util.ArrayList; import java.util.List; public class SQLQueryTeszt { public static void main(String[] args) { EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode() .build(); TableEnvironment tableEnvironment = TableEnvironment.create(settings); DataType row = DataTypes.ROW( DataTypes.FIELD("flag", DataTypes.STRING()), DataTypes.FIELD("id", DataTypes.INT()), DataTypes.FIELD("name", DataTypes.STRING()) ); Table table = tableEnvironment.fromValues(row, new MyListSource("table1").builder()); tableEnvironment.createTemporaryView("table1", table); table = tableEnvironment.fromValues(row, new MyListSource("table2").builder()); tableEnvironment.createTemporaryView("table2", table); String sql = "select t1.flag from table1 t1 where t1.name in (select t2.name from table2 t2 where t2.id = t1.id)"; tableEnvironment.explainSql(sql); } public static class MyListSource { private String flag; public MyListSource(String flag) { this.flag = flag; } public List<Row> builder() { List<Row> rows = new ArrayList<>(); for (int i = 2; i < 3; i++) { Row row = new Row(3); row.setField(0, flag); row.setField(1, i); row.setField(2, "me"); rows.add(row); } return rows; } } } {code} would throw {code:java} Exception in thread "main" org.apache.flink.table.api.TableException: unexpected correlate variable $cor0 in the plan at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.immutable.Range.foreach(Range.scala:160) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:284) at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:101) at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:46) at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:691) at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainSql(TableEnvironmentImpl.java:677) at test.SQLQueryTeszt.main(SQLQueryTeszt.java:57) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)