[ 
https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-18652:
-----------------------------------
    Labels: auto-deprioritized-critical stale-major  (was: 
auto-deprioritized-critical)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major, but it is unassigned and neither it nor its Sub-Tasks have been updated 
for 30 days. I have therefore added the "stale-major" label to the issue. If 
this ticket is still a Major, please either assign yourself or give an update. 
Afterwards, please remove the label; otherwise, the issue will be deprioritized 
in 7 days.


> JDBCAppendTableSink  to  ClickHouse  (data  always  repeating)
> --------------------------------------------------------------
>
>                 Key: FLINK-18652
>                 URL: https://issues.apache.org/jira/browse/FLINK-18652
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC, Table SQL / Ecosystem
>    Affects Versions: 1.10.0
>            Reporter: mzz
>            Priority: Major
>              Labels: auto-deprioritized-critical, stale-major
>         Attachments: FLINK-UI.png, checkpoint-failed.png
>
>
> Hi all,
>    The data stream is: kafka -> flinkSQL -> clickhouse.
>    The window is 15 minutes, but after the first window fires, the same data 
> keeps being written repeatedly to ClickHouse. Please help, thanks.
> {code:java}
> // data source from kafka
>     streamTableEnvironment.sqlUpdate(createTableSql)
>     LOG.info("kafka source table has been created!")
>     val groupTable = streamTableEnvironment.sqlQuery(tempSql)
>     streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable)
> // this is the window sql, using ProcessingTime
>     val re_table = streamTableEnvironment.sqlQuery(windowSql)
>     re_table.printSchema()
>     //    groupTable.printSchema()
>     val rr = streamTableEnvironment.toAppendStream[Result](re_table)
> // the data here is printed normally
>     rr.print()
>     streamTableEnvironment.createTemporaryView("result_table", rr)
>     val s = streamTableEnvironment.sqlQuery(sql)
> // sink to clickhouse
>     val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
>       .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
>       .setDBUrl(URL)
>       .setQuery(insertCKSql)
>       .setUsername(USERNAME)
>       .setPassword(PASSWORD)
>       .setBatchSize(10000)
>       .setParameterTypes(
>         Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>         Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG, Types.FLOAT,
>         Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG
>       )
>       .build()
>     streamTableEnvironment.registerTableSink("ckResult",
>       Array[String]("data_date", "point", "platform", "page_name", "component_name", "booth_name", "position1", "advertiser",
>         "adv_code", "request_num", "return_num", "fill_rate", "expose_num", "expose_rate", "click_num", "click_rate", "ecpm", "income", "created_at"),
>       Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG),
>       sink)
> // insert into TableSink
>     s.insertInto("ckResult")
> {code}
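> The actual {{windowSql}} is not shown above; as a rough sketch only (the 
> {{proctime}} attribute and the column names here are hypothetical), a 
> 15-minute processing-time tumbling window in Flink 1.10 SQL is written like 
> this:
> {code:sql}
> -- hypothetical sketch of a 15-minute processing-time tumbling window
> SELECT
>   TUMBLE_START(proctime, INTERVAL '15' MINUTE) AS window_start,
>   platform,
>   COUNT(*) AS request_num
> FROM aggs_temp_table
> GROUP BY TUMBLE(proctime, INTERVAL '15' MINUTE), platform
> {code}
> With such a window and an append-only sink like JDBCAppendTableSink, each 
> window should emit one row per key when it fires, so rows showing up again 
> in ClickHouse suggest the data is being re-emitted downstream of the window 
> (for example on restart after the failed checkpoint in the attachment) 
> rather than by the window itself.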



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
