[ https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161861#comment-17161861 ]

Jark Wu commented on FLINK-18652:
---------------------------------

I'm a little confused. In the description you said that data is repeatedly 
written into ClickHouse, but in the comment you said the sink doesn't write 
data... Besides, the job tasks do not map to the query: where are the window 
aggregate and the join?

I saw you have a lot of joins and windows, so what you said, "the window is 15 
min, but 15 minutes after the first time, the data keeps being repeatedly 
written to ClickHouse", might be reasonable. The windows emit a lot of data 
together after 15 minutes, and more than one record will be joined for each 
join key, so I guess you will see many records with the same join key in the 
sink.
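
To illustrate the join fan-out, here is a minimal sketch in plain Scala (not 
Flink, and with made-up data) of how a single aggregated row per key can still 
produce several sink rows for that key once it is joined:

{code:java}
// Minimal sketch with hypothetical data: one window-aggregate row per key,
// joined against a detail table where two rows share the same join key.
object JoinFanOut extends App {
  val windowAgg = Seq(("a", 10L))             // window output: one row for key "a"
  val detail    = Seq(("a", "x"), ("a", "y")) // two detail rows match key "a"

  // an inner join keeps one output row per matching pair
  val joined = for {
    (k1, cnt) <- windowAgg
    (k2, v)   <- detail
    if k1 == k2
  } yield (k1, cnt, v)

  // prints (a,10,x) and (a,10,y): the same join key appears twice,
  // so an append-only sink would write two records for key "a"
  joined.foreach(println)
}
{code}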

> JDBCAppendTableSink to ClickHouse (data always repeating)
> ----------------------------------------------------------
>
>                 Key: FLINK-18652
>                 URL: https://issues.apache.org/jira/browse/FLINK-18652
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.10.0
>            Reporter: mzz
>            Priority: Critical
>         Attachments: FLINK-UI.png, checkpoint-failed.png
>
>
> Hi all,
>    the data stream is: kafka -> flinkSQL -> clickhouse.
>    The window is 15 min, but 15 minutes after the first time, the data keeps 
> being repeatedly written to ClickHouse. Please help me, thanks.
> {code:java}
> // data source from kafka
>     streamTableEnvironment.sqlUpdate(createTableSql)
>     LOG.info("kafka source table has created !")
>     val groupTable = streamTableEnvironment.sqlQuery(tempSql)
>     streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable)
> // this is the window sql, using ProcessingTime
>     val re_table = streamTableEnvironment.sqlQuery(windowSql)
>     re_table.printSchema()
>     //    groupTable.printSchema()
>     val rr = streamTableEnvironment.toAppendStream[Result](re_table)
> // the data here is printed normally
>     rr.print()
>     streamTableEnvironment.createTemporaryView("result_table", rr)
>     val s = streamTableEnvironment.sqlQuery(sql)
> // sink to ClickHouse
>     val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
>       .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
>       .setDBUrl(URL)
>       .setQuery(insertCKSql)
>       .setUsername(USERNAME)
>       .setPassword(PASSWORD)
>       .setBatchSize(10000)
>       .setParameterTypes(
>         Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>         Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG, Types.FLOAT,
>         Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG
>       )
>       .build()
>     streamTableEnvironment.registerTableSink("ckResult",
>       Array[String]("data_date", "point", "platform", "page_name",
>         "component_name", "booth_name", "position1", "advertiser",
>         "adv_code", "request_num", "return_num", "fill_rate", "expose_num",
>         "expose_rate", "click_num", "click_rate", "ecpm", "income", "created_at"),
>       Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.STRING,
>         Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>         Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT,
>         Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG),
>       sink)
> // insert into the TableSink
>     s.insertInto("ckResult")
> {code}


