Hi Fabian,

+1 👍

Cheers
Dhanuka

On Mon, 14 Jan 2019, 21:29 Fabian Hueske <fhue...@gmail.com wrote:

> Hi,
>
> That's a Java limitation. A method's bytecode cannot be larger than 64 KB, and
> the code that is generated for this predicate exceeds that limit.
> There is a Jira issue to fix the problem.
>
> In the meantime, I'd follow a hybrid approach and UNION ALL only as many
> tables as you need to avoid the code compilation exception.
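The hybrid approach could be sketched roughly as follows: OR the rule predicates together in chunks, and UNION ALL one SELECT per chunk. This is only a minimal sketch using plain Java string handling. The class `HybridRuleQuery`, the method `build`, and the chunk size `maxPerBranch` are hypothetical names, and a chunk size that keeps the generated code under the limit would have to be found by trial.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class HybridRuleQuery {

    // Hypothetical helper: ORs the predicates in chunks of at most
    // maxPerBranch, emits one SELECT per chunk, and joins the SELECTs
    // with UNION ALL.
    static String build(String selectPrefix, List<String> predicates, int maxPerBranch) {
        if (maxPerBranch < 1) {
            throw new IllegalArgumentException("maxPerBranch must be >= 1");
        }
        List<String> branches = new ArrayList<>();
        for (int start = 0; start < predicates.size(); start += maxPerBranch) {
            int end = Math.min(start + maxPerBranch, predicates.size());
            List<String> wrapped = new ArrayList<>();
            for (String predicate : predicates.subList(start, end)) {
                // Parenthesize each rule so its ANDs bind before the OR.
                wrapped.add("(" + predicate + ")");
            }
            branches.add(selectPrefix + " WHERE " + String.join(" OR ", wrapped));
        }
        return String.join("\nUNION ALL\n", branches);
    }

    public static void main(String[] args) {
        List<String> predicates = Arrays.asList(
                "InterceptID = 1 AND LIID LIKE '%1%'",
                "InterceptID = 2 AND LIID LIKE '%2%'",
                "InterceptID = 3 AND LIID LIKE '%3%'");
        // Illustrative values only: the table name is taken from the thread,
        // the chunk size of 2 is arbitrary.
        System.out.println(build("SELECT TiggerID FROM dataTable", predicates, 2));
    }
}
```

Note that a row matching several predicates within one chunk is emitted only once for that chunk, so a per-rule label such as RuleName would probably need something like a CASE expression.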
>
> Best, Fabian
>
> Am Mo., 14. Jan. 2019 um 14:15 Uhr schrieb dhanuka ranasinghe <
> dhanuka.priyan...@gmail.com>:
>
>> Hi Fabian ,
>>
>> I encountered the error below with 200 OR operators, so I guess this is a
>> JVM-level limitation.
>>
>> Error :
>>
>> of class "datastreamcalcrule" grows beyond 64 kb
>>
>> Cheers
>> Dhanuka
>>
>>
>> On Mon, 14 Jan 2019, 20:30 Fabian Hueske <fhue...@gmail.com wrote:
>>
>>> Hi,
>>>
>>> you should avoid the UNION ALL approach because the query will scan the
>>> (identical?) Kafka topic 200 times which is highly inefficient.
>>> You should rather use your second approach and scale the query
>>> appropriately.
>>>
>>> Best, Fabian
>>>
>>> Am Mo., 14. Jan. 2019 um 08:39 Uhr schrieb dhanuka ranasinghe <
>>> dhanuka.priyan...@gmail.com>:
>>>
>>>> Sorry for sending the previous mail before completing it :)
>>>>
>>>>
>>>> I also tried a different approach: instead of UNION ALL, I combined the
>>>> conditions with OR, as below.
>>>>
>>>> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%' ) OR
>>>> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%' ) OR
>>>> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%' )
>>>>
>>>> The only downside is that, with this approach, if all the WHERE clause
>>>> condition sets are equal, Flink seems to behave as if there were only one
>>>> condition set.
>>>>
>>>> I have attached a screenshot.
>>>>
>>>> Could you please explain this? Thanks in advance.
>>>>
>>>> Cheers,
>>>>
>>>> Dhanuka
>>>>
>>>>
>>>> On Mon, Jan 14, 2019 at 3:35 PM dhanuka ranasinghe <
>>>> dhanuka.priyan...@gmail.com> wrote:
>>>>
>>>>> Hi Hequn,
>>>>>
>>>>> I think it becomes obvious when you look at the job graph for 200 unions.
>>>>> I have attached a screenshot.
>>>>>
>>>>> I also tried out different approach , which is instead of UNION ALL
>>>>>
>>>>>
>>>>> On Mon, Jan 14, 2019 at 2:57 PM Hequn Cheng <chenghe...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi dhanuka,
>>>>>>
>>>>>> > I am trying to deploy 200 SQL unions and it seems all the tasks
>>>>>> > start failing after some time.
>>>>>> It would be great if you could show us some information (say, the
>>>>>> exception stack trace) about the failure. Is it caused by an OOM in the
>>>>>> job manager?
>>>>>>
>>>>>> > How do I allocate memory for the task manager and job manager? What
>>>>>> > factors need to be considered?
>>>>>> Given your SQL, I guess you need more memory for the job manager [1]:
>>>>>> since you UNION ALL 200 tables, the job graph will be fairly big. As for
>>>>>> the task manager, I think it may be OK to use the default memory
>>>>>> setting unless you allocate a lot of memory in your UDFs or you just want
>>>>>> to make better use of the memory (we can discuss this further if you like).
>>>>>>
>>>>>> Best, Hequn
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jobmanager
>>>>>>
>>>>>> On Mon, Jan 14, 2019 at 9:54 AM dhanuka ranasinghe <
>>>>>> dhanuka.priyan...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Fabian,
>>>>>>>
>>>>>>> Thanks for the prompt reply; it's working 🤗.
>>>>>>>
>>>>>>> I am trying to deploy 200 SQL unions and it seems all the tasks
>>>>>>> start failing after some time.
>>>>>>>
>>>>>>> How do I allocate memory for the task manager and job manager? What
>>>>>>> factors need to be considered?
>>>>>>>
>>>>>>> Cheers
>>>>>>> Dhanuka
>>>>>>>
>>>>>>> On Sun, 13 Jan 2019, 22:05 Fabian Hueske <fhue...@gmail.com wrote:
>>>>>>>
>>>>>>> > Hi Dhanuka,
>>>>>>> >
>>>>>>> > The important error message here is "AppendStreamTableSink requires that
>>>>>>> > Table has only insert changes".
>>>>>>> > This is because you use UNION rather than UNION ALL, and UNION implies
>>>>>>> > duplicate elimination.
>>>>>>> > Unfortunately, UNION is currently implemented internally as a regular
>>>>>>> > aggregation, which produces a retraction stream (although this would not
>>>>>>> > be necessary).
>>>>>>> >
>>>>>>> > If you don't require duplicate elimination, you can replace UNION by UNION
>>>>>>> > ALL and the query should work.
>>>>>>> > If you require duplicate elimination, it is currently not possible to use
>>>>>>> > SQL for your use case.
>>>>>>> >
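A minimal sketch of that replacement, assuming `rules` is the list of per-rule SELECT strings used in the source code quoted further down. The class name `UnionAllQuery` and the method `join` are hypothetical, and only plain Java string handling is shown, not the Flink calls around it.

```java
import java.util.Arrays;
import java.util.List;

final class UnionAllQuery {

    // Joins the per-rule SELECT statements with UNION ALL, which keeps
    // duplicates and therefore asks for no duplicate elimination.
    static String join(List<String> rules) {
        return String.join("\nUNION ALL\n", rules);
    }

    public static void main(String[] args) {
        // Illustrative rule queries only; the real ones come from `rules`.
        List<String> rules = Arrays.asList(
                "SELECT TiggerID, 'Rule1' AS RuleName FROM dataTable WHERE InterceptID = 4508724",
                "SELECT TiggerID, 'Rule2' AS RuleName FROM dataTable WHERE InterceptID = 4508724");
        System.out.println(join(rules));
    }
}
```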
>>>>>>> > There is a Jira issue, FLINK-9422, to improve this case [1].
>>>>>>> >
>>>>>>> > Best, Fabian
>>>>>>> >
>>>>>>> > [1] https://issues.apache.org/jira/browse/FLINK-9422
>>>>>>> >
>>>>>>> > Am So., 13. Jan. 2019 um 14:43 Uhr schrieb dhanuka ranasinghe <
>>>>>>> > dhanuka.priyan...@gmail.com>:
>>>>>>> >
>>>>>>> >> Hi All,
>>>>>>> >>
>>>>>>> >> I am trying to select multiple results from Kafka and send the results
>>>>>>> >> to a different Kafka topic using the Table API, but I am getting the
>>>>>>> >> error below. Could you please help me with this?
>>>>>>> >>
>>>>>>> >> Query:
>>>>>>> >>
>>>>>>> >> SELECT TiggerID,'Rule3' as RuleName,mytime(ts) as ts1 ,
>>>>>>> >> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID = 4508724
>>>>>>> >> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>>>>>>> >>  UNION
>>>>>>> >> SELECT TiggerID,'Rule2' as RuleName,mytime(ts) as ts1 ,
>>>>>>> >> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID = 4508724
>>>>>>> >> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>>>>>>> >>  UNION
>>>>>>> >> SELECT TiggerID,'Rule1' as RuleName,mytime(ts) as ts1 ,
>>>>>>> >> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID = 4508724
>>>>>>> >> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> *Error:*
>>>>>>> >>
>>>>>>> >> 2019-01-13 21:36:36,228 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Exception occurred in REST handler.
>>>>>>> >> org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>>> >> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$7(JarRunHandler.java:151)
>>>>>>> >> at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>>>>>> >> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>>>>>> >> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>>>>> >> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
>>>>>>> >> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>> >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>> >> at java.lang.Thread.run(Thread.java:748)
>>>>>>> >> Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>>> >> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:228)
>>>>>>> >> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>>>>> >> ... 3 more
>>>>>>> >> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>>> >> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>>>>>>> >> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>>>>>> >> at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>>>>>> >> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
>>>>>>> >> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
>>>>>>> >> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:226)
>>>>>>> >> ... 4 more
>>>>>>> >> Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
>>>>>>> >> at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:382)
>>>>>>> >> at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:784)
>>>>>>> >> at org.apache.flink.table.api.Table.insertInto(table.scala:877)
>>>>>>> >> at org.monitoring.stream.analytics.FlinkDynamicLocalSQL.main(FlinkDynamicLocalSQL.java:153)
>>>>>>> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> >> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> >> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>> >> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>>>>>>> >> ... 9 more
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> *Source Code:*
>>>>>>> >>
>>>>>>> >> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>>>>>>> >> tableEnv.registerFunction("mytime", new MyTime(10));
>>>>>>> >>
>>>>>>> >> tableEnv.connect(new Kafka().version("0.10").topic("testin")
>>>>>>> >>         .properties(kConsumer)
>>>>>>> >>         .startFromLatest())
>>>>>>> >>     .withFormat(new Json().jsonSchema(schemaContent).failOnMissingField(false).deriveSchema())
>>>>>>> >>     .withSchema(new Schema()
>>>>>>> >>         .field("InterceptID", "DECIMAL")
>>>>>>> >>         .field("Provider_Info", "VARCHAR")
>>>>>>> >>         .field("LIID", "VARCHAR")
>>>>>>> >>         .field("TiggerID", "DECIMAL")
>>>>>>> >>         .field("ts", Types.SQL_TIMESTAMP())
>>>>>>> >>         .rowtime(new Rowtime().timestampsFromSource().watermarksPeriodicBounded(1000)))
>>>>>>> >>     .inAppendMode()
>>>>>>> >>     .registerTableSource(sourceTable);
>>>>>>> >>
>>>>>>> >> // WindowedTable windowedTable =
>>>>>>> >> // tableEnv.scan(sourceTable).window(Tumble.over("50.minutes"));
>>>>>>> >> // tableEnv.sqlQuery(query)
>>>>>>> >>
>>>>>>> >> StringBuilder multi = new StringBuilder();
>>>>>>> >> for (String sql : rules) {
>>>>>>> >>     if (multi.length() > 0) {
>>>>>>> >>         multi.append(" UNION ").append("\n");
>>>>>>> >>     }
>>>>>>> >>     multi.append(sql);
>>>>>>> >> }
>>>>>>> >> LOGGER.info("********************************* " + multi.toString());
>>>>>>> >>
>>>>>>> >> Table result = tableEnv.sqlQuery(multi.toString());
>>>>>>> >>
>>>>>>> >> tableEnv
>>>>>>> >>     // declare the external system to connect to
>>>>>>> >>     .connect(new Kafka().version("0.10")
>>>>>>> >>         .topic("testout").startFromEarliest()
>>>>>>> >>         .properties(kProducer))
>>>>>>> >>     .withFormat(new Json().failOnMissingField(false).deriveSchema())
>>>>>>> >>     .withSchema(new Schema()
>>>>>>> >>         .field("TiggerID", Types.DECIMAL())
>>>>>>> >>         .field("RuleName", Types.STRING())
>>>>>>> >>         .field("ts1", Types.STRING())
>>>>>>> >>         .field("ts2", Types.STRING()))
>>>>>>> >>     // specify the update-mode for streaming tables
>>>>>>> >>     .inAppendMode()
>>>>>>> >>     // register as source, sink, or both and under a name
>>>>>>> >>     .registerTableSourceAndSink("ruleTable");
>>>>>>> >>
>>>>>>> >> // tableEnv.sqlUpdate("INSERT INTO ruleTable " + result);
>>>>>>> >> result.insertInto("ruleTable");
>>>>>>> >>
>>>>>>> >> Cheers,
>>>>>>> >> Dhanuka
>>>>>>> >>
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> --
>>>>>>> >> Nothing Impossible,Creativity is more important than knowledge.
>>>>>>> >>
>>>>>>> >
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Nothing Impossible,Creativity is more important than knowledge.
>>>>>
>>>>
>>>>
>>>> --
>>>> Nothing Impossible,Creativity is more important than knowledge.
>>>>
>>>
