Sorry about sending the previous mail before completing it :)

I also tried a different approach: instead of UNION ALL, combining the
condition sets with OR, as below.

(InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%')
OR (InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%')
OR (InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%')

The only downside is that, with this approach, when all of the WHERE-clause
condition sets are identical, Flink seems to behave as if only one condition
set were used.
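
For reference: identical OR branches are logically redundant (A OR A
simplifies to A), so some merging by the planner may be expected. As a
sketch with hypothetical values (the second InterceptID and LIID below are
made up), the branches only stay distinct when the condition sets actually
differ, e.g.:

(InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%')
OR (InterceptID = 4508725 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400836%')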

I have attached a screenshot herewith.

Could you please explain this to me? Thanks in advance.

Cheers,

Dhanuka


On Mon, Jan 14, 2019 at 3:35 PM dhanuka ranasinghe <
dhanuka.priyan...@gmail.com> wrote:

> Hi Hequn,
>
> I think it's obvious when we see the job graph for 200 unions. I have
> attached the screenshot herewith.
>
> I also tried out a different approach, which is, instead of UNION ALL
>
>
> On Mon, Jan 14, 2019 at 2:57 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi dhanuka,
>>
>> > I am trying to deploy 200 SQL unions and it seems all the tasks start
>> > failing after some time.
>> It would be great if you could show us some information (say, the
>> exception stack) about the failure. Is it caused by an OOM of the job
>> manager?
>>
>> > How do I allocate memory for the task manager and job manager? What are
>> > the factors that need to be considered?
>> According to your SQL, I guess you need more memory for the job
>> manager[1], since you UNION ALL 200 tables and the job graph should be a
>> bit big. As for the task manager, I think it may be OK to use the default
>> memory settings unless you allocate a lot of memory in your UDFs or you
>> just want to make better use of the memory (we can discuss more if you
>> like).
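>>
>> For illustration, a rough flink-conf.yaml sketch (placeholder values, not
>> recommendations; I'm assuming a Flink 1.7-era setup where these keys
>> apply, see [1]):
>>
>>   # flink-conf.yaml (placeholder sizes, tune for your job)
>>   # Extra heap for the job manager, which holds the large job graph:
>>   jobmanager.heap.size: 2048m
>>   # Default-ish task manager heap unless UDFs allocate a lot:
>>   taskmanager.heap.size: 1024m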
>>
>> Best, Hequn
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jobmanager
>>
>> On Mon, Jan 14, 2019 at 9:54 AM dhanuka ranasinghe <
>> dhanuka.priyan...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> Thanks for the prompt reply; it's working 🤗.
>>>
>>> I am trying to deploy 200 SQL unions, and it seems all the tasks start
>>> failing after some time.
>>>
>>> How do I allocate memory for the task manager and job manager? What are
>>> the factors that need to be considered?
>>>
>>> Cheers
>>> Dhanuka
>>>
>>> On Sun, 13 Jan 2019, 22:05 Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> > Hi Dhanuka,
>>> >
>>> > The important error message here is "AppendStreamTableSink requires
>>> > that Table has only insert changes".
>>> > This is because you use UNION instead of UNION ALL, which implies
>>> > duplicate elimination.
>>> > Unfortunately, UNION is currently internally implemented as a regular
>>> > aggregation which produces a retraction stream (although this would
>>> > not be necessary).
>>> >
>>> > If you don't require duplicate elimination, you can replace UNION by
>>> > UNION ALL and the query should work.
>>> > If you require duplicate elimination, it is currently not possible to
>>> > use SQL for your use case.
>>> >
>>> > There is a Jira issue, FLINK-9422, to improve this case [1].
>>> >
>>> > Best, Fabian
>>> >
>>> > [1] https://issues.apache.org/jira/browse/FLINK-9422
>>> >
>>> > On Sun, 13 Jan 2019 at 14:43, dhanuka ranasinghe <
>>> > dhanuka.priyan...@gmail.com> wrote:
>>> >
>>> >> Hi All,
>>> >>
>>> >> I am trying to select multiple results from Kafka and send the results
>>> >> to a different Kafka topic using the Table API. But I am getting the
>>> >> error below. Could you please help me with this?
>>> >>
>>> >> Query:
>>> >>
>>> >> SELECT TiggerID, 'Rule3' AS RuleName, mytime(ts) AS ts1,
>>> >>        mytime(CURRENT_TIMESTAMP) AS ts2
>>> >> FROM dataTable
>>> >> WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka'
>>> >>   AND LIID LIKE '%193400835%'
>>> >> UNION
>>> >> SELECT TiggerID, 'Rule2' AS RuleName, mytime(ts) AS ts1,
>>> >>        mytime(CURRENT_TIMESTAMP) AS ts2
>>> >> FROM dataTable
>>> >> WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka'
>>> >>   AND LIID LIKE '%193400835%'
>>> >> UNION
>>> >> SELECT TiggerID, 'Rule1' AS RuleName, mytime(ts) AS ts1,
>>> >>        mytime(CURRENT_TIMESTAMP) AS ts2
>>> >> FROM dataTable
>>> >> WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka'
>>> >>   AND LIID LIKE '%193400835%'
>>> >>
>>> >>
>>> >> Error:
>>> >>
>>> >> 2019-01-13 21:36:36,228 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Exception occurred in REST handler.
>>> >> org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>> >>   at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$7(JarRunHandler.java:151)
>>> >>   at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>> >>   at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>> >>   at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>> >>   at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
>>> >>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> >>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> >>   at java.lang.Thread.run(Thread.java:748)
>>> >> Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>> >>   at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:228)
>>> >>   at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>> >>   ... 3 more
>>> >> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>> >>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>>> >>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>> >>   at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>> >>   at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
>>> >>   at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
>>> >>   at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:226)
>>> >>   ... 4 more
>>> >> Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
>>> >>   at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:382)
>>> >>   at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:784)
>>> >>   at org.apache.flink.table.api.Table.insertInto(table.scala:877)
>>> >>   at org.monitoring.stream.analytics.FlinkDynamicLocalSQL.main(FlinkDynamicLocalSQL.java:153)
>>> >>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> >>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> >>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> >>   at java.lang.reflect.Method.invoke(Method.java:498)
>>> >>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>>> >>   ... 9 more
>>> >>
>>> >>
>>> >> Source Code:
>>> >>
>>> >> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>>> >> tableEnv.registerFunction("mytime", new MyTime(10));
>>> >>
>>> >> tableEnv.connect(new Kafka().version("0.10")
>>> >>         .topic("testin")
>>> >>         .properties(kConsumer)
>>> >>         .startFromLatest())
>>> >>     .withFormat(new Json()
>>> >>         .jsonSchema(schemaContent)
>>> >>         .failOnMissingField(false)
>>> >>         .deriveSchema())
>>> >>     .withSchema(new Schema()
>>> >>         .field("InterceptID", "DECIMAL")
>>> >>         .field("Provider_Info", "VARCHAR")
>>> >>         .field("LIID", "VARCHAR")
>>> >>         .field("TiggerID", "DECIMAL")
>>> >>         .field("ts", Types.SQL_TIMESTAMP())
>>> >>             .rowtime(new Rowtime().timestampsFromSource()
>>> >>                 .watermarksPeriodicBounded(1000)))
>>> >>     .inAppendMode()
>>> >>     .registerTableSource(sourceTable);
>>> >>
>>> >> // WindowedTable windowedTable =
>>> >> //     tableEnv.scan(sourceTable).window(Tumble.over("50.minutes"));
>>> >> // tableEnv.sqlQuery(query)
>>> >>
>>> >> StringBuilder multi = new StringBuilder();
>>> >> for (String sql : rules) {
>>> >>     if (multi.length() > 0) {
>>> >>         multi.append(" UNION ").append("\n");
>>> >>     }
>>> >>     multi.append(sql);
>>> >> }
>>> >> LOGGER.info("********************************* " + multi.toString());
>>> >>
>>> >> Table result = tableEnv.sqlQuery(multi.toString());
>>> >>
>>> >> tableEnv
>>> >>     // declare the external system to connect to
>>> >>     .connect(new Kafka().version("0.10")
>>> >>         .topic("testout")
>>> >>         .startFromEarliest()
>>> >>         .properties(kProducer))
>>> >>     .withFormat(new Json().failOnMissingField(false).deriveSchema())
>>> >>     .withSchema(new Schema()
>>> >>         .field("TiggerID", Types.DECIMAL())
>>> >>         .field("RuleName", Types.STRING())
>>> >>         .field("ts1", Types.STRING())
>>> >>         .field("ts2", Types.STRING()))
>>> >>     // specify the update-mode for streaming tables
>>> >>     .inAppendMode()
>>> >>     // register as source, sink, or both and under a name
>>> >>     .registerTableSourceAndSink("ruleTable");
>>> >>
>>> >> // tableEnv.sqlUpdate("INSERT INTO ruleTable " + result);
>>> >> result.insertInto("ruleTable");
>>> >>
>>> >> Cheers,
>>> >> Dhanuka
>>> >>
>>> >>
>>> >>
>>> >> --
>>> >> Nothing Impossible, Creativity is more important than knowledge.
>>> >>
>>> >
>>>
>>
>
> --
> Nothing Impossible, Creativity is more important than knowledge.
>


-- 
Nothing Impossible, Creativity is more important than knowledge.
