[
https://issues.apache.org/jira/browse/FLINK-23603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ingo Bürk closed FLINK-23603.
-----------------------------
Resolution: Won't Fix
I'm closing this for now. Please let us know once you have translated it to
English so we can re-open it.
> 动态查询sql后,使用toAppendStream将动态表转化为流时报错,org.apache.flink.table.api.TableException
> ------------------------------------------------------------------------------
>
> Key: FLINK-23603
> URL: https://issues.apache.org/jira/browse/FLINK-23603
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.13.1
> Environment:
> {code:java}
> pom.xml
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-java</artifactId>
> <version>1.13.1</version>
> <scope>provided</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-streaming-java_2.12</artifactId>
> <version>1.13.1</version>
> <scope>provided</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-clients_2.12</artifactId>
> <version>1.13.1</version>
> <scope>provided</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-planner-blink_2.12</artifactId>
> <version>1.13.1</version>
> <scope>provided</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-streaming-scala_2.12</artifactId>
> <version>1.13.1</version>
> <scope>provided</scope>
> </dependency>
> {code}
> {code:java}
> import com.atguigu.chapter05.bean.Water1;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import static org.apache.flink.table.api.Expressions.$;
> public class Flink08_Time_ProcessingTime_DDL {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> tEnv.executeSql("create table sensor(" +
> "id string," +
> "ts bigint," +
> "vc int" +
> //"pt as proctime()" +
> ") with (" +
> " 'connector' = 'filesystem' ," +
> " 'path' = 'input/water.txt' ," +
> " 'format' = 'csv' " +
> ")");
> //tEnv.sqlQuery("select * from sensor").execute().print();
> //Table t1 = tEnv.sqlQuery("select id,ts,vc hight from sensor");
> Table t1 = tEnv.from("sensor");
> Table t2 = t1.select($("id"), $("ts"),$("vc").as("height"));
> /*t2.execute().print();
> t2.printSchema();*/
> tEnv.toAppendStream(t2, Water1.class).print();
> env.execute();
> }
> }
> {code}
> {code:java}
> import lombok.AllArgsConstructor;
> import lombok.Data;
> import lombok.NoArgsConstructor;
> @Data
> @NoArgsConstructor
> @AllArgsConstructor
> public class Water1 {
> private String id;
> private Long ts;
> private Integer height;
> }
> {code}
> {panel:title=water.txt}
> sensor_1,1,1
> sensor_1,2,2
> sensor_2,3,45
> sensor_1,4,4
> sensor_2,6,9
> sensor_1,7,6
> sensor_3,8,7
> {panel}
>
>
>
> Reporter: liuhong
> Priority: Major
>
> 当执行环境中Flink08_Time_ProcessingTime_DDL.main时会抛出以下异常,如果在Flink08_Time_ProcessingTime_DDL中修改
> Table t2 = t1.select($("id"),
> $("ts"),{color:#de350b}$("vc").as("height")){color};为
> Table t2 = t1.select($("id"),{color:#de350b}$("vc").as("height"){color},
> $("ts"));则正常输出结果
> Exception in thread "main" org.apache.flink.table.api.TableException: height
> is not found in id, ts, vcException in thread "main"
> org.apache.flink.table.api.TableException: height is not found in id, ts, vc
> at
> org.apache.flink.table.planner.codegen.SinkCodeGenerator$.$anonfun$generateRowConverterOperator$1(SinkCodeGenerator.scala:83)
> at
> org.apache.flink.table.planner.codegen.SinkCodeGenerator$.$anonfun$generateRowConverterOperator$1$adapted(SinkCodeGenerator.scala:79)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32) at
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194) at
> scala.collection.TraversableLike.map(TraversableLike.scala:233) at
> scala.collection.TraversableLike.map$(TraversableLike.scala:226) at
> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:194) at
> org.apache.flink.table.planner.codegen.SinkCodeGenerator$.generateRowConverterOperator(SinkCodeGenerator.scala:79)
> at
> org.apache.flink.table.planner.codegen.SinkCodeGenerator.generateRowConverterOperator(SinkCodeGenerator.scala)
> at
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToTransformation(CommonExecLegacySink.java:190)
> at
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToPlanInternal(CommonExecLegacySink.java:141)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:70)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at
> scala.collection.Iterator.foreach(Iterator.scala:937) at
> scala.collection.Iterator.foreach$(Iterator.scala:937) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at
> scala.collection.IterableLike.foreach(IterableLike.scala:70) at
> scala.collection.IterableLike.foreach$(IterableLike.scala:69) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
> scala.collection.TraversableLike.map(TraversableLike.scala:233) at
> scala.collection.TraversableLike.map$(TraversableLike.scala:226) at
> scala.collection.AbstractTraversable.map(Traversable.scala:104) at
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
> at
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:439)
> at
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:511)
> at
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:501)
> at
> com.atguigu.chapter11.Flink08_Time_ProcessingTime_DDL.main(Flink08_Time_ProcessingTime_DDL.java:36)
--
This message was sent by Atlassian Jira
(v8.3.4#803005)