[ 
https://issues.apache.org/jira/browse/FLINK-23353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hayden zhou updated FLINK-23353:
--------------------------------
    Description: 
{code:java}

public class Top2Test {
    public static void main(String[] args) {

        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        Table sourceTable = tEnv.fromValues(
                DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.INT()),
                    DataTypes.FIELD("name",DataTypes.STRING()),
                    DataTypes.FIELD("price", DataTypes.INT())
                ),
                row(1, "hayden", 18),
                row(3, "hayden", 19),
                row(4, "hayden", 20),
                row(2, "jaylin", 20)
        );

        tEnv.createTemporaryView("source", sourceTable);

        Table rT = tEnv.from("source")
                .groupBy($("name"))
                .flatAggregate(call(Top2.class, $("price")).as("price", "rank"))
                .select($("name"), $("price"), $("rank"));
        rT.execute().print();
    }


    public static class Top2Accumulator {
        public Integer first;
        public Integer second;
    }

    public static class Top2 extends TableAggregateFunction<Tuple2<Integer, 
Integer>, Top2Accumulator> {

        @Override
        public Top2Accumulator createAccumulator() {
            Top2Accumulator acc = new Top2Accumulator();
            acc.first = Integer.MIN_VALUE;
            acc.second = Integer.MIN_VALUE;
            return acc;
        }

        public void accumulate(Top2Accumulator acc, Integer value) {
            if (value > acc.first) {
                acc.second = acc.first;
                acc.first = value;
            } else if (value > acc.second) {
                acc.second = value;
            }
        }

        public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
            for (Top2Accumulator otherAcc : it) {
                accumulate(acc, otherAcc.first);
                accumulate(acc, otherAcc.second);
            }
        }

        public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, 
Integer>> out) {
            if (acc.first != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.first, 1));
            }
            if (acc.second != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.second, 2));
            }
        }
    }

}

{code}

got errors as below:

{code:java}
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
generate a valid execution plan for the given query: 

LogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1],
 fields=[name, price, rank])
+- LogicalProject(name=[AS($0, _UTF-16LE'name')], price=[AS($1, 
_UTF-16LE'price')], rank=[AS($2, _UTF-16LE'rank')])
   +- LogicalTableAggregate(group=[{1}], 
tableAggregate=[[flinktest$Top2Test$Top2$4619034833a29d53c136506047509219($2)]])
      +- LogicalUnion(all=[true])
         :- LogicalProject(id=[CAST(1):INTEGER], 
name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(18):INTEGER])
         :  +- LogicalValues(tuples=[[{ 0 }]])
         :- LogicalProject(id=[CAST(3):INTEGER], 
name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(19):INTEGER])
         :  +- LogicalValues(tuples=[[{ 0 }]])
         :- LogicalProject(id=[CAST(4):INTEGER], 
name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(20):INTEGER])
         :  +- LogicalValues(tuples=[[{ 0 }]])
         +- LogicalProject(id=[CAST(2):INTEGER], 
name=[CAST(_UTF-16LE'jaylin':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(20):INTEGER])
            +- LogicalValues(tuples=[[{ 0 }]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:87)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:46)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:46)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:46)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:791)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225)
        at 
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577)
        at flinktest.Top2Test.main(Top2Test.java:37)
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are 
not enough rules to produce a node with desired properties: convention=LOGICAL, 
FlinkRelDistributionTraitDef=any, sort=[].
Missing conversion is LogicalTableAggregate[convention: NONE -> LOGICAL]
There is 1 empty subset: rel#436:RelSubset#6.LOGICAL.any.[], the relevant part 
of the original plan is as follows
409:LogicalTableAggregate(group=[{1}], 
tableAggregate=[[flinktest$Top2Test$Top2$4619034833a29d53c136506047509219($2)]])
  407:LogicalUnion(subset=[rel#408:RelSubset#5.NONE.any.[]], all=[true])
    399:LogicalProject(subset=[rel#400:RelSubset#1.NONE.any.[]], 
id=[CAST(1):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(18):INTEGER])
      0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 
}]])
    401:LogicalProject(subset=[rel#402:RelSubset#2.NONE.any.[]], 
id=[CAST(3):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(19):INTEGER])
      0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 
}]])
    403:LogicalProject(subset=[rel#404:RelSubset#3.NONE.any.[]], 
id=[CAST(4):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(20):INTEGER])
      0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 
}]])
    405:LogicalProject(subset=[rel#406:RelSubset#4.NONE.any.[]], 
id=[CAST(2):INTEGER], name=[CAST(_UTF-16LE'jaylin':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(20):INTEGER])
      0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 
}]])

{code}


if the inBatchMode() call is removed from

{code:java}
        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
{code}
then it runs normally


  was:

{code:java}

public class Top2Test {
    public static void main(String[] args) {

        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        Table sourceTable = tEnv.fromValues(
                DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.INT()),
                    DataTypes.FIELD("name",DataTypes.STRING()),
                    DataTypes.FIELD("price", DataTypes.INT())
                ),
                row(1, "hayden", 18),
                row(3, "hayden", 19),
                row(4, "hayden", 20),
                row(2, "jaylin", 20)
        );

        tEnv.createTemporaryView("source", sourceTable);

        Table rT = tEnv.from("source")
                .groupBy($("name"))
                .flatAggregate(call(Top2.class, $("price")).as("price", "rank"))
                .select($("name"), $("price"), $("rank"));
        rT.execute().print();
    }


    public static class Top2Accumulator {
        public Integer first;
        public Integer second;
    }

    public static class Top2 extends TableAggregateFunction<Tuple2<Integer, 
Integer>, Top2Accumulator> {

        @Override
        public Top2Accumulator createAccumulator() {
            Top2Accumulator acc = new Top2Accumulator();
            acc.first = Integer.MIN_VALUE;
            acc.second = Integer.MIN_VALUE;
            return acc;
        }

        public void accumulate(Top2Accumulator acc, Integer value) {
            if (value > acc.first) {
                acc.second = acc.first;
                acc.first = value;
            } else if (value > acc.second) {
                acc.second = value;
            }
        }

        public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
            for (Top2Accumulator otherAcc : it) {
                accumulate(acc, otherAcc.first);
                accumulate(acc, otherAcc.second);
            }
        }

        public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, 
Integer>> out) {
            if (acc.first != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.first, 1));
            }
            if (acc.second != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.second, 2));
            }
        }
    }

}

{code}

got errors as below:
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
generate a valid execution plan for the given query: 

LogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1],
 fields=[name, price, rank])
+- LogicalProject(name=[AS($0, _UTF-16LE'name')], price=[AS($1, 
_UTF-16LE'price')], rank=[AS($2, _UTF-16LE'rank')])
   +- LogicalTableAggregate(group=[{1}], 
tableAggregate=[[flinktest$Top2Test$Top2$4619034833a29d53c136506047509219($2)]])
      +- LogicalUnion(all=[true])
         :- LogicalProject(id=[CAST(1):INTEGER], 
name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(18):INTEGER])
         :  +- LogicalValues(tuples=[[{ 0 }]])
         :- LogicalProject(id=[CAST(3):INTEGER], 
name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(19):INTEGER])
         :  +- LogicalValues(tuples=[[{ 0 }]])
         :- LogicalProject(id=[CAST(4):INTEGER], 
name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(20):INTEGER])
         :  +- LogicalValues(tuples=[[{ 0 }]])
         +- LogicalProject(id=[CAST(2):INTEGER], 
name=[CAST(_UTF-16LE'jaylin':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(20):INTEGER])
            +- LogicalValues(tuples=[[{ 0 }]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:87)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:46)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:46)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:46)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:791)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225)
        at 
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577)
        at flinktest.Top2Test.main(Top2Test.java:37)
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are 
not enough rules to produce a node with desired properties: convention=LOGICAL, 
FlinkRelDistributionTraitDef=any, sort=[].
Missing conversion is LogicalTableAggregate[convention: NONE -> LOGICAL]
There is 1 empty subset: rel#436:RelSubset#6.LOGICAL.any.[], the relevant part 
of the original plan is as follows
409:LogicalTableAggregate(group=[{1}], 
tableAggregate=[[flinktest$Top2Test$Top2$4619034833a29d53c136506047509219($2)]])
  407:LogicalUnion(subset=[rel#408:RelSubset#5.NONE.any.[]], all=[true])
    399:LogicalProject(subset=[rel#400:RelSubset#1.NONE.any.[]], 
id=[CAST(1):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(18):INTEGER])
      0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 
}]])
    401:LogicalProject(subset=[rel#402:RelSubset#2.NONE.any.[]], 
id=[CAST(3):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(19):INTEGER])
      0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 
}]])
    403:LogicalProject(subset=[rel#404:RelSubset#3.NONE.any.[]], 
id=[CAST(4):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(20):INTEGER])
      0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 
}]])
    405:LogicalProject(subset=[rel#406:RelSubset#4.NONE.any.[]], 
id=[CAST(2):INTEGER], name=[CAST(_UTF-16LE'jaylin':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(20):INTEGER])
      0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 
}]])


if the inBatchMode() call is removed from

{code:java}
        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
{code}
then it runs normally



> UDTAGG can't execute in Batch mode
> ----------------------------------
>
>                 Key: FLINK-23353
>                 URL: https://issues.apache.org/jira/browse/FLINK-23353
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.1
>            Reporter: hayden zhou
>            Priority: Major
>
> {code:java}
> public class Top2Test {
>     public static void main(String[] args) {
>         EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().inBatchMode().build();
>         TableEnvironment tEnv = TableEnvironment.create(settings);
>         Table sourceTable = tEnv.fromValues(
>                 DataTypes.ROW(
>                     DataTypes.FIELD("id", DataTypes.INT()),
>                     DataTypes.FIELD("name",DataTypes.STRING()),
>                     DataTypes.FIELD("price", DataTypes.INT())
>                 ),
>                 row(1, "hayden", 18),
>                 row(3, "hayden", 19),
>                 row(4, "hayden", 20),
>                 row(2, "jaylin", 20)
>         );
>         tEnv.createTemporaryView("source", sourceTable);
>         Table rT = tEnv.from("source")
>                 .groupBy($("name"))
>                 .flatAggregate(call(Top2.class, $("price")).as("price", 
> "rank"))
>                 .select($("name"), $("price"), $("rank"));
>         rT.execute().print();
>     }
>     public static class Top2Accumulator {
>         public Integer first;
>         public Integer second;
>     }
>     public static class Top2 extends TableAggregateFunction<Tuple2<Integer, 
> Integer>, Top2Accumulator> {
>         @Override
>         public Top2Accumulator createAccumulator() {
>             Top2Accumulator acc = new Top2Accumulator();
>             acc.first = Integer.MIN_VALUE;
>             acc.second = Integer.MIN_VALUE;
>             return acc;
>         }
>         public void accumulate(Top2Accumulator acc, Integer value) {
>             if (value > acc.first) {
>                 acc.second = acc.first;
>                 acc.first = value;
>             } else if (value > acc.second) {
>                 acc.second = value;
>             }
>         }
>         public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
>             for (Top2Accumulator otherAcc : it) {
>                 accumulate(acc, otherAcc.first);
>                 accumulate(acc, otherAcc.second);
>             }
>         }
>         public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, 
> Integer>> out) {
>             if (acc.first != Integer.MIN_VALUE) {
>                 out.collect(Tuple2.of(acc.first, 1));
>             }
>             if (acc.second != Integer.MIN_VALUE) {
>                 out.collect(Tuple2.of(acc.second, 2));
>             }
>         }
>     }
> }
> {code}
> got errors as below:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
> generate a valid execution plan for the given query: 
> LogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1],
>  fields=[name, price, rank])
> +- LogicalProject(name=[AS($0, _UTF-16LE'name')], price=[AS($1, 
> _UTF-16LE'price')], rank=[AS($2, _UTF-16LE'rank')])
>    +- LogicalTableAggregate(group=[{1}], 
> tableAggregate=[[flinktest$Top2Test$Top2$4619034833a29d53c136506047509219($2)]])
>       +- LogicalUnion(all=[true])
>          :- LogicalProject(id=[CAST(1):INTEGER], 
> name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
> price=[CAST(18):INTEGER])
>          :  +- LogicalValues(tuples=[[{ 0 }]])
>          :- LogicalProject(id=[CAST(3):INTEGER], 
> name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
> price=[CAST(19):INTEGER])
>          :  +- LogicalValues(tuples=[[{ 0 }]])
>          :- LogicalProject(id=[CAST(4):INTEGER], 
> name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
> price=[CAST(20):INTEGER])
>          :  +- LogicalValues(tuples=[[{ 0 }]])
>          +- LogicalProject(id=[CAST(2):INTEGER], 
> name=[CAST(_UTF-16LE'jaylin':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
> price=[CAST(20):INTEGER])
>             +- LogicalValues(tuples=[[{ 0 }]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>       at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:87)
>       at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
>       at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:46)
>       at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:46)
>       at scala.collection.immutable.List.foreach(List.scala:392)
>       at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:46)
>       at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:791)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225)
>       at 
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577)
>       at flinktest.Top2Test.main(Top2Test.java:37)
> Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There 
> are not enough rules to produce a node with desired properties: 
> convention=LOGICAL, FlinkRelDistributionTraitDef=any, sort=[].
> Missing conversion is LogicalTableAggregate[convention: NONE -> LOGICAL]
> There is 1 empty subset: rel#436:RelSubset#6.LOGICAL.any.[], the relevant 
> part of the original plan is as follows
> 409:LogicalTableAggregate(group=[{1}], 
> tableAggregate=[[flinktest$Top2Test$Top2$4619034833a29d53c136506047509219($2)]])
>   407:LogicalUnion(subset=[rel#408:RelSubset#5.NONE.any.[]], all=[true])
>     399:LogicalProject(subset=[rel#400:RelSubset#1.NONE.any.[]], 
> id=[CAST(1):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
> price=[CAST(18):INTEGER])
>       0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 
> }]])
>     401:LogicalProject(subset=[rel#402:RelSubset#2.NONE.any.[]], 
> id=[CAST(3):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
> price=[CAST(19):INTEGER])
>       0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 
> }]])
>     403:LogicalProject(subset=[rel#404:RelSubset#3.NONE.any.[]], 
> id=[CAST(4):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
> price=[CAST(20):INTEGER])
>       0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 
> }]])
>     405:LogicalProject(subset=[rel#406:RelSubset#4.NONE.any.[]], 
> id=[CAST(2):INTEGER], name=[CAST(_UTF-16LE'jaylin':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
> price=[CAST(20):INTEGER])
>       0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 
> }]])
> {code}
> if the inBatchMode() call is removed from
> {code:java}
>         EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().inBatchMode().build();
> {code}
> then it runs normally



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to