Yun Gao created FLINK-23159:
-------------------------------

             Summary: Correlated sql subquery on the source created via 
fromValues() failed to compile
                 Key: FLINK-23159
                 URL: https://issues.apache.org/jira/browse/FLINK-23159
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.13.0
            Reporter: Yun Gao


A correlated subquery like the following
{code:java}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.List;

public class SQLQueryTeszt {

  public static void main(String[] args) {
    EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode()
      .build();

    TableEnvironment tableEnvironment = TableEnvironment.create(settings);

    DataType row = DataTypes.ROW(
      DataTypes.FIELD("flag", DataTypes.STRING()),
      DataTypes.FIELD("id", DataTypes.INT()),
      DataTypes.FIELD("name", DataTypes.STRING())
    );
    Table table = tableEnvironment.fromValues(row, new 
MyListSource("table1").builder());
    tableEnvironment.createTemporaryView("table1", table);

    table = tableEnvironment.fromValues(row, new 
MyListSource("table2").builder());
    tableEnvironment.createTemporaryView("table2", table);

    String sql = "select t1.flag from table1 t1 where t1.name in (select 
t2.name from table2 t2 where t2.id = t1.id)";

    tableEnvironment.explainSql(sql);
  }

  public static class MyListSource {

    private String flag;

    public MyListSource(String flag) {
      this.flag = flag;
    }

    public List<Row> builder() {
      List<Row> rows = new ArrayList<>();
      for (int i = 2; i < 3; i++) {
        Row row = new Row(3);
        row.setField(0, flag);
        row.setField(1, i);
        row.setField(2, "me");
        rows.add(row);
      }
      return rows;
    }
  }
}
{code}
throws the following exception:
{code:java}
Exception in thread "main" org.apache.flink.table.api.TableException: 
unexpected correlate variable $cor0 in the plan
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
  at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
  at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.immutable.Range.foreach(Range.scala:160)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
  at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
  at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
  at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
  at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
  at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:284)
  at 
org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:101)
  at 
org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:46)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:691)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.explainSql(TableEnvironmentImpl.java:677)
  at test.SQLQueryTeszt.main(SQLQueryTeszt.java:57)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to