ZanebonoAlter opened a new issue, #5352: URL: https://github.com/apache/seatunnel/issues/5352
### Search before asking - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues. ### What happened I upgrade seatunnel from 2.3.1 to 2.3.3 due to I want Clickhouse writing is triggered when a checkpoint is reached https://github.com/apache/seatunnel/pull/4999 But seatunnel break when checkpoint is reached, and The shorter the interval I configure(checkpoint.interval = 20000), the greater the chance of triggering I think maybe the old Clickhouse JDBC connection is closed due to too fast thread switching, but the new inserted data uses the old channel, causing flush to throw an exception This config works in 2.3.1 ### SeaTunnel Version 2.3.3 branch: dev 9e85d1228d0cb8a0433ac05422362ec2623506c5 ### SeaTunnel Config ```conf env { execution.parallelism = 1 job.mode = "STREAMING" checkpoint.interval = 60000 } source { MySQL-CDC { result_table_name = "yc" parallelism = 1 server-id = 5656 username = "****" password = "****" table-names = ["**.****1"] base-url = "jdbc:mysql://***:3306/**" server-time-zone = "UTC+8" startup.mode = "initial" } MySQL-CDC { result_table_name = "hiswarn" parallelism = 1 server-id = 6657 username = "***" password = "****" table-names = ["**.****2"] base-url = "jdbc:mysql://****:3306/**" server-time-zone = "UTC+8" startup.mode = "initial" } } sink { Clickhouse { host = "****:8123" database = "migrate" table = "this_hiswarn" source_table_name = "hiswarn" # split mode options split_mode = true sharding_key = "f_time" } Clickhouse { host = "****:8123" database = "migrate" table = "this_yc" source_table_name = "yc" # split mode options split_mode = true sharding_key = "f_time" } } ``` ### Running Command ```shell java -Dhazelcast.client.config=/home/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT/config/hazelcast-client.yaml -Dseatunnel.config=/home/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT/config/seatunnel.yaml -Dhazelcast.config=/home/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT/config/hazelcast.yaml -Dlog4j2.configurationFile=/home/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT/config/log4j2_client.properties -Dseatunnel.logs.path=/home/seatunnel -cp /home/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT/lib/*:/home/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT/starter/seatunnel-starter.jar org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient --config ./config/test.streaming.conf -e local ``` ### Error Exception ```log org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.InterruptedException: sleep interrupted at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:166) ~[connector-cdc-mysql-2.3.3-SNAPSHOT.jar:2.3.3-SNAPSHOT] at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:92) ~[connector-cdc-mysql-2.3.3-SNAPSHOT.jar:2.3.3-SNAPSHOT] at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:98) ~[connector-cdc-mysql-2.3.3-SNAPSHOT.jar:2.3.3-SNAPSHOT] at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT] at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:95) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT] at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:167) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT] at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:100) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT] at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:611) [seatunnel-starter.jar:2.3.3-SNAPSHOT] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_372] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_372] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_372] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_372] at java.lang.Thread.run(Thread.java:750) [?:1.8.0_372] Caused by: java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) ~[?:1.8.0_372] at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:164) ~[connector-cdc-mysql-2.3.3-SNAPSHOT.jar:2.3.3-SNAPSHOT] ... 12 more Caused by: org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed] - Clickhouse execute batch statement error at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.flush(ClickhouseSinkWriter.java:119) at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.flush(ClickhouseSinkWriter.java:134) at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.prepareCommit(ClickhouseSinkWriter.java:93) at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:166) ... 15 more Suppressed: java.sql.SQLException: Cannot operate on a closed statement at com.clickhouse.jdbc.SqlExceptionUtils.clientError(SqlExceptionUtils.java:73) at com.clickhouse.jdbc.internal.ClickHouseStatementImpl.ensureOpen(ClickHouseStatementImpl.java:114) at com.clickhouse.jdbc.internal.InputBasedPreparedStatement.setString(InputBasedPreparedStatement.java:262) at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.FieldNamedPreparedStatement.setString(FieldNamedPreparedStatement.java:123) at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.StringInjectFunction.injectFields(StringInjectFunction.java:51) at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcRowConverter.toExternal(JdbcRowConverter.java:79) at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.SimpleBatchStatementExecutor.addToBatch(SimpleBatchStatementExecutor.java:42) at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.BufferedBatchStatementExecutor.executeBatch(BufferedBatchStatementExecutor.java:51) at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.BufferedBatchStatementExecutor.closeStatements(BufferedBatchStatementExecutor.java:61) at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutor.close(JdbcBatchStatementExecutor.java:37) at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.flush(ClickhouseSinkWriter.java:128) ... 17 more Caused by: java.sql.SQLException: Cannot operate on a closed statement at com.clickhouse.jdbc.SqlExceptionUtils.clientError(SqlExceptionUtils.java:73) at com.clickhouse.jdbc.internal.ClickHouseStatementImpl.ensureOpen(ClickHouseStatementImpl.java:114) at com.clickhouse.jdbc.internal.InputBasedPreparedStatement.setString(InputBasedPreparedStatement.java:262) at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.FieldNamedPreparedStatement.setString(FieldNamedPreparedStatement.java:123) at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.StringInjectFunction.injectFields(StringInjectFunction.java:51) at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcRowConverter.toExternal(JdbcRowConverter.java:79) at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.SimpleBatchStatementExecutor.addToBatch(SimpleBatchStatementExecutor.java:42) at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.BufferedBatchStatementExecutor.executeBatch(BufferedBatchStatementExecutor.java:51) at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.flush(ClickhouseSinkWriter.java:117) ... 18 more ``` ### Zeta or Flink or Spark Version _No response_ ### Java or Scala Version _No response_ ### Screenshots _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
