caogaoshuai opened a new issue, #2931: URL: https://github.com/apache/incubator-streampark/issues/2931
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/incubator-streampark/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### Java Version

1.8

### Scala Version

2.11.x

### StreamPark Version

1.2.3

### Flink Version

1.13.6

### deploy mode

yarn-application

### What happened

When I insert data into ClickHouse through streamx-flink-connector-clickhouse_2.11, the insertion fails in some cases. I have tracked down the cause: in the class org.apache.streampark.flink.connector.failover.SinkRequest, the regular expression INSERT_REGEXP can fail to extract the data to be inserted.

```
lazy val sqlStatement: String = {
  val prefixMap: Map[String, List[String]] = Map[String, List[String]]()
  records.foreach(
    x => {
      val valueMatcher = INSERT_REGEXP.matcher(x)
      if (valueMatcher.find()) {
        val prefix = valueMatcher.group(1)
        prefixMap.get(prefix) match {
          case Some(value) => value.add(valueMatcher.group(3))
          case None => prefixMap.put(prefix, List(valueMatcher.group(3)))
        }
      } else {
        logWarn(s"ignore record: $x")
      }
    })
```

For example, when the insert statement is `"INSERT INTO table(a) value(' value of')"`, the string that should be extracted is `(' value of')`, but what is actually extracted is `of')`, which causes a splicing error in the assembled SQL statement.

Another example: when the insert statement is `"INSERT INTO table(a) values('a')"`, the string that should be extracted is `('a')`, but what is actually extracted is `s('a')`.

I think a more robust extraction method is needed, but I have not come up with a good one.
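
One possible direction, offered only as a sketch: the report does not show the actual `INSERT_REGEXP`, so the `GREEDY` pattern below is a hypothetical stand-in that happens to reproduce both symptoms described above (a greedy prefix and a `value|values` alternation). The `ANCHORED` pattern is likewise an assumption, not StreamPark code: it uses a lazy prefix and requires the keyword to be followed by an opening parenthesis. The class and method names are invented for illustration, and the example is in plain Java because the snippet above uses the `java.util.regex` matcher API.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class InsertSplit {
    // ASSUMPTION: hypothetical stand-in for INSERT_REGEXP, which the report
    // does not show. The greedy prefix backtracks to the LAST " value" in the
    // statement, and "value" is tried before "values".
    static final Pattern GREEDY = Pattern.compile(
        "^(.*)\\s+(value|values)(.*)", Pattern.CASE_INSENSITIVE);

    // ASSUMPTION: hypothetical alternative. The lazy prefix stops at the FIRST
    // "value"/"values" keyword that is followed by "(" (optionally after spaces).
    static final Pattern ANCHORED = Pattern.compile(
        "^(.*?)\\s+values?\\s*(\\(.*)$",
        Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

    // Returns the text after the keyword as seen by the greedy pattern, or null.
    static String greedyValues(String sql) {
        Matcher m = GREEDY.matcher(sql);
        return m.find() ? m.group(3) : null;
    }

    // Returns the parenthesised value list as seen by the anchored pattern, or null.
    static String anchoredValues(String sql) {
        Matcher m = ANCHORED.matcher(sql);
        return m.find() ? m.group(2) : null;
    }

    public static void main(String[] args) {
        String[] samples = {
            "INSERT INTO table(a) value(' value of')",
            "INSERT INTO table(a) values('a')"
        };
        for (String sql : samples) {
            System.out.println("greedy:   [" + greedyValues(sql) + "]");
            System.out.println("anchored: [" + anchoredValues(sql) + "]");
        }
    }
}
```

This is still a heuristic rather than a SQL parser: it would be confused by a column list that itself contains the text ` value(` before the real keyword, and it does not handle `INSERT ... SELECT` statements.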
### Error Exception

```log
2023-08-08 19:42:48,305 ERROR com.streamxhub.streamx.flink.connector.clickhouse.internal.ClickHouseWriterTask [] - [StreamX] Error ClickHouseSink executing callback, params = { user: default, password: ******, hosts: http://192.168.1.96:8123,http://192.168.1.97:8123,http://192.168.1.98:8123,http://192.168.1.99:8123,http://192.168.1.100:8123,http://192.168.1.101:8123 } , StatusCode = 400
2023-08-08 19:42:48,305 WARN com.streamxhub.streamx.flink.connector.clickhouse.internal.ClickHouseWriterTask [] - [StreamX] Failed to send data to ClickHouse, cause: limit of attempts is exceeded. ClickHouse response = NettyResponse {
	statusCode=400
	headers=
		Date: Tue, 08 Aug 2023 11:42:48 GMT
		Connection: Keep-Alive
		Content-Type: text/plain; charset=UTF-8
		X-ClickHouse-Server-Display-Name: core-1-2.c-17050c8955804b92.cn-shanghai.emr.aliyuncs.com
		Transfer-Encoding: chunked
		X-ClickHouse-Exception-Code: 27
		Keep-Alive: timeout=10
		X-ClickHouse-Summary: {"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
	body=
Code: 27. DB::ParsingException: Cannot parse input: expected '(' before: '=12.6), CGM\' ), ( \'333678010@android\' , 333678010 , \'android\' , 1691468942268 , 1691494966100 , 1681222439167590400 , \'ZLZQBcdBirIDAAgaEmg7Hj78\' , {\'appVersion\'': at row 215: While executing ValuesBlockInputFormat. (CANNOT_PARSE_INPUT_ASSERTION_FAILED) (version 22.3.8.28)
```

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR! (Would you like to contribute this PR?)

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
