[
https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161861#comment-17161861
]
Jark Wu commented on FLINK-18652:
---------------------------------
I'm a little confused. In the description you said data is repeatedly
written into ClickHouse, but in the comment you said the sink doesn't write
any data... Besides, the job's tasks do not match the query: where are the
window aggregate and the join?
I saw that you have a lot of joins and windows, so what you described ("the
window is 15 min, but 15 minutes after the first firing the data keeps being
sunk to ClickHouse repeatedly") might be reasonable: a window emits all of its
accumulated output together once the 15 minutes elapse, and since more than
one record can match each join key, you will see many records with the same
join key in the sink, I guess.
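To illustrate, here is a minimal, hypothetical sketch (the table and column
names below are placeholders, not taken from your job) of how a tumbling
window followed by a join fans rows out:
{code:scala}
// Hypothetical 15-minute processing-time tumbling window aggregate.
val windowSql =
  """
    |SELECT adv_code, COUNT(*) AS request_num
    |FROM kafka_source
    |GROUP BY adv_code, TUMBLE(proctime, INTERVAL '15' MINUTE)
  """.stripMargin

// If the dimension table adv_dim holds N rows per adv_code, every window
// result joins against N rows, so the sink receives N records carrying the
// same key. That looks like "repeated" data but is ordinary join semantics.
val joinSql =
  """
    |SELECT w.adv_code, w.request_num, d.advertiser
    |FROM window_result w
    |JOIN adv_dim d ON w.adv_code = d.adv_code
  """.stripMargin
{code}
If that is what happens here, counting the rows per join key on the ClickHouse
side should show a constant fan-out factor rather than exact duplicates.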
> JDBCAppendTableSink to ClickHouse (data always repeating)
> --------------------------------------------------------------
>
> Key: FLINK-18652
> URL: https://issues.apache.org/jira/browse/FLINK-18652
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Critical
> Attachments: FLINK-UI.png, checkpoint-failed.png
>
>
> Hi all,
> The data stream is: Kafka -> Flink SQL -> ClickHouse.
> The window is 15 minutes, but starting 15 minutes after the first
> firing, the data keeps being written to ClickHouse repeatedly. Please
> help me, thanks.
> {code:scala}
> import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
> import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
>
> // data source from kafka
> streamTableEnvironment.sqlUpdate(createTableSql)
> LOG.info("kafka source table has been created!")
> val groupTable = streamTableEnvironment.sqlQuery(tempSql)
> streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable)
>
> // this is the window sql, using processing time
> val re_table = streamTableEnvironment.sqlQuery(windowSql)
> re_table.printSchema()
> // groupTable.printSchema()
> val rr = streamTableEnvironment.toAppendStream[Result](re_table)
> // the data here is printed normally
> rr.print()
> streamTableEnvironment.createTemporaryView("result_table", rr)
> val s = streamTableEnvironment.sqlQuery(sql)
>
> // sink to clickhouse
> val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
>   .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
>   .setDBUrl(URL)
>   .setQuery(insertCKSql)
>   .setUsername(USERNAME)
>   .setPassword(PASSWORD)
>   .setBatchSize(10000)
>   .setParameterTypes(
>     Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG,
>     Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.LONG,
>     Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG)
>   .build()
>
> streamTableEnvironment.registerTableSink("ckResult",
>   Array[String]("data_date", "point", "platform", "page_name",
>     "component_name", "booth_name", "position1", "advertiser",
>     "adv_code", "request_num", "return_num", "fill_rate", "expose_num",
>     "expose_rate", "click_num", "click_rate", "ecpm", "income", "created_at"),
>   Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT,
>     Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG),
>   sink)
>
> // insert into the registered TableSink
> s.insertInto("ckResult")
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)