[ https://issues.apache.org/jira/browse/FLINK-19754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317076#comment-17317076 ]

JieFang.He commented on FLINK-19754:
------------------------------------

[~jark] My problem is not an exception on StreamTableEnvironment with the Web UI. The 
situation is that my job has both a DataStream part and a SQL part, and each of them 
needs its own execute call to run (StreamExecutionEnvironment.execute and 
StreamTableEnvironment.execute). Submitting with flink run works well, but the REST API 
can only run one of them; having both causes the exception "Cannot have more than one 
execute() or executeAsync() call in a single environment". It seems the REST API only 
supports a single execute() per job.
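
To make the shape of the job concrete, here is a stripped-down sketch of the pattern (class name, connectors and table names are made up for illustration, not taken from my real job): one main() that triggers both a StatementSet execution for the SQL part and env.execute() for the DataStream part. With flink run this submits two jobs and works; through the REST API / web submission the second call hits the exception above.

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TwoExecutesJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // DataStream part: a plain pipeline with its own DataStream sink.
        DataStream<String> words = env.fromElements("a", "b", "c");
        words.print();

        // SQL part: stand-in source/sink tables (assumes the built-in 'datagen' and
        // 'blackhole' connectors); the real job uses Kafka-backed tables instead.
        tEnv.executeSql("CREATE TABLE src (word STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE dst (word STRING) WITH ('connector' = 'blackhole')");
        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql("INSERT INTO dst SELECT word FROM src");

        stmtSet.execute();                // first execution (SQL pipeline)
        env.execute("datastream-part");   // second execution (DataStream pipeline)
    }
}
{code}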

> Cannot have more than one execute() or executeAsync() call in a single 
> environment.
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-19754
>                 URL: https://issues.apache.org/jira/browse/FLINK-19754
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.11.2
>            Reporter: little-tomato
>            Priority: Major
>
> I run this code on my Standalone Cluster. When I submit the job, the error log is as follows:
> {code}
> 2020-10-20 11:53:42,969 WARN  org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application:
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot have more than one execute() or executeAsync() call in a single environment.
>  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) [?:1.8.0_221]
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_221]
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_221]
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_221]
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_221]
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_221]
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_221]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
> Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
>  at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>  at cn.cuiot.dmp.ruleengine.job.RuleEngineJob.main(RuleEngineJob.java:556) ~[?:?]
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_221]
>  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_221]
>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_221]
>  at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
>  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
> {code}
> My code is:
> {code:java}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
> ...
> FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>("kafkatopic", new SimpleStringSchema(), properties);
> myConsumer.setStartFromLatest();
> DataStream<String> kafkaDataStream = env.addSource(myConsumer);
> SingleOutputStreamOperator<MessageInfo> sourceStream = kafkaDataStream
>     .map(new MapFunction<String, MessageInfo>() { ... });
> DataStream<Row> dataStreamRow = sourceStream
>     .map(new MyMapFunction())
>     .filter(new RuleDataProccessFunction())
>     .map(new MapFunction<MessageInfo, Row>() {
>         private static final long serialVersionUID = 1L;
>         @Override
>         public Row map(MessageInfo value) throws Exception { ... }
>     })
>     .returns(new RowTypeInfo(rowTypeArr, fieldArr));
> tEnv.registerFunction("test", new TestFunction());
> Table table = tEnv.fromDataStream(dataStreamRow, fieldStr);
> tEnv.createTemporaryView("mytable", table);
> String ddl = "CREATE TABLE user_log_1155 ...from kafka topic:user_log_1155";
> tEnv.executeSql(ddl);
> String ddl1 = "CREATE TABLE user_test_1155 ...from kafka topic:user_test_1155";
> tEnv.executeSql(ddl1);
> StatementSet stmtSet = tEnv.createStatementSet();
> stmtSet.addInsertSql("INSERT INTO user_log_1155 SELECT xxx from mytable");
> stmtSet.addInsertSql("INSERT INTO user_test_1155 SELECT xxx from mytable");
> stmtSet.execute();
> env.execute(requestPrm.getString("xxx"));
> {code}
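
For reference, below is a rough sketch of the same kind of mixed job collapsed into a single execution: the DataStream output is wrapped into a Table and added to the same StatementSet, so only stmtSet.execute() is called and env.execute() is never reached. This only applies if every DataStream output can be expressed as a table insert; the table names, connectors and schema here are made up for illustration and are not the setup from the quoted code.

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SingleExecuteJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Stand-in for the Kafka-backed DataStream pipeline of the quoted job.
        DataStream<String> stream = env.fromElements("a", "b", "c");
        Table streamTable = tEnv.fromDataStream(stream, "word");
        tEnv.createTemporaryView("stream_view", streamTable);

        // Stand-in sinks; the quoted job defines Kafka tables via CREATE TABLE instead.
        tEnv.executeSql("CREATE TABLE sink_a (word STRING) WITH ('connector' = 'blackhole')");
        tEnv.executeSql("CREATE TABLE sink_b (word STRING) WITH ('connector' = 'blackhole')");

        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql("INSERT INTO sink_a SELECT word FROM stream_view");
        stmtSet.addInsert("sink_b", streamTable);  // DataStream output routed as a table insert

        stmtSet.execute();  // the only execution call in this main(); no env.execute()
    }
}
{code}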



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
