[
https://issues.apache.org/jira/browse/FLINK-23159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17373308#comment-17373308
]
Wenlong Lyu commented on FLINK-23159:
-------------------------------------
[~gaoyunhaii] Thanks for reporting the issue, the root cause of this issue is
that currently Values is excluded in SubqueryDecorrelator, I would try to fix
it.
> Correlated sql subquery on the source created via fromValues() failed to
> compile
> --------------------------------------------------------------------------------
>
> Key: FLINK-23159
> URL: https://issues.apache.org/jira/browse/FLINK-23159
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.13.0
> Reporter: Yun Gao
> Priority: Major
>
> Correlated subquery likeĀ
> {code:java}
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.types.DataType;
> import org.apache.flink.types.Row;
> import java.util.ArrayList;
> import java.util.List;
> public class SQLQueryTest {
> public static void main(String[] args) {
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode()
> .build();
> TableEnvironment tableEnvironment = TableEnvironment.create(settings);
> DataType row = DataTypes.ROW(
> DataTypes.FIELD("flag", DataTypes.STRING()),
> DataTypes.FIELD("id", DataTypes.INT()),
> DataTypes.FIELD("name", DataTypes.STRING())
> );
> Table table = tableEnvironment.fromValues(row, new
> MyListSource("table1").builder());
> tableEnvironment.createTemporaryView("table1", table);
> table = tableEnvironment.fromValues(row, new
> MyListSource("table2").builder());
> tableEnvironment.createTemporaryView("table2", table);
> String sql = "select t1.flag from table1 t1 where t1.name in (select
> t2.name from table2 t2 where t2.id = t1.id)";
> tableEnvironment.explainSql(sql);
> }
> public static class MyListSource {
> private String flag;
> public MyListSource(String flag) {
> this.flag = flag;
> }
> public List<Row> builder() {
> List<Row> rows = new ArrayList<>();
> for (int i = 2; i < 3; i++) {
> Row row = new Row(3);
> row.setField(0, flag);
> row.setField(1, i);
> row.setField(2, "me");
> rows.add(row);
> }
> return rows;
> }
> }
> }
> {code}
> would throws
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException:
> unexpected correlate variable $cor0 in the plan
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.immutable.Range.foreach(Range.scala:160)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:284)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:101)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:46)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:691)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainSql(TableEnvironmentImpl.java:677)
> at test.SQLQueryTeszt.main(SQLQueryTeszt.java:57)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)