[
https://issues.apache.org/jira/browse/FLINK-16070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17057087#comment-17057087
]
吴彦祖 commented on FLINK-16070:
-----------------------------
Hi, [~godfreyhe] .
I think meet similar problem in 1.10 . I copy the grammar 'EMIT' from blink(
add SqlEmit in RichSqlInsert),
and the first SQL is OK when translating to stream,
but the second one that does DATE_FORMAT throw the exception:
UpsertStreamTableSink requires that Table has a full primary keys if it is
updated.
is it will be normal after this bug fixed?
{code:java}
INSERT INTO sink1
SELECT
COUNT(DISTINCT tradeNo),
TUMBLE_START(tradeTimeTs, INTERVAL '10' SECOND) as ts
FROM payInfo GROUP BY TUMBLE(tradeTimeTs, INTERVAL '10' SECOND)
EMIT WITH DELAY '2' SECOND BEFORE WATERMARK
{code}
{code:java}
INSERT INTO sink1
SELECT
COUNT(DISTINCT tradeNo),
DATE_FORMT(TUMBLE_START(tradeTimeTs, INTERVAL '10' SECOND),'yyyyMMddHHmmss') as
ts
FROM payInfo GROUP BY TUMBLE(tradeTimeTs, INTERVAL '10' SECOND)
EMIT WITH DELAY '2' SECOND BEFORE WATERMARK
{code}
> Blink planner can not extract correct unique key for UpsertStreamTableSink
> ---------------------------------------------------------------------------
>
> Key: FLINK-16070
> URL: https://issues.apache.org/jira/browse/FLINK-16070
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.11.0
> Reporter: Leonard Xu
> Assignee: godfrey he
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.10.1
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> I reproduce an Elasticsearch6UpsertTableSink issue which user reported in
> mail list[1] that Blink planner can not extract correct unique key for
> following query, but legacy planner works well.
> {code:java}
> // user code
> INSERT INTO ES6_ZHANGLE_OUTPUT
> SELECT aggId, pageId, ts_min as ts,
> count(case when eventId = 'exposure' then 1 else null end) as expoCnt,
>
> count(case when eventId = 'click' then 1 else null end) as clkCnt
> FROM (
> SELECT
> 'ZL_001' as aggId,
> pageId,
> eventId,
> recvTime,
> ts2Date(recvTime) as ts_min
> from kafka_zl_etrack_event_stream
> where eventId in ('exposure', 'click')
> ) as t1
> group by aggId, pageId, ts_min
> {code}
> I found that blink planner can not extract correct unique key in
> `*FlinkRelMetadataQuery.getUniqueKeys(relNode)*`, legacy planner works well
> in
> `*org.apache.flink.table.plan.util.UpdatingPlanChecker.getUniqueKeyFields(...)*
> `. A simple ETL job to reproduce this issue can refers[2]
>
> [1][http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-10-es-sink-exception-td32773.html]
> [2][https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/java/kafka2es/Kafka2UpsertEs.java]
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)