This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new aa0b2119a7 [Improve] Add disable 2pc in SelectDB cloud sink (#6266)
aa0b2119a7 is described below

commit aa0b2119a735effd33edba1559f0c2a755b5b2f7
Author: Jia Fan <[email protected]>
AuthorDate: Fri Feb 23 18:50:30 2024 +0800

    [Improve] Add disable 2pc in SelectDB cloud sink (#6266)
---
 docs/en/connector-v2/sink/SelectDB-Cloud.md        |  34 +++---
 .../connectors/selectdb/config/SelectDBConfig.java |  11 ++
 .../CopySQLUtil.java}                              |  61 +++--------
 .../selectdb/sink/committer/SelectDBCommitter.java | 119 +--------------------
 .../selectdb/sink/writer/SelectDBSinkWriter.java   |  15 ++-
 .../selectdb/sink/writer/SelectDBStageLoad.java    |  27 ++---
 .../connectors/selectdb/util/HttpUtil.java         |   2 +-
 7 files changed, 67 insertions(+), 202 deletions(-)

diff --git a/docs/en/connector-v2/sink/SelectDB-Cloud.md 
b/docs/en/connector-v2/sink/SelectDB-Cloud.md
index 6ad2997903..41ca0ddaf2 100644
--- a/docs/en/connector-v2/sink/SelectDB-Cloud.md
+++ b/docs/en/connector-v2/sink/SelectDB-Cloud.md
@@ -30,19 +30,20 @@ Version Supported
 
 ## Sink Options
 
-|        Name        |  Type  | Required |        Default         |            
                                                    Description                 
                                               |
-|--------------------|--------|----------|------------------------|-------------------------------------------------------------------------------------------------------------------------------------------|
-| load-url           | String | Yes      | -                      | `SelectDB 
Cloud` warehouse http address, the format is `warehouse_ip:http_port`           
                                                |
-| jdbc-url           | String | Yes      | -                      | `SelectDB 
Cloud` warehouse jdbc address, the format is `warehouse_ip:mysql_port`          
                                                |
-| cluster-name       | String | Yes      | -                      | `SelectDB 
Cloud` cluster name                                                             
                                                |
-| username           | String | Yes      | -                      | `SelectDB 
Cloud` user username                                                            
                                                |
-| password           | String | Yes      | -                      | `SelectDB 
Cloud` user password                                                            
                                                |
-| table.identifier   | String | Yes      | -                      | The name 
of `SelectDB Cloud` table, the format is `database.table`                       
                                                 |
-| sink.enable-delete | bool   | No       | false                  | Whether to 
enable deletion. This option requires SelectDB Cloud table to enable batch 
delete function, and only supports Unique model.    |
-| sink.max-retries   | int    | No       | 3                      | the max 
retry times if writing records to database failed                               
                                                  |
-| sink.buffer-size   | int    | No       | 10 * 1024 * 1024 (1MB) | the buffer 
size to cache data for stream load.                                             
                                               |
-| sink.buffer-count  | int    | No       | 10000                  | the buffer 
count to cache data for stream load.                                            
                                               |
-| selectdb.config    | map    | yes      | -                      | This 
option is used to support operations such as `insert`, `delete`, and `update` 
when automatically generate sql,and supported formats. |
+|        Name        |  Type  | Required |        Default         |            
                                                                                
                                                                        
Description                                                                     
                                                                                
               |
+|--------------------|--------|----------|------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| load-url           | String | Yes      | -                      | `SelectDB 
Cloud` warehouse http address, the format is `warehouse_ip:http_port`           
                                                                                
                                                                                
                                                                                
        |
+| jdbc-url           | String | Yes      | -                      | `SelectDB 
Cloud` warehouse jdbc address, the format is `warehouse_ip:mysql_port`          
                                                                                
                                                                                
                                                                                
        |
+| cluster-name       | String | Yes      | -                      | `SelectDB 
Cloud` cluster name                                                             
                                                                                
                                                                                
                                                                                
        |
+| username           | String | Yes      | -                      | `SelectDB 
Cloud` user username                                                            
                                                                                
                                                                                
                                                                                
        |
+| password           | String | Yes      | -                      | `SelectDB 
Cloud` user password                                                            
                                                                                
                                                                                
                                                                                
        |
+| sink.enable-2pc    | bool   | No       | true                   | Whether to 
enable two-phase commit (2pc), the default is true, to ensure Exactly-Once 
semantics. SelectDB uses cache files to load data. When the amount of data is 
large, cached data may become invalid (the default expiration time is 1 hour). 
If you encounter a large amount of data write loss, please configure 
sink.enable-2pc to false. |
+| table.identifier   | String | Yes      | -                      | The name 
of `SelectDB Cloud` table, the format is `database.table`                       
                                                                                
                                                                                
                                                                                
         |
+| sink.enable-delete | bool   | No       | false                  | Whether to 
enable deletion. This option requires SelectDB Cloud table to enable batch 
delete function, and only supports Unique model.                                
                                                                                
                                                                                
            |
+| sink.max-retries   | int    | No       | 3                      | the max 
retry times if writing records to database failed                               
                                                                                
                                                                                
                                                                                
          |
+| sink.buffer-size   | int    | No       | 10 * 1024 * 1024 (1MB) | the buffer 
size to cache data for stream load.                                             
                                                                                
                                                                                
                                                                                
       |
+| sink.buffer-count  | int    | No       | 10000                  | the buffer 
count to cache data for stream load.                                            
                                                                                
                                                                                
                                                                                
       |
+| selectdb.config    | map    | yes      | -                      | This 
option is used to support operations such as `insert`, `delete`, and `update` 
when automatically generate sql,and supported formats.                          
                                                                                
                                                                                
               |
 
 ## Data Type Mapping
 
@@ -170,10 +171,3 @@ sink {
 }
 ```
 
-## Changelog
-
-### next version
-
-- [Feature] Support SelectDB Cloud Sink Connector 
[3958](https://github.com/apache/seatunnel/pull/3958)
-- [Improve] Refactor some SelectDB Cloud Sink code as well as support copy 
into batch and async flush and cdc 
[4312](https://github.com/apache/seatunnel/pull/4312)
-
diff --git 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
index 7025387364..50c3442c6b 100644
--- 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
+++ 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
@@ -71,6 +71,11 @@ public class SelectDBConfig {
                     .noDefaultValue()
                     .withDescription("the jdbc password.");
 
+    public static final Option<Boolean> SINK_ENABLE_2PC =
+            Options.key("sink.enable-2pc")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("enable 2PC while loading");
     // sink config options
     public static final Option<Integer> SINK_MAX_RETRIES =
             Options.key("sink.max-retries")
@@ -120,6 +125,7 @@ public class SelectDBConfig {
     private String tableIdentifier;
     private Boolean enableDelete;
     private String labelPrefix;
+    private boolean enable2PC;
     private Integer maxRetries;
     private Integer bufferSize;
     private Integer bufferCount;
@@ -146,6 +152,11 @@ public class SelectDBConfig {
         } else {
             selectdbConfig.setMaxRetries(SINK_MAX_RETRIES.defaultValue());
         }
+        if (pluginConfig.hasPath(SINK_ENABLE_2PC.key())) {
+            
selectdbConfig.setEnable2PC(pluginConfig.getBoolean(SINK_ENABLE_2PC.key()));
+        } else {
+            selectdbConfig.setEnable2PC(SINK_ENABLE_2PC.defaultValue());
+        }
         if (pluginConfig.hasPath(SINK_BUFFER_SIZE.key())) {
             
selectdbConfig.setBufferSize(pluginConfig.getInt(SINK_BUFFER_SIZE.key()));
         } else {
diff --git 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/rest/CopySQLUtil.java
similarity index 71%
copy from 
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
copy to 
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/rest/CopySQLUtil.java
index 7c210be845..48e19b1b1a 100644
--- 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/rest/CopySQLUtil.java
@@ -15,16 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.selectdb.sink.committer;
+package org.apache.seatunnel.connectors.selectdb.rest;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig;
 import 
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorException;
-import org.apache.seatunnel.connectors.selectdb.rest.BaseResponse;
-import org.apache.seatunnel.connectors.selectdb.rest.CopyIntoResp;
 import org.apache.seatunnel.connectors.selectdb.sink.writer.LoadStatus;
 import org.apache.seatunnel.connectors.selectdb.util.HttpPostBuilder;
 import org.apache.seatunnel.connectors.selectdb.util.HttpUtil;
@@ -40,53 +35,21 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 @Slf4j
-public class SelectDBCommitter implements SinkCommitter<SelectDBCommitInfo> {
+public class CopySQLUtil {
+
     private static final String COMMIT_PATTERN = "http://%s/copy/query";
     private static final int HTTP_TEMPORARY_REDIRECT = 200;
-    private final ObjectMapper objectMapper = new ObjectMapper();
-    private final CloseableHttpClient httpClient;
-    private final SelectDBConfig selectdbConfig;
-    int maxRetry;
-
-    public SelectDBCommitter(Config pluginConfig) {
-        this(
-                SelectDBConfig.loadConfig(pluginConfig),
-                SelectDBConfig.loadConfig(pluginConfig).getMaxRetries(),
-                new HttpUtil().getHttpClient());
-    }
-
-    public SelectDBCommitter(
-            SelectDBConfig selectdbConfig, int maxRetry, CloseableHttpClient 
client) {
-        this.selectdbConfig = selectdbConfig;
-        this.maxRetry = maxRetry;
-        this.httpClient = client;
-    }
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    @Override
-    public List<SelectDBCommitInfo> commit(List<SelectDBCommitInfo> 
commitInfos)
+    public static void copyFileToDatabase(
+            SelectDBConfig selectdbConfig, String clusterName, String copySQL, 
String hostPort)
             throws IOException {
-        for (SelectDBCommitInfo committable : commitInfos) {
-            commitTransaction(committable);
-        }
-        return Collections.emptyList();
-    }
-
-    @Override
-    public void abort(List<SelectDBCommitInfo> commitInfos) throws IOException 
{}
-
-    private void commitTransaction(SelectDBCommitInfo commitInfo) throws 
IOException {
         long start = System.currentTimeMillis();
-        String hostPort = commitInfo.getHostPort();
-        String clusterName = commitInfo.getClusterName();
-        String copySQL = commitInfo.getCopySQL();
-        log.info("commit to cluster {} with copy sql: {}", clusterName, 
copySQL);
-
+        CloseableHttpClient httpClient = HttpUtil.getHttpClient();
         int statusCode = -1;
         String reasonPhrase = null;
         int retry = 0;
@@ -96,12 +59,12 @@ public class SelectDBCommitter implements 
SinkCommitter<SelectDBCommitInfo> {
         boolean success = false;
         CloseableHttpResponse response;
         String loadResult = "";
-        while (retry++ <= maxRetry) {
+        while (retry++ <= selectdbConfig.getMaxRetries()) {
             HttpPostBuilder postBuilder = new HttpPostBuilder();
             postBuilder
                     .setUrl(String.format(COMMIT_PATTERN, hostPort))
                     .baseAuth(selectdbConfig.getUsername(), 
selectdbConfig.getPassword())
-                    .setEntity(new 
StringEntity(objectMapper.writeValueAsString(params)));
+                    .setEntity(new 
StringEntity(OBJECT_MAPPER.writeValueAsString(params)));
             try {
                 response = httpClient.execute(postBuilder.build());
             } catch (IOException e) {
@@ -135,7 +98,7 @@ public class SelectDBCommitter implements 
SinkCommitter<SelectDBCommitInfo> {
             throw new SelectDBConnectorException(
                     SelectDBConnectorErrorCode.COMMIT_FAILED,
                     "commit failed with SQL: "
-                            + commitInfo.getCopySQL()
+                            + copySQL
                             + " Commit error with status: "
                             + statusCode
                             + ", Reason: "
@@ -145,9 +108,9 @@ public class SelectDBCommitter implements 
SinkCommitter<SelectDBCommitInfo> {
         }
     }
 
-    public boolean handleCommitResponse(String loadResult) throws IOException {
+    private static boolean handleCommitResponse(String loadResult) throws 
IOException {
         BaseResponse<CopyIntoResp> baseResponse =
-                objectMapper.readValue(
+                OBJECT_MAPPER.readValue(
                         loadResult, new 
TypeReference<BaseResponse<CopyIntoResp>>() {});
         if (baseResponse.getCode() == LoadStatus.SUCCESS) {
             CopyIntoResp dataResp = baseResponse.getData();
diff --git 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
index 7c210be845..9fea3e2ebf 100644
--- 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
@@ -21,51 +21,24 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig;
-import 
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorErrorCode;
-import 
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorException;
-import org.apache.seatunnel.connectors.selectdb.rest.BaseResponse;
-import org.apache.seatunnel.connectors.selectdb.rest.CopyIntoResp;
-import org.apache.seatunnel.connectors.selectdb.sink.writer.LoadStatus;
-import org.apache.seatunnel.connectors.selectdb.util.HttpPostBuilder;
-import org.apache.seatunnel.connectors.selectdb.util.HttpUtil;
-import org.apache.seatunnel.connectors.selectdb.util.ResponseUtil;
+import org.apache.seatunnel.connectors.selectdb.rest.CopySQLUtil;
 
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.util.EntityUtils;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 @Slf4j
 public class SelectDBCommitter implements SinkCommitter<SelectDBCommitInfo> {
-    private static final String COMMIT_PATTERN = "http://%s/copy/query";
-    private static final int HTTP_TEMPORARY_REDIRECT = 200;
-    private final ObjectMapper objectMapper = new ObjectMapper();
-    private final CloseableHttpClient httpClient;
     private final SelectDBConfig selectdbConfig;
-    int maxRetry;
 
     public SelectDBCommitter(Config pluginConfig) {
-        this(
-                SelectDBConfig.loadConfig(pluginConfig),
-                SelectDBConfig.loadConfig(pluginConfig).getMaxRetries(),
-                new HttpUtil().getHttpClient());
+        this(SelectDBConfig.loadConfig(pluginConfig));
     }
 
-    public SelectDBCommitter(
-            SelectDBConfig selectdbConfig, int maxRetry, CloseableHttpClient 
client) {
+    public SelectDBCommitter(SelectDBConfig selectdbConfig) {
         this.selectdbConfig = selectdbConfig;
-        this.maxRetry = maxRetry;
-        this.httpClient = client;
     }
 
     @Override
@@ -78,95 +51,13 @@ public class SelectDBCommitter implements 
SinkCommitter<SelectDBCommitInfo> {
     }
 
     @Override
-    public void abort(List<SelectDBCommitInfo> commitInfos) throws IOException 
{}
+    public void abort(List<SelectDBCommitInfo> commitInfos) {}
 
     private void commitTransaction(SelectDBCommitInfo commitInfo) throws 
IOException {
-        long start = System.currentTimeMillis();
         String hostPort = commitInfo.getHostPort();
         String clusterName = commitInfo.getClusterName();
         String copySQL = commitInfo.getCopySQL();
         log.info("commit to cluster {} with copy sql: {}", clusterName, 
copySQL);
-
-        int statusCode = -1;
-        String reasonPhrase = null;
-        int retry = 0;
-        Map<String, String> params = new HashMap<>();
-        params.put("cluster", clusterName);
-        params.put("sql", copySQL);
-        boolean success = false;
-        CloseableHttpResponse response;
-        String loadResult = "";
-        while (retry++ <= maxRetry) {
-            HttpPostBuilder postBuilder = new HttpPostBuilder();
-            postBuilder
-                    .setUrl(String.format(COMMIT_PATTERN, hostPort))
-                    .baseAuth(selectdbConfig.getUsername(), 
selectdbConfig.getPassword())
-                    .setEntity(new 
StringEntity(objectMapper.writeValueAsString(params)));
-            try {
-                response = httpClient.execute(postBuilder.build());
-            } catch (IOException e) {
-                log.error("commit error : ", e);
-                continue;
-            }
-            statusCode = response.getStatusLine().getStatusCode();
-            reasonPhrase = response.getStatusLine().getReasonPhrase();
-            if (statusCode != HTTP_TEMPORARY_REDIRECT) {
-                log.warn(
-                        "commit failed with status {} {}, reason {}",
-                        statusCode,
-                        hostPort,
-                        reasonPhrase);
-            } else if (response.getEntity() != null) {
-                loadResult = EntityUtils.toString(response.getEntity());
-                success = handleCommitResponse(loadResult);
-                if (success) {
-                    log.info(
-                            "commit success cost {}ms, response is {}",
-                            System.currentTimeMillis() - start,
-                            loadResult);
-                    break;
-                } else {
-                    log.warn("commit failed, retry again");
-                }
-            }
-        }
-
-        if (!success) {
-            throw new SelectDBConnectorException(
-                    SelectDBConnectorErrorCode.COMMIT_FAILED,
-                    "commit failed with SQL: "
-                            + commitInfo.getCopySQL()
-                            + " Commit error with status: "
-                            + statusCode
-                            + ", Reason: "
-                            + reasonPhrase
-                            + ", Response: "
-                            + loadResult);
-        }
-    }
-
-    public boolean handleCommitResponse(String loadResult) throws IOException {
-        BaseResponse<CopyIntoResp> baseResponse =
-                objectMapper.readValue(
-                        loadResult, new 
TypeReference<BaseResponse<CopyIntoResp>>() {});
-        if (baseResponse.getCode() == LoadStatus.SUCCESS) {
-            CopyIntoResp dataResp = baseResponse.getData();
-            if (LoadStatus.FAIL.equals(dataResp.getDataCode())) {
-                log.error("copy into execute failed, reason:{}", loadResult);
-                return false;
-            } else {
-                Map<String, String> result = dataResp.getResult();
-                if (!result.get("state").equals("FINISHED")
-                        && !ResponseUtil.isCommitted(result.get("msg"))) {
-                    log.error("copy into load failed, reason:{}", loadResult);
-                    return false;
-                } else {
-                    return true;
-                }
-            }
-        } else {
-            log.error("commit failed, reason:{}", loadResult);
-            return false;
-        }
+        CopySQLUtil.copyFileToDatabase(selectdbConfig, clusterName, copySQL, 
hostPort);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
index 8a0bd04400..43420d8ac7 100644
--- 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
@@ -98,7 +98,7 @@ public class SelectDBSinkWriter
     }
 
     @Override
-    public synchronized Optional<SelectDBCommitInfo> prepareCommit() throws 
IOException {
+    public synchronized Optional<SelectDBCommitInfo> prepareCommit() {
         checkState(selectDBStageLoad != null);
         log.info("checkpoint arrived, upload buffer to storage");
         try {
@@ -106,6 +106,10 @@ public class SelectDBSinkWriter
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
         }
+        if (!selectdbConfig.isEnable2PC()) {
+            return Optional.empty();
+        }
+
         CopySQLBuilder copySQLBuilder =
                 new CopySQLBuilder(selectdbConfig, 
selectDBStageLoad.getFileList());
         String copySql = copySQLBuilder.buildCopySQL();
@@ -115,11 +119,12 @@ public class SelectDBSinkWriter
     }
 
     @Override
-    public synchronized List<SelectDBSinkState> snapshotState(long 
checkpointId)
-            throws IOException {
+    public synchronized List<SelectDBSinkState> snapshotState(long 
checkpointId) {
         checkState(selectDBStageLoad != null);
-        log.info("clear the file list {}", selectDBStageLoad.getFileList());
-        this.selectDBStageLoad.clearFileList();
+        if (selectdbConfig.isEnable2PC()) {
+            log.info("clear the file list {}", 
selectDBStageLoad.getFileList());
+            this.selectDBStageLoad.clearFileList();
+        }
         this.selectDBStageLoad.setCurrentCheckpointID(checkpointId + 1);
         return Collections.singletonList(selectdbSinkState);
     }
diff --git 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBStageLoad.java
 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBStageLoad.java
index 44aaf89772..7bc2f8c158 100644
--- 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBStageLoad.java
+++ 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBStageLoad.java
@@ -21,6 +21,7 @@ import 
org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig;
 import 
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorException;
 import org.apache.seatunnel.connectors.selectdb.rest.BaseResponse;
+import org.apache.seatunnel.connectors.selectdb.rest.CopySQLUtil;
 import org.apache.seatunnel.connectors.selectdb.util.HttpPutBuilder;
 
 import org.apache.http.Header;
@@ -32,8 +33,6 @@ import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
@@ -57,7 +56,6 @@ import static 
org.apache.seatunnel.connectors.selectdb.sink.writer.LoadConstants
 
 @Slf4j
 public class SelectDBStageLoad implements Serializable {
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private final LabelGenerator labelGenerator;
     private final String lineDelimiter;
     private static final String UPLOAD_URL_PATTERN = "http://%s/copy/upload";
@@ -67,8 +65,6 @@ public class SelectDBStageLoad implements Serializable {
     private String hostPort;
     private final String username;
     private final String password;
-    private final String db;
-    private final String table;
     private final Properties stageLoadProps;
     private List<String> fileList = new CopyOnWriteArrayList();
     private RecordBuffer buffer;
@@ -84,9 +80,6 @@ public class SelectDBStageLoad implements Serializable {
     public SelectDBStageLoad(SelectDBConfig selectdbConfig, LabelGenerator 
labelGenerator) {
         this.selectdbConfig = selectdbConfig;
         this.hostPort = selectdbConfig.getLoadUrl();
-        String[] tableInfo = selectdbConfig.getTableIdentifier().split("\\.");
-        this.db = tableInfo[0];
-        this.table = tableInfo[1];
         this.username = selectdbConfig.getUsername();
         this.password = selectdbConfig.getPassword();
         this.labelGenerator = labelGenerator;
@@ -178,6 +171,7 @@ public class SelectDBStageLoad implements Serializable {
     }
 
     public void close() {
+        this.started.set(false);
         this.loadExecutorService.shutdown();
     }
 
@@ -185,11 +179,6 @@ public class SelectDBStageLoad implements Serializable {
         this.currentCheckpointID = currentCheckpointID;
     }
 
-    @VisibleForTesting
-    public void setHttpClientBuilder(HttpClientBuilder httpClientBuilder) {
-        this.httpClientBuilder = httpClientBuilder;
-    }
-
     class StageLoadAsyncExecutor implements Runnable {
         @Override
         public void run() {
@@ -200,6 +189,18 @@ public class SelectDBStageLoad implements Serializable {
                     if (buffer != null && buffer.getFileName() != null) {
                         uploadToStorage(buffer.getFileName(), buffer);
                         fileList.add(buffer.getFileName());
+                        if (!selectdbConfig.isEnable2PC()) {
+                            CopySQLBuilder copySQLBuilder =
+                                    new CopySQLBuilder(selectdbConfig, 
fileList);
+                            String copySql = copySQLBuilder.buildCopySQL();
+                            CopySQLUtil.copyFileToDatabase(
+                                    selectdbConfig,
+                                    selectdbConfig.getClusterName(),
+                                    copySql,
+                                    hostPort);
+                            log.info("clear the file list {}", fileList);
+                            clearFileList();
+                        }
                     }
                 } catch (Exception e) {
                     log.error("worker running error", e);
diff --git 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/HttpUtil.java
 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/HttpUtil.java
index 9ccf55451a..fb1919c336 100644
--- 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/HttpUtil.java
+++ 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/HttpUtil.java
@@ -23,7 +23,7 @@ import org.apache.http.impl.client.HttpClients;
 
 /** util to build http client. */
 public class HttpUtil {
-    public HttpUtil() {}
+    private HttpUtil() {}
 
     private static final HttpClientBuilder HTTP_CLIENT_BUILDER =
             HttpClients.custom().disableRedirectHandling();

Reply via email to