Sorry about sending the mail without completing it :)

I also tried out a different approach: instead of UNION ALL, use OR, as below.

  ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%' )
  OR ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%' )
  OR ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%' )

The only downside is that with this approach, if all the WHERE-clause
condition sets are equal, Flink seems to behave as if only one condition
set were used. I have attached a screenshot herewith. Could you please
explain this to me?
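My own guess, to be confirmed (assuming the planner simplifies predicates
the way Calcite normally does): the three OR branches above are textually
identical, and p OR p OR p is logically equivalent to p, so the optimizer
is free to collapse the whole disjunction into a single branch before
building the job graph:

  WHERE InterceptID = 4508724
    AND Provider_Info = 'Dhanuka'
    AND LIID LIKE '%193400835%'

If the branches carried different constants, none of them could be dropped.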
Thanks in advance.

Cheers,
Dhanuka

On Mon, Jan 14, 2019 at 3:35 PM dhanuka ranasinghe <dhanuka.priyan...@gmail.com> wrote:

> Hi Hequn,
>
> I think it's obvious when we see the job graph for 200 unions. I have
> attached the screenshot herewith.
>
> I also tried out a different approach, which is instead of UNION ALL
>
> On Mon, Jan 14, 2019 at 2:57 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi dhanuka,
>>
>> > I am trying to deploy 200 SQL unions and it seems all the tasks start
>> > failing after some time.
>> It would be great if you could show us some information (say, an
>> exception stack) about the failure. Is it caused by an OOM of the job
>> manager?
>>
>> > How do I allocate memory for the task manager and job manager? What
>> > are the factors that need to be considered?
>> According to your SQL, I guess you need more memory for the job
>> manager[1], since you UNION ALL 200 tables and the job graph should be
>> a bit big. As for the task manager, I think it may be OK to use the
>> default memory settings, unless you allocate a lot of memory in your
>> UDFs or you just want to make better use of the memory (we can discuss
>> more if you like).
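>>
>> As a rough sketch, assuming a Flink 1.7-style flink-conf.yaml (these
>> are the standard heap keys; the values are placeholders to tune, not
>> recommendations):
>>
>>   # flink-conf.yaml: a larger heap for the job manager to hold a big job graph
>>   jobmanager.heap.size: 2048m
>>   # the task manager default is often fine for a job like this
>>   taskmanager.heap.size: 1024m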
>>
>> Best,
>> Hequn
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jobmanager
>>
>> On Mon, Jan 14, 2019 at 9:54 AM dhanuka ranasinghe <
>> dhanuka.priyan...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> Thanks for the prompt reply, and it's working 🤗.
>>>
>>> I am trying to deploy 200 SQL unions, and it seems all the tasks start
>>> failing after some time.
>>>
>>> How do I allocate memory for the task manager and job manager? What
>>> are the factors that need to be considered?
>>>
>>> Cheers,
>>> Dhanuka
>>>
>>> On Sun, 13 Jan 2019, 22:05 Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> > Hi Dhanuka,
>>> >
>>> > The important error message here is "AppendStreamTableSink requires
>>> > that Table has only insert changes".
>>> > This is because you use UNION instead of UNION ALL, which implies
>>> > duplicate elimination.
>>> > Unfortunately, UNION is currently implemented internally as a regular
>>> > aggregation, which produces a retraction stream (although this would
>>> > not be necessary).
>>> >
>>> > If you don't require duplicate elimination, you can replace UNION
>>> > with UNION ALL and the query should work.
>>> > If you require duplicate elimination, it is currently not possible
>>> > to use SQL for your use case.
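>>> >
>>> > As a minimal sketch of that rewrite, applied to the first two
>>> > branches of your query below (the only change is UNION -> UNION ALL):
>>> >
>>> >   SELECT TiggerID, 'Rule3' as RuleName, mytime(ts) as ts1,
>>> >          mytime(CURRENT_TIMESTAMP) as ts2
>>> >   FROM dataTable
>>> >   WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka'
>>> >     AND LIID LIKE '%193400835%'
>>> >   UNION ALL
>>> >   SELECT TiggerID, 'Rule2' as RuleName, mytime(ts) as ts1,
>>> >          mytime(CURRENT_TIMESTAMP) as ts2
>>> >   FROM dataTable
>>> >   WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka'
>>> >     AND LIID LIKE '%193400835%'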
>>> >
>>> > There is a Jira issue, FLINK-9422, to improve this case [1].
>>> >
>>> > Best,
>>> > Fabian
>>> >
>>> > [1] https://issues.apache.org/jira/browse/FLINK-9422
>>> >
>>> > On Sun, Jan 13, 2019 at 2:43 PM dhanuka ranasinghe <
>>> > dhanuka.priyan...@gmail.com> wrote:
>>> >
>>> >> Hi All,
>>> >>
>>> >> I am trying to select multiple results from Kafka and send the
>>> >> results to a different Kafka topic using the Table API, but I am
>>> >> getting the error below. Could you please help me with this?
>>> >>
>>> >> Query:
>>> >>
>>> >>   SELECT TiggerID, 'Rule3' as RuleName, mytime(ts) as ts1,
>>> >>          mytime(CURRENT_TIMESTAMP) as ts2
>>> >>   FROM dataTable
>>> >>   WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka'
>>> >>     AND LIID LIKE '%193400835%'
>>> >>   UNION
>>> >>   SELECT TiggerID, 'Rule2' as RuleName, mytime(ts) as ts1,
>>> >>          mytime(CURRENT_TIMESTAMP) as ts2
>>> >>   FROM dataTable
>>> >>   WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka'
>>> >>     AND LIID LIKE '%193400835%'
>>> >>   UNION
>>> >>   SELECT TiggerID, 'Rule1' as RuleName, mytime(ts) as ts1,
>>> >>          mytime(CURRENT_TIMESTAMP) as ts2
>>> >>   FROM dataTable
>>> >>   WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka'
>>> >>     AND LIID LIKE '%193400835%'
>>> >>
>>> >> Error:
>>> >>
>>> >> 2019-01-13 21:36:36,228 ERROR
>>> >> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler -
>>> >> Exception occurred in REST handler.
>>> >> org.apache.flink.runtime.rest.handler.RestHandlerException:
>>> >> org.apache.flink.client.program.ProgramInvocationException: The main
>>> >> method caused an error.
>>> >>   at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$7(JarRunHandler.java:151)
>>> >>   at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>> >>   at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>> >>   at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>> >>   at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
>>> >>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> >>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> >>   at java.lang.Thread.run(Thread.java:748)
>>> >> Caused by: java.util.concurrent.CompletionException:
>>> >> org.apache.flink.client.program.ProgramInvocationException: The main
>>> >> method caused an error.
>>> >>   at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:228)
>>> >>   at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>> >>   ... 3 more
>>> >> Caused by: org.apache.flink.client.program.ProgramInvocationException:
>>> >> The main method caused an error.
>>> >>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>>> >>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>> >>   at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>> >>   at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
>>> >>   at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
>>> >>   at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:226)
>>> >>   ... 4 more
>>> >> Caused by: org.apache.flink.table.api.TableException:
>>> >> AppendStreamTableSink requires that Table has only insert changes.
>>> >>   at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:382)
>>> >>   at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:784)
>>> >>   at org.apache.flink.table.api.Table.insertInto(table.scala:877)
>>> >>   at org.monitoring.stream.analytics.FlinkDynamicLocalSQL.main(FlinkDynamicLocalSQL.java:153)
>>> >>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> >>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> >>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> >>   at java.lang.reflect.Method.invoke(Method.java:498)
>>> >>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>>> >>   ... 9 more
>>> >>
>>> >> Source Code:
>>> >>
>>> >> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>>> >> tableEnv.registerFunction("mytime", new MyTime(10));
>>> >>
>>> >> tableEnv.connect(new Kafka().version("0.10")
>>> >>         .topic("testin")
>>> >>         .properties(kConsumer)
>>> >>         .startFromLatest())
>>> >>     .withFormat(new Json()
>>> >>         .jsonSchema(schemaContent)
>>> >>         .failOnMissingField(false)
>>> >>         .deriveSchema())
>>> >>     .withSchema(new Schema()
>>> >>         .field("InterceptID", "DECIMAL")
>>> >>         .field("Provider_Info", "VARCHAR")
>>> >>         .field("LIID", "VARCHAR")
>>> >>         .field("TiggerID", "DECIMAL")
>>> >>         .field("ts", Types.SQL_TIMESTAMP())
>>> >>             .rowtime(new Rowtime().timestampsFromSource()
>>> >>                 .watermarksPeriodicBounded(1000)))
>>> >>     .inAppendMode()
>>> >>     .registerTableSource(sourceTable);
>>> >>
>>> >> // WindowedTable windowedTable =
>>> >> //     tableEnv.scan(sourceTable).window(Tumble.over("50.minutes"));
>>> >> // tableEnv.sqlQuery(query)
>>> >>
>>> >> StringBuilder multi = new StringBuilder();
>>> >> for (String sql : rules) {
>>> >>     if (multi.length() > 0) {
>>> >>         multi.append(" UNION ").append("\n");
>>> >>     }
>>> >>     multi.append(sql);
>>> >> }
>>> >> LOGGER.info("********************************* " + multi.toString());
>>> >>
>>> >> Table result = tableEnv.sqlQuery(multi.toString());
>>> >>
>>> >> tableEnv
>>> >>     // declare the external system to connect to
>>> >>     .connect(new Kafka().version("0.10")
>>> >>         .topic("testout")
>>> >>         .startFromEarliest()
>>> >>         .properties(kProducer))
>>> >>     .withFormat(new Json().failOnMissingField(false).deriveSchema())
>>> >>     .withSchema(new Schema()
>>> >>         .field("TiggerID", Types.DECIMAL())
>>> >>         .field("RuleName", Types.STRING())
>>> >>         .field("ts1", Types.STRING())
>>> >>         .field("ts2", Types.STRING()))
>>> >>     // specify the update-mode for streaming tables
>>> >>     .inAppendMode()
>>> >>     // register as source, sink, or both under a name
>>> >>     .registerTableSourceAndSink("ruleTable");
>>> >>
>>> >> // tableEnv.sqlUpdate("INSERT INTO ruleTable " + result);
>>> >> result.insertInto("ruleTable");
>>> >>
>>> >> Cheers,
>>> >> Dhanuka
>>> >>
>>> >> --
>>> >> Nothing Impossible, Creativity is more important than knowledge.
>
> --
> Nothing Impossible, Creativity is more important than knowledge.

--
Nothing Impossible, Creativity is more important than knowledge.