This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3b0a393145 [Improve] [Connector-V2] Remove scheduler in JDBC sink #4736 (#5168)
3b0a393145 is described below

commit 3b0a393145392024f532939db6a6999cb3a8e29f
Author: Guangdong Liu <[email protected]>
AuthorDate: Tue Aug 8 10:39:44 2023 +0800

    [Improve] [Connector-V2] Remove scheduler in JDBC sink #4736 (#5168)
    
    
    ---------
    
    Co-authored-by: gdliu3 <[email protected]>
---
 docs/en/connector-v2/sink/DB2.md                   |  3 +-
 docs/en/connector-v2/sink/Jdbc.md                  |  8 +---
 docs/en/connector-v2/sink/Mysql.md                 |  3 +-
 docs/en/connector-v2/sink/OceanBase.md             |  3 +-
 docs/en/connector-v2/sink/PostgreSql.md            |  3 +-
 docs/en/connector-v2/sink/Snowflake.md             |  3 +-
 docs/en/connector-v2/sink/Vertica.md               |  3 +-
 .../jdbc/config/JdbcConnectionConfig.java          | 13 -------
 .../seatunnel/jdbc/config/JdbcOptions.java         |  6 ---
 .../seatunnel/jdbc/internal/JdbcOutputFormat.java  | 44 ----------------------
 .../seatunnel/jdbc/sink/JdbcSinkFactory.java       |  2 -
 11 files changed, 7 insertions(+), 84 deletions(-)

diff --git a/docs/en/connector-v2/sink/DB2.md b/docs/en/connector-v2/sink/DB2.md
index 8f5a7285e3..fc0aaca094 100644
--- a/docs/en/connector-v2/sink/DB2.md
+++ b/docs/en/connector-v2/sink/DB2.md
@@ -65,8 +65,7 @@ semantics (using XA transaction guarantee).
 | support_upsert_by_query_primary_key_exist | Boolean | No       | false   | 
Choose to use INSERT sql, UPDATE sql to process update events(INSERT, 
UPDATE_AFTER) based on query primary key exists. This configuration is only 
used when database unsupport upsert syntax. **Note**: that this method has low 
performance |
 | connection_check_timeout_sec              | Int     | No       | 30      | 
The time in seconds to wait for the database operation used to validate the 
connection to complete.                                                         
                                                                                
 |
 | max_retries                               | Int     | No       | 0       | 
The number of retries to submit failed (executeBatch)                           
                                                                                
                                                                             |
-| batch_size                                | Int     | No       | 1000    | 
For batch writing, when the number of buffered records reaches the number of 
`batch_size` or the time reaches `batch_interval_ms`<br/>, the data will be 
flushed into the database                                                       
    |
-| batch_interval_ms                         | Int     | No       | 1000    | 
For batch writing, when the number of buffers reaches the number of 
`batch_size` or the time reaches `batch_interval_ms`, the data will be flushed 
into the database                                                               
          |
+| batch_size                                | Int     | No       | 1000    | 
For batch writing, when the number of buffered records reaches the number of 
`batch_size` or the time reaches `checkpoint.interval`<br/>, the data will be 
flushed into the database                                                       
  |
 | is_exactly_once                           | Boolean | No       | false   | 
Whether to enable exactly-once semantics, which will use Xa transactions. If 
on, you need to<br/>set `xa_data_source_class_name`.                            
                                                                                
|
 | generate_sink_sql                         | Boolean | No       | false   | 
Generate sql statements based on the database table you want to write to        
                                                                                
                                                                             |
 | xa_data_source_class_name                 | String  | No       | -       | 
The xa data source class name of the database Driver, for example, DB2 is 
`com.db2.cj.jdbc.Db2XADataSource`, and<br/>please refer to appendix for other 
data sources                                                                    
     |
diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md
index 9d68278cf5..755de8bb9a 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -41,7 +41,6 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
 | connection_check_timeout_sec              | Int     | No       | 30          
  |
 | max_retries                               | Int     | No       | 0           
  |
 | batch_size                                | Int     | No       | 1000        
  |
-| batch_interval_ms                         | Int     | No       | 1000        
  |
 | is_exactly_once                           | Boolean | No       | false       
  |
 | generate_sink_sql                         | Boolean | No       | false       
  |
 | xa_data_source_class_name                 | String  | No       | -           
  |
@@ -107,12 +106,7 @@ The number of retries to submit failed (executeBatch)
 
 ### batch_size[int]
 
-For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `batch_interval_ms`
-, the data will be flushed into the database
-
-### batch_interval_ms[int]
-
-For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`
+For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
 , the data will be flushed into the database
 
 ### is_exactly_once[boolean]
diff --git a/docs/en/connector-v2/sink/Mysql.md b/docs/en/connector-v2/sink/Mysql.md
index 92254c1b54..55c825ed16 100644
--- a/docs/en/connector-v2/sink/Mysql.md
+++ b/docs/en/connector-v2/sink/Mysql.md
@@ -67,8 +67,7 @@ semantics (using XA transaction guarantee).
 | support_upsert_by_query_primary_key_exist | Boolean | No       | false   | 
Choose to use INSERT sql, UPDATE sql to process update events(INSERT, 
UPDATE_AFTER) based on query primary key exists. This configuration is only 
used when database unsupport upsert syntax. **Note**: that this method has low 
performance |
 | connection_check_timeout_sec              | Int     | No       | 30      | 
The time in seconds to wait for the database operation used to validate the 
connection to complete.                                                         
                                                                                
 |
 | max_retries                               | Int     | No       | 0       | 
The number of retries to submit failed (executeBatch)                           
                                                                                
                                                                             |
-| batch_size                                | Int     | No       | 1000    | 
For batch writing, when the number of buffered records reaches the number of 
`batch_size` or the time reaches `batch_interval_ms`<br/>, the data will be 
flushed into the database                                                       
    |
-| batch_interval_ms                         | Int     | No       | 1000    | 
For batch writing, when the number of buffers reaches the number of 
`batch_size` or the time reaches `batch_interval_ms`, the data will be flushed 
into the database                                                               
          |
+| batch_size                                | Int     | No       | 1000    | 
For batch writing, when the number of buffered records reaches the number of 
`batch_size` or the time reaches `checkpoint.interval`<br/>, the data will be 
flushed into the database                                                       
  |
 | is_exactly_once                           | Boolean | No       | false   | 
Whether to enable exactly-once semantics, which will use Xa transactions. If 
on, you need to<br/>set `xa_data_source_class_name`.                            
                                                                                
|
 | generate_sink_sql                         | Boolean | No       | false   | 
Generate sql statements based on the database table you want to write to        
                                                                                
                                                                             |
 | xa_data_source_class_name                 | String  | No       | -       | 
The xa data source class name of the database Driver, for example, mysql is 
`com.mysql.cj.jdbc.MysqlXADataSource`, and<br/>please refer to appendix for 
other data sources                                                              
     |
diff --git a/docs/en/connector-v2/sink/OceanBase.md b/docs/en/connector-v2/sink/OceanBase.md
index ec87ce3d36..3cea0b5e6e 100644
--- a/docs/en/connector-v2/sink/OceanBase.md
+++ b/docs/en/connector-v2/sink/OceanBase.md
@@ -81,8 +81,7 @@ Write data through jdbc. Support Batch mode and Streaming mode, support concurre
 | support_upsert_by_query_primary_key_exist | Boolean | No       | false   | 
Choose to use INSERT sql, UPDATE sql to process update events(INSERT, 
UPDATE_AFTER) based on query primary key exists. This configuration is only 
used when database unsupport upsert syntax. **Note**: that this method has low 
performance |
 | connection_check_timeout_sec              | Int     | No       | 30      | 
The time in seconds to wait for the database operation used to validate the 
connection to complete.                                                         
                                                                                
 |
 | max_retries                               | Int     | No       | 0       | 
The number of retries to submit failed (executeBatch)                           
                                                                                
                                                                             |
-| batch_size                                | Int     | No       | 1000    | 
For batch writing, when the number of buffered records reaches the number of 
`batch_size` or the time reaches `batch_interval_ms`<br/>, the data will be 
flushed into the database                                                       
    |
-| batch_interval_ms                         | Int     | No       | 1000    | 
For batch writing, when the number of buffers reaches the number of 
`batch_size` or the time reaches `batch_interval_ms`, the data will be flushed 
into the database                                                               
          |
+| batch_size                                | Int     | No       | 1000    | 
For batch writing, when the number of buffered records reaches the number of 
`batch_size` or the time reaches `checkpoint.interval`<br/>, the data will be 
flushed into the database                                                       
  |
 | generate_sink_sql                         | Boolean | No       | false   | 
Generate sql statements based on the database table you want to write to        
                                                                                
                                                                             |
 | max_commit_attempts                       | Int     | No       | 3       | 
The number of retries for transaction commit failures                           
                                                                                
                                                                             |
 | transaction_timeout_sec                   | Int     | No       | -1      | 
The timeout after the transaction is opened, the default is -1 (never timeout). 
Note that setting the timeout may affect<br/>exactly-once semantics             
                                                                             |
diff --git a/docs/en/connector-v2/sink/PostgreSql.md b/docs/en/connector-v2/sink/PostgreSql.md
index f7d6469b60..3cb2b82811 100644
--- a/docs/en/connector-v2/sink/PostgreSql.md
+++ b/docs/en/connector-v2/sink/PostgreSql.md
@@ -74,8 +74,7 @@ semantics (using XA transaction guarantee).
 | support_upsert_by_query_primary_key_exist | Boolean | No       | false   | 
Choose to use INSERT sql, UPDATE sql to process update events(INSERT, 
UPDATE_AFTER) based on query primary key exists. This configuration is only 
used when database unsupport upsert syntax. **Note**: that this method has low 
performance |
 | connection_check_timeout_sec              | Int     | No       | 30      | 
The time in seconds to wait for the database operation used to validate the 
connection to complete.                                                         
                                                                                
 |
 | max_retries                               | Int     | No       | 0       | 
The number of retries to submit failed (executeBatch)                           
                                                                                
                                                                             |
-| batch_size                                | Int     | No       | 1000    | 
For batch writing, when the number of buffered records reaches the number of 
`batch_size` or the time reaches `batch_interval_ms`<br/>, the data will be 
flushed into the database                                                       
    |
-| batch_interval_ms                         | Int     | No       | 1000    | 
For batch writing, when the number of buffers reaches the number of 
`batch_size` or the time reaches `batch_interval_ms`, the data will be flushed 
into the database                                                               
          |
+| batch_size                                | Int     | No       | 1000    | 
For batch writing, when the number of buffered records reaches the number of 
`batch_size` or the time reaches `checkpoint.interval`<br/>, the data will be 
flushed into the database                                                       
  |
 | is_exactly_once                           | Boolean | No       | false   | 
Whether to enable exactly-once semantics, which will use Xa transactions. If 
on, you need to<br/>set `xa_data_source_class_name`.                            
                                                                                
|
 | generate_sink_sql                         | Boolean | No       | false   | 
Generate sql statements based on the database table you want to write to.       
                                                                                
                                                                             |
 | xa_data_source_class_name                 | String  | No       | -       | 
The xa data source class name of the database Driver, for example, PostgreSQL 
is `org.postgresql.xa.PGXADataSource`, and<br/>please refer to appendix for 
other data sources                                                              
   |
diff --git a/docs/en/connector-v2/sink/Snowflake.md b/docs/en/connector-v2/sink/Snowflake.md
index 21bfb175ef..1dfff5e09c 100644
--- a/docs/en/connector-v2/sink/Snowflake.md
+++ b/docs/en/connector-v2/sink/Snowflake.md
@@ -61,8 +61,7 @@ Write data through jdbc. Support Batch mode and Streaming mode, support concurre
 | support_upsert_by_query_primary_key_exist | Boolean | No       | false       
  | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, 
UPDATE_AFTER) based on query primary key exists. This configuration is only 
used when database unsupport upsert syntax. **Note**: that this method has low 
performance |
 | connection_check_timeout_sec              | Int     | No       | 30          
  | The time in seconds to wait for the database operation used to validate the 
connection to complete.                                                         
                                                                                
 |
 | max_retries                               | Int     | No       | 0           
  | The number of retries to submit failed (executeBatch)                       
                                                                                
                                                                                
 |
-| batch_size                                | Int     | No       | 1000        
  | For batch writing, when the number of buffered records reaches the number 
of `batch_size` or the time reaches `batch_interval_ms`<br/>, the data will be 
flushed into the database                                                       
    |
-| batch_interval_ms                         | Int     | No       | 1000        
  | For batch writing, when the number of buffers reaches the number of 
`batch_size` or the time reaches `batch_interval_ms`, the data will be flushed 
into the database                                                               
          |
+| batch_size                                | Int     | No       | 1000        
  | For batch writing, when the number of buffered records reaches the number 
of `batch_size` or the time reaches `checkpoint.interval`<br/>, the data will 
be flushed into the database                                                    
     |
 | max_commit_attempts                       | Int     | No       | 3           
  | The number of retries for transaction commit failures                       
                                                                                
                                                                                
 |
 | transaction_timeout_sec                   | Int     | No       | -1          
  | The timeout after the transaction is opened, the default is -1 (never 
timeout). Note that setting the timeout may affect<br/>exactly-once semantics   
                                                                                
       |
 | auto_commit                               | Boolean | No       | true        
  | Automatic transaction commit is enabled by default                          
                                                                                
                                                                                
 |
diff --git a/docs/en/connector-v2/sink/Vertica.md b/docs/en/connector-v2/sink/Vertica.md
index 0db8571d55..9a62440768 100644
--- a/docs/en/connector-v2/sink/Vertica.md
+++ b/docs/en/connector-v2/sink/Vertica.md
@@ -67,8 +67,7 @@ semantics (using XA transaction guarantee).
 | support_upsert_by_query_primary_key_exist | Boolean | No       | false   | 
Choose to use INSERT sql, UPDATE sql to process update events(INSERT, 
UPDATE_AFTER) based on query primary key exists. This configuration is only 
used when database unsupport upsert syntax. **Note**: that this method has low 
performance |
 | connection_check_timeout_sec              | Int     | No       | 30      | 
The time in seconds to wait for the database operation used to validate the 
connection to complete.                                                         
                                                                                
 |
 | max_retries                               | Int     | No       | 0       | 
The number of retries to submit failed (executeBatch)                           
                                                                                
                                                                             |
-| batch_size                                | Int     | No       | 1000    | 
For batch writing, when the number of buffered records reaches the number of 
`batch_size` or the time reaches `batch_interval_ms`<br/>, the data will be 
flushed into the database                                                       
    |
-| batch_interval_ms                         | Int     | No       | 1000    | 
For batch writing, when the number of buffers reaches the number of 
`batch_size` or the time reaches `batch_interval_ms`, the data will be flushed 
into the database                                                               
          |
+| batch_size                                | Int     | No       | 1000    | 
For batch writing, when the number of buffered records reaches the number of 
`batch_size` or the time reaches `checkpoint.interval`<br/>, the data will be 
flushed into the database                                                       
  |
 | is_exactly_once                           | Boolean | No       | false   | 
Whether to enable exactly-once semantics, which will use Xa transactions. If 
on, you need to<br/>set `xa_data_source_class_name`.                            
                                                                                
|
 | generate_sink_sql                         | Boolean | No       | false   | 
Generate sql statements based on the database table you want to write to        
                                                                                
                                                                             |
 | xa_data_source_class_name                 | String  | No       | -       | 
The xa data source class name of the database Driver, for example, vertical is 
`com.vertical.cj.jdbc.VerticalXADataSource`, and<br/>please refer to appendix 
for other data sources                                                          
|
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
index 6e2147c03c..555963af2c 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
@@ -38,7 +38,6 @@ public class JdbcConnectionConfig implements Serializable {
     public boolean autoCommit = JdbcOptions.AUTO_COMMIT.defaultValue();
 
     public int batchSize = JdbcOptions.BATCH_SIZE.defaultValue();
-    public int batchIntervalMs = JdbcOptions.BATCH_INTERVAL_MS.defaultValue();
 
     public String xaDataSourceClassName;
 
@@ -55,7 +54,6 @@ public class JdbcConnectionConfig implements Serializable {
         builder.maxRetries(config.get(JdbcOptions.MAX_RETRIES));
         builder.connectionCheckTimeoutSeconds(config.get(JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC));
         builder.batchSize(config.get(JdbcOptions.BATCH_SIZE));
-        builder.batchIntervalMs(config.get(JdbcOptions.BATCH_INTERVAL_MS));
         if (config.get(JdbcOptions.IS_EXACTLY_ONCE)) {
             builder.xaDataSourceClassName(config.get(JdbcOptions.XA_DATA_SOURCE_CLASS_NAME));
             builder.maxCommitAttempts(config.get(JdbcOptions.MAX_COMMIT_ATTEMPTS));
@@ -104,10 +102,6 @@ public class JdbcConnectionConfig implements Serializable {
         return batchSize;
     }
 
-    public int getBatchIntervalMs() {
-        return batchIntervalMs;
-    }
-
     public String getXaDataSourceClassName() {
         return xaDataSourceClassName;
     }
@@ -136,7 +130,6 @@ public class JdbcConnectionConfig implements Serializable {
         private String query;
         private boolean autoCommit = JdbcOptions.AUTO_COMMIT.defaultValue();
         private int batchSize = JdbcOptions.BATCH_SIZE.defaultValue();
-        private int batchIntervalMs = JdbcOptions.BATCH_INTERVAL_MS.defaultValue();
         private String xaDataSourceClassName;
         private int maxCommitAttempts = JdbcOptions.MAX_COMMIT_ATTEMPTS.defaultValue();
         private int transactionTimeoutSec = JdbcOptions.TRANSACTION_TIMEOUT_SEC.defaultValue();
@@ -193,11 +186,6 @@ public class JdbcConnectionConfig implements Serializable {
             return this;
         }
 
-        public Builder batchIntervalMs(int batchIntervalMs) {
-            this.batchIntervalMs = batchIntervalMs;
-            return this;
-        }
-
         public Builder xaDataSourceClassName(String xaDataSourceClassName) {
             this.xaDataSourceClassName = xaDataSourceClassName;
             return this;
@@ -216,7 +204,6 @@ public class JdbcConnectionConfig implements Serializable {
         public JdbcConnectionConfig build() {
             JdbcConnectionConfig jdbcConnectionConfig = new JdbcConnectionConfig();
             jdbcConnectionConfig.batchSize = this.batchSize;
-            jdbcConnectionConfig.batchIntervalMs = this.batchIntervalMs;
             jdbcConnectionConfig.driverName = this.driverName;
             jdbcConnectionConfig.compatibleMode = this.compatibleMode;
             jdbcConnectionConfig.maxRetries = this.maxRetries;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
index f5d1613c53..207995d0b4 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
@@ -71,12 +71,6 @@ public interface JdbcOptions {
                             "For queries that return a large number of objects, "
                                     + "you can configure the row fetch size used in the query to improve performance by reducing the number database hits required to satisfy the selection criteria. Zero means use jdbc default value.");
 
-    Option<Integer> BATCH_INTERVAL_MS =
-            Options.key("batch_interval_ms")
-                    .intType()
-                    .defaultValue(0)
-                    .withDescription("batch interval milliSecond");
-
     Option<Boolean> IS_EXACTLY_ONCE =
             Options.key("is_exactly_once")
                     .booleanType()
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
index d47814f153..a7d7912522 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
@@ -34,11 +34,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -58,9 +53,6 @@ public class JdbcOutputFormat<I, E extends JdbcBatchStatementExecutor<I>> implem
     private transient E jdbcStatementExecutor;
     private transient int batchCount = 0;
     private transient volatile boolean closed = false;
-
-    private transient ScheduledExecutorService scheduler;
-    private transient ScheduledFuture<?> scheduledFuture;
     private transient volatile Exception flushException;
 
     public JdbcOutputFormat(
@@ -83,37 +75,6 @@ public class JdbcOutputFormat<I, E extends JdbcBatchStatementExecutor<I>> implem
                     e);
         }
         jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
-
-        if (jdbcConnectionConfig.getBatchIntervalMs() != 0
-                && jdbcConnectionConfig.getBatchSize() != 1) {
-            this.scheduler =
-                    Executors.newScheduledThreadPool(
-                            1,
-                            runnable -> {
-                                AtomicInteger cnt = new AtomicInteger(0);
-                                Thread thread = new Thread(runnable);
-                                thread.setDaemon(true);
-                                thread.setName(
-                                        "jdbc-upsert-output-format" + "-" + cnt.incrementAndGet());
-                                return thread;
-                            });
-            this.scheduledFuture =
-                    this.scheduler.scheduleWithFixedDelay(
-                            () -> {
-                                synchronized (JdbcOutputFormat.this) {
-                                    if (!closed) {
-                                        try {
-                                            flush();
-                                        } catch (Exception e) {
-                                            flushException = e;
-                                        }
-                                    }
-                                }
-                            },
-                            jdbcConnectionConfig.getBatchIntervalMs(),
-                            jdbcConnectionConfig.getBatchIntervalMs(),
-                            TimeUnit.MILLISECONDS);
-        }
     }
 
     private E createAndOpenStatementExecutor(StatementExecutorFactory<E> statementExecutorFactory) {
@@ -209,11 +170,6 @@ public class JdbcOutputFormat<I, E extends JdbcBatchStatementExecutor<I>> implem
         if (!closed) {
             closed = true;
 
-            if (this.scheduledFuture != null) {
-                scheduledFuture.cancel(false);
-                this.scheduler.shutdown();
-            }
-
             if (batchCount > 0) {
                 try {
                     flush();
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index a26628ff3a..8209533f9d 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -45,7 +45,6 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.AUTO_COMMIT;
-import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.BATCH_INTERVAL_MS;
 import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.BATCH_SIZE;
 import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.COMPATIBLE_MODE;
 import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC;
@@ -166,7 +165,6 @@ public class JdbcSinkFactory implements TableSinkFactory {
                         PASSWORD,
                         CONNECTION_CHECK_TIMEOUT_SEC,
                         BATCH_SIZE,
-                        BATCH_INTERVAL_MS,
                         IS_EXACTLY_ONCE,
                         GENERATE_SINK_SQL,
                         AUTO_COMMIT,

Reply via email to