Hi, I am using 1.4.0.

Yezhenbao
> On Feb 24, 2018, at 17:55, Renjie Liu <liurenjie2...@gmail.com> wrote:
>
> Hi, according to the Flink docs, it seems that you need to pass at least one
> argument into the table function.
>
>> On Fri, Feb 23, 2018 at 12:35 AM 叶振宝 <827295...@qq.com> wrote:
>>
>> Hey, I am new to Flink and I have a question and want to see if anyone can
>> help here.
>>
>> How do I use a dimension table in the Flink Table API with
>> StreamExecutionEnvironment?
>>
>> I use a TableFunction to handle this, but I hit the following problem when
>> debugging:
>>
>> LogicalProject(col_1=[$0])
>>   LogicalJoin(condition=[true], joinType=[left])
>>     LogicalTableScan(table=[[test]])
>>     LogicalTableFunctionScan(invocation=[dim_test()], rowType=[RecordType(VARCHAR(65536) col, VARCHAR(65536) name)], elementType=[class [Ljava.lang.Object;])
>>
>> This exception indicates that the query uses an unsupported SQL feature.
>> Please check the documentation for the set of currently supported SQL
>> features.
>>     at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
>>     at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:674)
>>     at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
>>     at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:216)
>>     at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:692)
>>     at com.bigdata.stream.streamsql.FlinkSqlTest.main(FlinkSqlTest.java:64)
>>
>> SQL: select t.col_1 from test t left join lateral table(dim_test()) b on true
>>
>> Main code:
>>
>> public static void main(String[] args) throws Exception {
>>     String sql = "select t.col_1 from test t left join lateral table(dim_test()) b on true";
>>     StreamExecutionEnvironment streamEnv =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>     StreamTableEnvironment stEnv =
>>         TableEnvironment.getTableEnvironment(streamEnv);
>>
>>     Properties kafkaProps = new Properties();
>>     kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
>>     kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
>>     kafkaProps.setProperty("group.id", "test");
>>     Kafka010JsonTableSource tableSource =
>>         Kafka010JsonTableSource.builder()
>>             .forTopic("test")
>>             .withKafkaProperties(kafkaProps)
>>             .withSchema(TableSchema.builder()
>>                 .field("col_1", Types.STRING)
>>                 .field("col_2", Types.STRING).build())
>>             .build();
>>     stEnv.registerTableSource("test", tableSource);
>>
>>     String[] columns = {"col", "name"};
>>     TypeInformation[] typeInformations =
>>         {TypeInformation.of(String.class), TypeInformation.of(String.class)};
>>     TableSchema tableSchema = new TableSchema(columns, typeInformations);
>>     Map<String, Object> context = new HashMap<>();
>>     context.put("mysql.url", "jdbc:mysql://localhost:3306/test");
>>     context.put("mysql.driver", "com.mysql.jdbc.Driver");
>>     context.put("mysql.user", "test");
>>     context.put("mysql.password", "test");
>>     context.put("mysql.table", "dim_test");
>>     StreamSqlDim dim = new MySqlDimFactory().getInstance(tableSchema, new StreamSqlContext(context));
>>     stEnv.registerFunction("dim_test", dim);
>>
>>     String[] outColumns = {"col"};
>>     TypeInformation[] outType = {TypeInformation.of(String.class)};
>>     TableSink tableSink = new Kafka010JsonTableSink("test_out", kafkaProps);
>>     stEnv.registerTableSink("test_out", outColumns, outType, tableSink);
>>     Table t = stEnv.sql(sql);
>>     stEnv.insertInto(t, "test_out", stEnv.queryConfig());
>>     streamEnv.execute();
>> }
>>
>> MySqlDim extends TableFunction, and its eval() method is empty, like
>> this:
>>
>> public void eval() {
>>
>> }
>>
>>
> --
> Liu, Renjie
> Software Engineer, MVAD
>
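
For anyone reading this thread later, here is a rough sketch of what Renjie's suggestion (give the table function at least one argument) might look like. It is written without any Flink dependency so it can be compiled on its own; the class name DimTestSketch, the in-memory map standing in for the MySQL table, and the emitted() helper are all made up for illustration. In a real job the class would presumably extend TableFunction and call collect(...) instead of adding to a list, and whether this alone resolves the planner error on 1.4.0 has not been confirmed here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Dependency-free sketch (hypothetical names). In a Flink job this class
// would extend org.apache.flink.table.functions.TableFunction and emit rows
// through the inherited collect(...) rather than a local list.
class DimTestSketch {
    // The query passes the join key t.col_1 to the function, and the alias
    // b(col, name) names the function's output columns.
    static final String SQL =
        "SELECT t.col_1, b.name FROM test t "
        + "LEFT JOIN LATERAL TABLE(dim_test(t.col_1)) AS b(col, name) ON TRUE";

    // Stand-in for the MySQL dimension table dim_test (assumption).
    private final Map<String, String> dimension;
    private final List<String[]> emitted = new ArrayList<>();

    DimTestSketch(Map<String, String> dimension) {
        this.dimension = dimension;
    }

    // eval takes the join key as an argument, unlike the empty eval()
    // in the original post.
    public void eval(String key) {
        String name = dimension.get(key);
        if (name != null) {
            // In Flink this would be a collect(...) call with a (col, name) row.
            emitted.add(new String[] {key, name});
        }
    }

    List<String[]> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        Map<String, String> dim = new HashMap<>();
        dim.put("k1", "first");
        DimTestSketch fn = new DimTestSketch(dim);
        fn.eval("k1");       // key found: one row emitted
        fn.eval("missing");  // key not found: nothing emitted
        for (String[] row : fn.emitted()) {
            System.out.println(row[0] + " -> " + row[1]); // prints "k1 -> first"
        }
    }
}
```

With a LEFT JOIN, a stream row whose key emits nothing from the function is still kept, with NULL in the function's columns, so a missing dimension entry does not drop the event.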