Hi Fabian, +1 👍
Cheers
Dhanuka

On Mon, 14 Jan 2019, 21:29 Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> That's a Java limitation. Methods cannot be larger than 64 KB, and the code
> that is generated for this predicate exceeds the limit.
> There is a Jira issue to fix the problem.
>
> In the meantime, I'd follow a hybrid approach and UNION ALL only as many
> tables as you need to avoid the code compilation exception.
>
> Best, Fabian
>
> On Mon, 14 Jan 2019 at 14:15, dhanuka ranasinghe <
> dhanuka.priyan...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> I encountered the error below with 200 OR operators, so I guess this is a
>> JVM-level limitation.
>>
>> Error:
>>
>> of class "datastreamcalcrule" grows beyond 64 kb
>>
>> Cheers
>> Dhanuka
>>
>>
>> On Mon, 14 Jan 2019, 20:30 Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> you should avoid the UNION ALL approach because the query will scan the
>>> (identical?) Kafka topic 200 times, which is highly inefficient.
>>> You should rather use your second approach and scale the query
>>> appropriately.
>>>
>>> Best, Fabian
>>>
>>> On Mon, 14 Jan 2019 at 08:39, dhanuka ranasinghe <
>>> dhanuka.priyan...@gmail.com> wrote:
>>>
>>>> Sorry about sending the previous mail before completing it :)
>>>>
>>>>
>>>> I also tried out a different approach, which is to use OR instead of
>>>> UNION ALL, as below.
>>>>
>>>> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE
>>>> '%193400835%'
>>>> ) OR
>>>> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE
>>>> '%193400835%'
>>>> ) OR
>>>> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE
>>>> '%193400835%'
>>>> )
>>>>
>>>> The only downside is that with this approach, if all the WHERE clause
>>>> condition sets are equal, Flink seems to behave as if only one condition
>>>> set were used.
>>>>
>>>> I have attached a screenshot herewith.
>>>>
>>>> Could you please explain this to me? Thanks in advance.
>>>>
>>>> Cheers,
>>>>
>>>> Dhanuka
>>>>
>>>>
>>>> On Mon, Jan 14, 2019 at 3:35 PM dhanuka ranasinghe <
>>>> dhanuka.priyan...@gmail.com> wrote:
>>>>
>>>>> Hi Hequn,
>>>>>
>>>>> I think it's obvious when we see the job graph for 200 unions. I have
>>>>> attached the screenshot herewith.
>>>>>
>>>>> I also tried out a different approach, which is instead of UNION ALL
>>>>>
>>>>>
>>>>> On Mon, Jan 14, 2019 at 2:57 PM Hequn Cheng <chenghe...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi dhanuka,
>>>>>>
>>>>>> > I am trying to deploy 200 SQL unions and it seems all the tasks
>>>>>> > keep failing after some time.
>>>>>> It would be great if you could show us some information (say, the
>>>>>> exception stack) about the failure. Is it caused by an OOM of the job
>>>>>> manager?
>>>>>>
>>>>>> > How do I allocate memory for the task manager and job manager? What
>>>>>> > are the factors that need to be considered?
>>>>>> Judging from your SQL, I guess you need more memory for the job
>>>>>> manager [1]: since you unionAll 200 tables, the job graph should be
>>>>>> fairly big. As for the task manager, I think it may be OK to use the
>>>>>> default memory setting unless you allocate a lot of memory in your UDFs
>>>>>> or you just want to make better use of the memory (we can discuss more
>>>>>> if you like).
>>>>>>
>>>>>> Best, Hequn
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jobmanager
>>>>>>
>>>>>> On Mon, Jan 14, 2019 at 9:54 AM dhanuka ranasinghe <
>>>>>> dhanuka.priyan...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Fabian,
>>>>>>>
>>>>>>> Thanks for the prompt reply, and it's working 🤗.
>>>>>>>
>>>>>>> I am trying to deploy 200 SQL unions and it seems all the tasks keep
>>>>>>> failing after some time.
>>>>>>>
>>>>>>> How do I allocate memory for the task manager and job manager? What
>>>>>>> are the factors that need to be considered?
>>>>>>>
>>>>>>> Cheers
>>>>>>> Dhanuka
>>>>>>>
>>>>>>> On Sun, 13 Jan 2019, 22:05 Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>
>>>>>>> > Hi Dhanuka,
>>>>>>> >
>>>>>>> > The important error message here is "AppendStreamTableSink requires
>>>>>>> > that Table has only insert changes".
>>>>>>> > This is because you use UNION instead of UNION ALL, which implies
>>>>>>> > duplicate elimination.
>>>>>>> > Unfortunately, UNION is currently internally implemented as a regular
>>>>>>> > aggregation, which produces a retraction stream (although this would
>>>>>>> > not be necessary).
>>>>>>> >
>>>>>>> > If you don't require duplicate elimination, you can replace UNION by
>>>>>>> > UNION ALL and the query should work.
>>>>>>> > If you require duplicate elimination, it is currently not possible to
>>>>>>> > use SQL for your use case.
>>>>>>> >
>>>>>>> > There is the Jira issue FLINK-9422 to improve this case [1].
>>>>>>> >
>>>>>>> > Best, Fabian
>>>>>>> >
>>>>>>> > [1] https://issues.apache.org/jira/browse/FLINK-9422
>>>>>>> >
>>>>>>> > On Sun, 13 Jan 2019 at 14:43, dhanuka ranasinghe <
>>>>>>> > dhanuka.priyan...@gmail.com> wrote:
>>>>>>> >
>>>>>>> >> Hi All,
>>>>>>> >>
>>>>>>> >> I am trying to select multiple results from Kafka and send the
>>>>>>> >> results to a different Kafka topic using the Table API, but I am
>>>>>>> >> getting the error below. Could you please help me with this?
>>>>>>> >>
>>>>>>> >> Query:
>>>>>>> >>
>>>>>>> >> SELECT TiggerID, 'Rule3' as RuleName, mytime(ts) as ts1,
>>>>>>> >>   mytime(CURRENT_TIMESTAMP) as ts2
>>>>>>> >> FROM dataTable
>>>>>>> >> WHERE InterceptID = 4508724
>>>>>>> >>   AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>>>>>>> >> UNION
>>>>>>> >> SELECT TiggerID, 'Rule2' as RuleName, mytime(ts) as ts1,
>>>>>>> >>   mytime(CURRENT_TIMESTAMP) as ts2
>>>>>>> >> FROM dataTable
>>>>>>> >> WHERE InterceptID = 4508724
>>>>>>> >>   AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>>>>>>> >> UNION
>>>>>>> >> SELECT TiggerID, 'Rule1' as RuleName, mytime(ts) as ts1,
>>>>>>> >>   mytime(CURRENT_TIMESTAMP) as ts2
>>>>>>> >> FROM dataTable
>>>>>>> >> WHERE InterceptID = 4508724
>>>>>>> >>   AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> *Error:*
>>>>>>> >>
>>>>>>> >> 2019-01-13 21:36:36,228 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Exception occurred in REST handler.
>>>>>>> >> org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>>> >> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$7(JarRunHandler.java:151)
>>>>>>> >> at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>>>>>> >> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>>>>>> >> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>>>>> >> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
>>>>>>> >> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>> >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>> >> at java.lang.Thread.run(Thread.java:748)
>>>>>>> >> Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>>> >> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:228)
>>>>>>> >> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>>>>> >> ... 3 more
>>>>>>> >> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>>> >> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>>>>>>> >> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>>>>>> >> at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>>>>>> >> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
>>>>>>> >> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
>>>>>>> >> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:226)
>>>>>>> >> ... 4 more
>>>>>>> >> Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
>>>>>>> >> at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:382)
>>>>>>> >> at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:784)
>>>>>>> >> at org.apache.flink.table.api.Table.insertInto(table.scala:877)
>>>>>>> >> at org.monitoring.stream.analytics.FlinkDynamicLocalSQL.main(FlinkDynamicLocalSQL.java:153)
>>>>>>> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> >> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> >> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>> >> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>>>>>>> >> ... 9 more
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> *Source Code:*
>>>>>>> >>
>>>>>>> >> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>>>>>>> >> tableEnv.registerFunction("mytime", new MyTime(10));
>>>>>>> >>
>>>>>>> >> tableEnv.connect(new Kafka().version("0.10").topic("testin")
>>>>>>> >>         .properties(kConsumer)
>>>>>>> >>         .startFromLatest())
>>>>>>> >>     .withFormat(new Json().jsonSchema(schemaContent).failOnMissingField(false).deriveSchema())
>>>>>>> >>     .withSchema(new Schema()
>>>>>>> >>         .field("InterceptID", "DECIMAL")
>>>>>>> >>         .field("Provider_Info", "VARCHAR")
>>>>>>> >>         .field("LIID", "VARCHAR")
>>>>>>> >>         .field("TiggerID", "DECIMAL")
>>>>>>> >>         .field("ts", Types.SQL_TIMESTAMP())
>>>>>>> >>         .rowtime(new Rowtime().timestampsFromSource().watermarksPeriodicBounded(1000)))
>>>>>>> >>     .inAppendMode()
>>>>>>> >>     .registerTableSource(sourceTable);
>>>>>>> >>
>>>>>>> >> // WindowedTable windowedTable =
>>>>>>> >> //     tableEnv.scan(sourceTable).window(Tumble.over("50.minutes"));
>>>>>>> >> // tableEnv.sqlQuery(query)
>>>>>>> >>
>>>>>>> >> StringBuilder multi = new StringBuilder();
>>>>>>> >> for (String sql : rules) {
>>>>>>> >>     if (multi.length() > 0) {
>>>>>>> >>         multi.append(" UNION ").append("\n");
>>>>>>> >>     }
>>>>>>> >>     multi.append(sql);
>>>>>>> >> }
>>>>>>> >> LOGGER.info("********************************* " + multi.toString());
>>>>>>> >> Table result = tableEnv.sqlQuery(multi.toString());
>>>>>>> >>
>>>>>>> >> tableEnv
>>>>>>> >>     // declare the external system to connect to
>>>>>>> >>     .connect(new Kafka().version("0.10")
>>>>>>> >>         .topic("testout").startFromEarliest()
>>>>>>> >>         .properties(kProducer))
>>>>>>> >>     .withFormat(new Json().failOnMissingField(false).deriveSchema())
>>>>>>> >>     .withSchema(new Schema()
>>>>>>> >>         .field("TiggerID", Types.DECIMAL())
>>>>>>> >>         .field("RuleName", Types.STRING())
>>>>>>> >>         .field("ts1", Types.STRING())
>>>>>>> >>         .field("ts2", Types.STRING()))
>>>>>>> >>     // specify the update-mode for streaming tables
>>>>>>> >>     .inAppendMode()
>>>>>>> >>     // register as source, sink, or both and under a name
>>>>>>> >>     .registerTableSourceAndSink("ruleTable");
>>>>>>> >>
>>>>>>> >> // tableEnv.sqlUpdate("INSERT INTO ruleTable " + result);
>>>>>>> >> result.insertInto("ruleTable");
>>>>>>> >>
>>>>>>> >> Cheers,
>>>>>>> >> Dhanuka
>>>>>>> >>
>>>>>>> >> --
>>>>>>> >> Nothing Impossible,Creativity is more important than knowledge.
>>>>>>> >>
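
A minimal sketch of the hybrid approach suggested at the top of the thread: combine the rule predicates with OR inside each SELECT, and UNION ALL only as many SELECTs as are needed to keep each generated predicate under the 64 KB method limit. It uses plain Java string building and no Flink classes. The class name HybridQueryBuilder, the method build, and the chunk size are hypothetical and not part of the original program; a workable chunk size is an assumption that would have to be found by trial for the actual rules.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HybridQueryBuilder {

    /**
     * Splits the predicates into chunks of at most maxPredicatesPerSelect,
     * joins each chunk with OR inside one SELECT, and joins the SELECTs
     * with UNION ALL (not UNION, which implies duplicate elimination).
     *
     * selectFrom is the "SELECT ... FROM ..." prefix shared by all rules
     * (an assumption made to keep the sketch short).
     */
    static String build(String selectFrom, List<String> predicates, int maxPredicatesPerSelect) {
        if (maxPredicatesPerSelect < 1) {
            throw new IllegalArgumentException("maxPredicatesPerSelect must be >= 1");
        }
        List<String> selects = new ArrayList<>();
        for (int start = 0; start < predicates.size(); start += maxPredicatesPerSelect) {
            int end = Math.min(start + maxPredicatesPerSelect, predicates.size());
            List<String> wrapped = new ArrayList<>();
            for (String predicate : predicates.subList(start, end)) {
                // Parenthesize each rule so AND/OR precedence stays intact.
                wrapped.add("(" + predicate + ")");
            }
            selects.add(selectFrom + " WHERE " + String.join(" OR ", wrapped));
        }
        return String.join("\nUNION ALL\n", selects);
    }

    public static void main(String[] args) {
        List<String> rules = Arrays.asList(
                "InterceptID = 4508724 AND Provider_Info = 'Dhanuka'",
                "InterceptID = 4508725 AND Provider_Info = 'Dhanuka'",
                "InterceptID = 4508726 AND Provider_Info = 'Dhanuka'");
        // Chunk size 2 is purely illustrative.
        System.out.println(build("SELECT TiggerID FROM dataTable", rules, 2));
    }
}
```

The resulting string could be passed to tableEnv.sqlQuery(...) in place of the UNION-joined query. Note that this sketch drops the per-rule RuleName literal from the original query; keeping it would need something like a CASE expression, which is not shown here.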