ZanebonoAlter opened a new issue, #5352:
URL: https://github.com/apache/seatunnel/issues/5352

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   I upgraded SeaTunnel from 2.3.1 to 2.3.3 because I wanted the change that triggers Clickhouse writes when a checkpoint is reached:
   https://github.com/apache/seatunnel/pull/4999
   However, SeaTunnel now breaks when a checkpoint is reached, and the shorter the interval I configure (for example, checkpoint.interval = 20000), the greater the chance of triggering the failure.
   I think the old Clickhouse JDBC connection may be closed because threads switch too quickly, while newly inserted data still uses the old channel, which causes flush to throw an exception.
   
   This config works in 2.3.1
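
The suspected failure mode can be illustrated with a minimal, self-contained sketch. It does not use SeaTunnel or the ClickHouse JDBC driver: `FakeStatement`, `BufferingWriter`, and the `reopenAfterFlush` flag are hypothetical names invented for illustration, and the sketch only assumes that a flush closes the statement and that the next flush replays buffered rows into that same closed statement.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class ClosedStatementSketch {

    /** Hypothetical stand-in for a JDBC PreparedStatement; not the ClickHouse driver class. */
    static final class FakeStatement {
        private boolean closed = false;
        private final List<String> batch = new ArrayList<>();

        void setString(String value) throws SQLException {
            ensureOpen();
            batch.add(value);
        }

        int executeBatch() throws SQLException {
            ensureOpen();
            int rows = batch.size();
            batch.clear();
            return rows;
        }

        void close() {
            closed = true;
        }

        private void ensureOpen() throws SQLException {
            if (closed) {
                throw new SQLException("Cannot operate on a closed statement");
            }
        }
    }

    /** Hypothetical writer that buffers rows and flushes them at each checkpoint. */
    static final class BufferingWriter {
        private final boolean reopenAfterFlush;
        private final List<String> buffer = new ArrayList<>();
        private FakeStatement statement = new FakeStatement();

        BufferingWriter(boolean reopenAfterFlush) {
            this.reopenAfterFlush = reopenAfterFlush;
        }

        void write(String row) {
            buffer.add(row);
        }

        int flush() throws SQLException {
            try {
                // Replay buffered rows into the statement, then execute the batch.
                for (String row : buffer) {
                    statement.setString(row);
                }
                buffer.clear();
                return statement.executeBatch();
            } finally {
                // Assumption: the flush path closes the statement each time.
                statement.close();
                if (reopenAfterFlush) {
                    statement = new FakeStatement();
                }
            }
        }
    }

    /** Simulates two checkpoints; returns null on success or the error message on failure. */
    static String runTwoCheckpoints(boolean reopenAfterFlush) {
        BufferingWriter writer = new BufferingWriter(reopenAfterFlush);
        try {
            writer.write("row-1");
            writer.flush(); // first checkpoint
            writer.write("row-2");
            writer.flush(); // second checkpoint
            return null;
        } catch (SQLException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("reuse closed statement: " + runTwoCheckpoints(false));
        System.out.println("reopen after flush:     " + runTwoCheckpoints(true));
    }
}
```

In this sketch the second checkpoint fails only when the closed statement is reused. Whether the real `ClickhouseSinkWriter` behaves this way is an assumption that would need to be confirmed against the connector source.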
   
   ### SeaTunnel Version
   
   2.3.3
   branch: dev
   9e85d1228d0cb8a0433ac05422362ec2623506c5
   
   ### SeaTunnel Config
   
   ```conf
   env {
     execution.parallelism = 1
     job.mode = "STREAMING"
     checkpoint.interval = 60000
   }
   
   source {
     MySQL-CDC {
       result_table_name = "yc"
       parallelism = 1
       server-id = 5656
       username = "****"
       password = "****"
       table-names = ["**.****1"]
       base-url = "jdbc:mysql://***:3306/**"
       server-time-zone = "UTC+8"
       startup.mode = "initial"
     }
   
     MySQL-CDC {
       result_table_name = "hiswarn"
       parallelism = 1
       server-id = 6657
       username = "***"
       password = "****"
       table-names = ["**.****2"]
       base-url = "jdbc:mysql://****:3306/**"
       server-time-zone = "UTC+8"
       startup.mode = "initial"
     }
   }
   
   sink {
     Clickhouse {
       host = "****:8123"
       database = "migrate"
       table = "this_hiswarn"
   
       source_table_name = "hiswarn"
       # split mode options
       split_mode = true
       sharding_key = "f_time"
     }
   
     Clickhouse {
       host = "****:8123"
       database = "migrate"
       table = "this_yc"
       
       source_table_name = "yc"
       # split mode options
       split_mode = true
       sharding_key = "f_time"
     }
   }
   ```
   
   
   ### Running Command
   
   ```shell
   java \
     -Dhazelcast.client.config=/home/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT/config/hazelcast-client.yaml \
     -Dseatunnel.config=/home/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT/config/seatunnel.yaml \
     -Dhazelcast.config=/home/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT/config/hazelcast.yaml \
     -Dlog4j2.configurationFile=/home/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT/config/log4j2_client.properties \
     -Dseatunnel.logs.path=/home/seatunnel \
     -cp /home/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT/lib/*:/home/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT/starter/seatunnel-starter.jar \
     org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient \
     --config ./config/test.streaming.conf -e local
   ```
   
   
   ### Error Exception
   
   ```log
   org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.InterruptedException: sleep interrupted
           at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:166) ~[connector-cdc-mysql-2.3.3-SNAPSHOT.jar:2.3.3-SNAPSHOT]
           at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:92) ~[connector-cdc-mysql-2.3.3-SNAPSHOT.jar:2.3.3-SNAPSHOT]
           at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:98) ~[connector-cdc-mysql-2.3.3-SNAPSHOT.jar:2.3.3-SNAPSHOT]
           at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
           at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:95) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
           at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:167) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
           at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:100) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
           at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:611) [seatunnel-starter.jar:2.3.3-SNAPSHOT]
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_372]
           at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_372]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_372]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_372]
           at java.lang.Thread.run(Thread.java:750) [?:1.8.0_372]
   Caused by: java.lang.InterruptedException: sleep interrupted
           at java.lang.Thread.sleep(Native Method) ~[?:1.8.0_372]
           at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:164) ~[connector-cdc-mysql-2.3.3-SNAPSHOT.jar:2.3.3-SNAPSHOT]
           ... 12 more
   
   
   Caused by: org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed] - Clickhouse execute batch statement error
           at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.flush(ClickhouseSinkWriter.java:119)
           at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.flush(ClickhouseSinkWriter.java:134)
           at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.prepareCommit(ClickhouseSinkWriter.java:93)
           at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:166)
           ... 15 more
           Suppressed: java.sql.SQLException: Cannot operate on a closed statement
                   at com.clickhouse.jdbc.SqlExceptionUtils.clientError(SqlExceptionUtils.java:73)
                   at com.clickhouse.jdbc.internal.ClickHouseStatementImpl.ensureOpen(ClickHouseStatementImpl.java:114)
                   at com.clickhouse.jdbc.internal.InputBasedPreparedStatement.setString(InputBasedPreparedStatement.java:262)
                   at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.FieldNamedPreparedStatement.setString(FieldNamedPreparedStatement.java:123)
                   at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.StringInjectFunction.injectFields(StringInjectFunction.java:51)
                   at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcRowConverter.toExternal(JdbcRowConverter.java:79)
                   at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.SimpleBatchStatementExecutor.addToBatch(SimpleBatchStatementExecutor.java:42)
                   at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.BufferedBatchStatementExecutor.executeBatch(BufferedBatchStatementExecutor.java:51)
                   at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.BufferedBatchStatementExecutor.closeStatements(BufferedBatchStatementExecutor.java:61)
                   at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutor.close(JdbcBatchStatementExecutor.java:37)
                   at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.flush(ClickhouseSinkWriter.java:128)
                   ... 17 more
   
   
   Caused by: java.sql.SQLException: Cannot operate on a closed statement
           at com.clickhouse.jdbc.SqlExceptionUtils.clientError(SqlExceptionUtils.java:73)
           at com.clickhouse.jdbc.internal.ClickHouseStatementImpl.ensureOpen(ClickHouseStatementImpl.java:114)
           at com.clickhouse.jdbc.internal.InputBasedPreparedStatement.setString(InputBasedPreparedStatement.java:262)
           at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.FieldNamedPreparedStatement.setString(FieldNamedPreparedStatement.java:123)
           at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.StringInjectFunction.injectFields(StringInjectFunction.java:51)
           at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcRowConverter.toExternal(JdbcRowConverter.java:79)
           at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.SimpleBatchStatementExecutor.addToBatch(SimpleBatchStatementExecutor.java:42)
           at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.BufferedBatchStatementExecutor.executeBatch(BufferedBatchStatementExecutor.java:51)
           at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.flush(ClickhouseSinkWriter.java:117)
           ... 18 more
   ```
   
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

