This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0a7acdce95 [Improve][Connector-V2] Doris stream load use FE instead of 
BE (#6235)
0a7acdce95 is described below

commit 0a7acdce95acd795a43c99e799cbced3dacbdf73
Author: Jia Fan <[email protected]>
AuthorDate: Fri Jan 19 11:07:19 2024 +0800

    [Improve][Connector-V2] Doris stream load use FE instead of BE (#6235)
---
 .../connectors/doris/rest/RestService.java         | 177 +--------------------
 .../connectors/doris/rest/models/BackendV2.java    |  78 ---------
 .../doris/sink/committer/DorisCommitter.java       |   7 +-
 .../doris/sink/writer/DorisSinkWriter.java         |  46 +-----
 .../doris/sink/writer/DorisStreamLoad.java         |  12 +-
 .../doris/source/serialization/Routing.java        |   6 +-
 .../connectors/doris/util/ErrorMessages.java       |   6 +-
 .../e2e/connector/doris/AbstractDorisIT.java       |   2 -
 .../seatunnel/e2e/connector/doris/DorisIT.java     |   9 ++
 .../resources/doris_source_and_sink_2pc_false.conf |  50 ++++++
 10 files changed, 81 insertions(+), 312 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
index 8729862516..315f36cfa2 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
@@ -22,9 +22,7 @@ import 
org.apache.seatunnel.connectors.doris.config.DorisConfig;
 import org.apache.seatunnel.connectors.doris.config.DorisOptions;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-import org.apache.seatunnel.connectors.doris.rest.models.BackendV2;
 import org.apache.seatunnel.connectors.doris.rest.models.QueryPlan;
-import org.apache.seatunnel.connectors.doris.rest.models.Schema;
 import org.apache.seatunnel.connectors.doris.rest.models.Tablet;
 import org.apache.seatunnel.connectors.doris.util.ErrorMessages;
 
@@ -63,21 +61,13 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 @Slf4j
 public class RestService implements Serializable {
     public static final int REST_RESPONSE_STATUS_OK = 200;
-    public static final int REST_RESPONSE_CODE_OK = 0;
-    private static final String REST_RESPONSE_BE_ROWS_KEY = "rows";
     private static final String API_PREFIX = "/api";
-    private static final String SCHEMA = "_schema";
     private static final String QUERY_PLAN = "_query_plan";
-    private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
-    @Deprecated private static final String BACKENDS = 
"/rest/v1/system?path=//backends";
-    private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
-    private static final String FE_LOGIN = "/rest/v1/login";
-    private static final String BASE_URL = "http://%s%s";
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     private static String send(DorisConfig dorisConfig, HttpRequestBase 
request, Logger logger)
             throws DorisConnectorException {
@@ -135,11 +125,10 @@ public class RestService implements Serializable {
                         request.getURI(),
                         response);
                 // Handle the problem of inconsistent data format returned by 
http v1 and v2
-                ObjectMapper mapper = new ObjectMapper();
-                Map map = mapper.readValue(response, Map.class);
+                Map map = OBJECT_MAPPER.readValue(response, Map.class);
                 if (map.containsKey("code") && map.containsKey("msg")) {
                     Object data = map.get("data");
-                    return mapper.writeValueAsString(data);
+                    return OBJECT_MAPPER.writeValueAsString(data);
                 } else {
                     return response;
                 }
@@ -170,7 +159,7 @@ public class RestService implements Serializable {
                                         .getBytes(StandardCharsets.UTF_8));
         conn.setRequestProperty("Authorization", "Basic " + authEncoding);
         InputStream content = ((HttpPost) request).getEntity().getContent();
-        String res = IOUtils.toString(content);
+        String res = IOUtils.toString(content, StandardCharsets.UTF_8);
         conn.setDoOutput(true);
         conn.setDoInput(true);
         PrintWriter out = new PrintWriter(conn.getOutputStream());
@@ -260,103 +249,6 @@ public class RestService implements Serializable {
         return nodes.get(0).trim();
     }
 
-    @VisibleForTesting
-    static List<String> allEndpoints(String feNodes, Logger logger) throws 
DorisConnectorException {
-        logger.trace("Parse fenodes '{}'.", feNodes);
-        if (StringUtils.isEmpty(feNodes)) {
-            String errMsg =
-                    String.format(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, 
"fenodes", feNodes);
-            throw new 
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
-        }
-        List<String> nodes =
-                
Arrays.stream(feNodes.split(",")).map(String::trim).collect(Collectors.toList());
-        Collections.shuffle(nodes);
-        return nodes;
-    }
-
-    @VisibleForTesting
-    public static String randomBackend(DorisConfig dorisConfig, Logger logger)
-            throws DorisConnectorException {
-        List<BackendV2.BackendRowV2> backends = getBackendsV2(dorisConfig, 
logger);
-        logger.trace("Parse beNodes '{}'.", backends);
-        if (backends == null || backends.isEmpty()) {
-            logger.error(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, "beNodes", 
backends);
-            String errMsg =
-                    String.format(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, 
"beNodes", backends);
-            throw new 
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
-        }
-        Collections.shuffle(backends);
-        BackendV2.BackendRowV2 backend = backends.get(0);
-        return backend.getIp() + ":" + backend.getHttpPort();
-    }
-
-    public static String getBackend(DorisConfig dorisConfig, Logger logger)
-            throws DorisConnectorException {
-        try {
-            return randomBackend(dorisConfig, logger);
-        } catch (Exception e) {
-            String errMsg = "Failed to get backend via " + 
dorisConfig.getFrontends();
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
-        }
-    }
-
-    @VisibleForTesting
-    public static List<BackendV2.BackendRowV2> getBackendsV2(DorisConfig 
dorisConfig, Logger logger)
-            throws DorisConnectorException {
-        String feNodes = dorisConfig.getFrontends();
-        List<String> feNodeList = allEndpoints(feNodes, logger);
-        for (String feNode : feNodeList) {
-            try {
-                String beUrl = "http://" + feNode + BACKENDS_V2;
-                HttpGet httpGet = new HttpGet(beUrl);
-                String response = send(dorisConfig, httpGet, logger);
-                logger.info("Backend Info:{}", response);
-                return parseBackendV2(response, logger);
-            } catch (DorisConnectorException e) {
-                logger.info(
-                        "Doris FE node {} is unavailable: {}, Request the next 
Doris FE node",
-                        feNode,
-                        e.getMessage());
-            }
-        }
-        String errMsg = "No Doris FE is available, please check configuration";
-        throw new 
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
-    }
-
-    static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger 
logger)
-            throws DorisConnectorException {
-        ObjectMapper mapper = new ObjectMapper();
-        BackendV2 backend;
-        try {
-            backend = mapper.readValue(response, BackendV2.class);
-        } catch (JsonParseException e) {
-            String errMsg = "Doris BE's response is not a json. res: " + 
response;
-            logger.error(errMsg, e);
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
-        } catch (JsonMappingException e) {
-            String errMsg = "Doris BE's response cannot map to schema. res: " 
+ response;
-            logger.error(errMsg, e);
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
-        } catch (IOException e) {
-            String errMsg = "Parse Doris BE's response to json failed. res: " 
+ response;
-            logger.error(errMsg, e);
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
-        }
-
-        if (backend == null) {
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED,
-                    ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
-        }
-        List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
-        logger.debug("Parsing schema result is '{}'.", backendRows);
-        return backendRows;
-    }
-
     @VisibleForTesting
     static String getUriStr(DorisConfig dorisConfig, Logger logger) throws 
DorisConnectorException {
         String tableIdentifier = dorisConfig.getDatabase() + "." + 
dorisConfig.getTable();
@@ -371,64 +263,6 @@ public class RestService implements Serializable {
                 + "/";
     }
 
-    public static Schema getSchema(DorisConfig dorisConfig, Logger logger)
-            throws DorisConnectorException {
-        logger.trace("Finding schema.");
-        HttpGet httpGet = new HttpGet(getUriStr(dorisConfig, logger) + SCHEMA);
-        String response = send(dorisConfig, httpGet, logger);
-        logger.debug("Find schema response is '{}'.", response);
-        return parseSchema(response, logger);
-    }
-
-    public static boolean isUniqueKeyType(DorisConfig dorisConfig, Logger 
logger)
-            throws DorisConnectorException {
-        try {
-            return UNIQUE_KEYS_TYPE.equals(getSchema(dorisConfig, 
logger).getKeysType());
-        } catch (Exception e) {
-            throw new 
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, e);
-        }
-    }
-
-    @VisibleForTesting
-    public static Schema parseSchema(String response, Logger logger)
-            throws DorisConnectorException {
-        logger.trace("Parse response '{}' to schema.", response);
-        ObjectMapper mapper = new ObjectMapper();
-        Schema schema;
-        try {
-            schema = mapper.readValue(response, Schema.class);
-        } catch (JsonParseException e) {
-            String errMsg = "Doris FE's response is not a json. res: " + 
response;
-            logger.error(errMsg, e);
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
-        } catch (JsonMappingException e) {
-            String errMsg = "Doris FE's response cannot map to schema. res: " 
+ response;
-            logger.error(errMsg, e);
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
-        } catch (IOException e) {
-            String errMsg = "Parse Doris FE's response to json failed. res: " 
+ response;
-            logger.error(errMsg, e);
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
-        }
-
-        if (schema == null) {
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED,
-                    ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
-        }
-
-        if (schema.getStatus() != REST_RESPONSE_STATUS_OK) {
-            String errMsg = "Doris FE's response is not OK, status is " + 
schema.getStatus();
-            logger.error(errMsg);
-            throw new 
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
-        }
-        logger.debug("Parsing schema result is '{}'.", schema);
-        return schema;
-    }
-
     public static List<PartitionDefinition> findPartitions(
             SeaTunnelRowType rowType, DorisConfig dorisConfig, Logger logger)
             throws DorisConnectorException {
@@ -438,9 +272,6 @@ public class RestService implements Serializable {
         if (rowType.getFieldNames().length != 0) {
             readFields = String.join(",", rowType.getFieldNames());
         }
-        //        String readFields =
-        //                StringUtils.isBlank(dorisConfig.getReadField()) ? 
"*" :
-        // dorisConfig.getReadField();
         String sql =
                 "select "
                         + readFields
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendV2.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendV2.java
deleted file mode 100644
index 47759e4bb0..0000000000
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendV2.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.rest.models;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-/** Be response model */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class BackendV2 {
-
-    @JsonProperty(value = "backends")
-    private List<BackendRowV2> backends;
-
-    public List<BackendRowV2> getBackends() {
-        return backends;
-    }
-
-    public void setBackends(List<BackendRowV2> backends) {
-        this.backends = backends;
-    }
-
-    public static class BackendRowV2 {
-        @JsonProperty("ip")
-        public String ip;
-
-        @JsonProperty("http_port")
-        public int httpPort;
-
-        @JsonProperty("is_alive")
-        public boolean isAlive;
-
-        public String getIp() {
-            return ip;
-        }
-
-        public void setIp(String ip) {
-            this.ip = ip;
-        }
-
-        public int getHttpPort() {
-            return httpPort;
-        }
-
-        public void setHttpPort(int httpPort) {
-            this.httpPort = httpPort;
-        }
-
-        public boolean isAlive() {
-            return isAlive;
-        }
-
-        public void setAlive(boolean alive) {
-            isAlive = alive;
-        }
-
-        public String toBackendString() {
-            return ip + ":" + httpPort;
-        }
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitter.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitter.java
index 92d18520ab..63d89f9e57 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitter.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.connectors.doris.config.DorisConfig;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-import org.apache.seatunnel.connectors.doris.rest.RestService;
 import org.apache.seatunnel.connectors.doris.sink.HttpPutBuilder;
 import org.apache.seatunnel.connectors.doris.sink.LoadStatus;
 import org.apache.seatunnel.connectors.doris.util.HttpUtil;
@@ -94,14 +93,14 @@ public class DorisCommitter implements 
SinkCommitter<DorisCommitInfo> {
                 response = httpClient.execute(putBuilder.build());
             } catch (IOException e) {
                 log.error("commit transaction failed: ", e);
-                hostPort = RestService.getBackend(dorisConfig, log);
+                hostPort = dorisConfig.getFrontends();
                 continue;
             }
             statusCode = response.getStatusLine().getStatusCode();
             reasonPhrase = response.getStatusLine().getReasonPhrase();
             if (statusCode != HTTP_TEMPORARY_REDIRECT) {
                 log.warn("commit failed with {}, reason {}", hostPort, 
reasonPhrase);
-                hostPort = RestService.getBackend(dorisConfig, log);
+                hostPort = dorisConfig.getFrontends();
             } else {
                 break;
             }
@@ -113,7 +112,7 @@ public class DorisCommitter implements 
SinkCommitter<DorisCommitInfo> {
         }
 
         ObjectMapper mapper = new ObjectMapper();
-        if (response != null && response.getEntity() != null) {
+        if (response.getEntity() != null) {
             String loadResult = EntityUtils.toString(response.getEntity());
             Map<String, String> res =
                     mapper.readValue(loadResult, new 
TypeReference<HashMap<String, String>>() {});
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index 40e0bc3a2f..8c945e0fed 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -25,8 +25,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.doris.config.DorisConfig;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-import org.apache.seatunnel.connectors.doris.rest.RestService;
-import org.apache.seatunnel.connectors.doris.rest.models.BackendV2;
 import org.apache.seatunnel.connectors.doris.rest.models.RespContent;
 import org.apache.seatunnel.connectors.doris.serialize.DorisSerializer;
 import org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializer;
@@ -39,8 +37,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -58,7 +54,6 @@ public class DorisSinkWriter
         implements SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState>,
                 SupportMultiTableSinkWriter<Void> {
     private static final int INITIAL_DELAY = 200;
-    private static final int CONNECT_TIMEOUT = 1000;
     private static final List<String> DORIS_SUCCESS_STATUS =
             new ArrayList<>(Arrays.asList(LoadStatus.SUCCESS, 
LoadStatus.PUBLISH_TIMEOUT));
     private long lastCheckpointId;
@@ -73,15 +68,13 @@ public class DorisSinkWriter
     private final transient ScheduledExecutorService scheduledExecutorService;
     private transient Thread executorThread;
     private transient volatile Exception loadException = null;
-    private List<BackendV2.BackendRowV2> backends;
 
     public DorisSinkWriter(
             SinkWriter.Context context,
             List<DorisSinkState> state,
             CatalogTable catalogTable,
             DorisConfig dorisConfig,
-            String jobId)
-            throws IOException {
+            String jobId) {
         this.dorisConfig = dorisConfig;
         this.catalogTable = catalogTable;
         this.lastCheckpointId = !state.isEmpty() ? 
state.get(0).getCheckpointId() : 0;
@@ -105,9 +98,8 @@ public class DorisSinkWriter
         this.initializeLoad();
     }
 
-    private void initializeLoad() throws IOException {
-        this.backends = RestService.getBackendsV2(dorisConfig, log);
-        String backend = getAvailableBackend();
+    private void initializeLoad() {
+        String backend = dorisConfig.getFrontends();
         try {
             this.dorisStreamLoad =
                     new DorisStreamLoad(
@@ -178,15 +170,14 @@ public class DorisSinkWriter
     }
 
     @Override
-    public List<DorisSinkState> snapshotState(long checkpointId) throws 
IOException {
+    public List<DorisSinkState> snapshotState(long checkpointId) {
         checkState(dorisStreamLoad != null);
         startLoad(labelGenerator.generateLabel(checkpointId + 1));
         this.lastCheckpointId = checkpointId;
         return Collections.singletonList(new DorisSinkState(labelPrefix, 
lastCheckpointId));
     }
 
-    private void startLoad(String label) throws IOException {
-        this.dorisStreamLoad.setHostPort(getAvailableBackend());
+    private void startLoad(String label) {
         this.dorisStreamLoad.startLoad(label);
         this.loading = true;
     }
@@ -250,33 +241,6 @@ public class DorisSinkWriter
         }
     }
 
-    private String getAvailableBackend() {
-        Collections.shuffle(backends);
-        for (BackendV2.BackendRowV2 backend : backends) {
-            String res = backend.toBackendString();
-            if (tryHttpConnection(res)) {
-                return res;
-            }
-        }
-        String errMsg = "no available backend.";
-        throw new 
DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, errMsg);
-    }
-
-    public boolean tryHttpConnection(String backend) {
-        try {
-            backend = "http://" + backend;
-            URL url = new URL(backend);
-            HttpURLConnection co = (HttpURLConnection) url.openConnection();
-            co.setConnectTimeout(CONNECT_TIMEOUT);
-            co.connect();
-            co.disconnect();
-            return true;
-        } catch (Exception ex) {
-            log.warn("Failed to connect to backend:{}", backend, ex);
-            return false;
-        }
-    }
-
     private DorisSerializer createSerializer(
             DorisConfig dorisConfig, SeaTunnelRowType seaTunnelRowType) {
         return new SeaTunnelRowSerializer(
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
index c2a8fc3bb5..bf2136091d 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
@@ -63,9 +63,8 @@ public class DorisStreamLoad implements Serializable {
     private static final String LOAD_URL_PATTERN = 
"http://%s/api/%s/%s/_stream_load";
     private static final String ABORT_URL_PATTERN = 
"http://%s/api/%s/_stream_load_2pc";
     private static final String JOB_EXIST_FINISHED = "FINISHED";
-
-    private String loadUrlStr;
-    private String hostPort;
+    private final String loadUrlStr;
+    private final String hostPort;
     private final String abortUrlStr;
     private final String user;
     private final String passwd;
@@ -123,11 +122,6 @@ public class DorisStreamLoad implements Serializable {
         return hostPort;
     }
 
-    public void setHostPort(String hostPort) {
-        this.hostPort = hostPort;
-        this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, this.db, 
this.table);
-    }
-
     public Future<CloseableHttpResponse> getPendingLoadFuture() {
         return pendingLoadFuture;
     }
@@ -232,7 +226,7 @@ public class DorisStreamLoad implements Serializable {
         }
     }
 
-    public void startLoad(String label) throws IOException {
+    public void startLoad(String label) {
         loadBatchFirstRecord = true;
         recordCount = 0;
         this.label = label;
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/Routing.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/Routing.java
index e48a17da7c..95917a5ec7 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/Routing.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/Routing.java
@@ -48,7 +48,11 @@ public class Routing {
         try {
             this.port = Integer.parseInt(hostPort[1]);
         } catch (NumberFormatException e) {
-            logger.error(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, "Doris 
BE's port", hostPort[1]);
+            logger.error(
+                    String.format(
+                            ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE,
+                            "Doris BE's port",
+                            hostPort[1]));
             String errMsg =
                     String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, 
"routing", routing);
             throw new 
DorisConnectorException(DorisConnectorErrorCode.ROUTING_FAILED, errMsg, e);
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/ErrorMessages.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/ErrorMessages.java
index 8de43d3031..52e1397ece 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/ErrorMessages.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/ErrorMessages.java
@@ -19,12 +19,10 @@ package org.apache.seatunnel.connectors.doris.util;
 
 public abstract class ErrorMessages {
     public static final String PARSE_NUMBER_FAILED_MESSAGE =
-            "Parse '{}' to number failed. Original string is '{}'.";
-    public static final String PARSE_BOOL_FAILED_MESSAGE =
-            "Parse '{}' to boolean failed. Original string is '{}'.";
+            "Parse '%s' to number failed. Original string is '%s'.";
     public static final String CONNECT_FAILED_MESSAGE = "Connect to doris {} 
failed.";
     public static final String ILLEGAL_ARGUMENT_MESSAGE =
-            "argument '{}' is illegal, value is '{}'.";
+            "argument '%s' is illegal, value is '%s'.";
     public static final String SHOULD_NOT_HAPPEN_MESSAGE = "Should not come 
here.";
     public static final String DORIS_INTERNAL_FAIL_MESSAGE =
             "Doris server '{}' internal failed, status is '{}', error message 
is '{}'";
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
index 3c33c82d3b..f8d48e465b 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
@@ -47,8 +47,6 @@ import static org.awaitility.Awaitility.given;
 public abstract class AbstractDorisIT extends TestSuiteBase implements 
TestResource {
 
     protected GenericContainer<?> container;
-
-    // use image adamlee489/doris:2.0.3 when running this test on mac
     private static final String DOCKER_IMAGE = "bingquanzhao/doris:2.0.3";
     protected static final String HOST = "doris_e2e";
     protected static final int QUERY_PORT = 9030;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
index 1fe65d9ebc..fb414aacd9 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
@@ -107,7 +107,16 @@ public class DorisIT extends AbstractDorisIT {
         batchInsertData();
         Container.ExecResult execResult = 
container.executeJob("/doris_source_and_sink.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
+        checkSinkData();
 
+        batchInsertData();
+        Container.ExecResult execResult2 =
+                container.executeJob("/doris_source_and_sink_2pc_false.conf");
+        Assertions.assertEquals(0, execResult2.getExitCode());
+        checkSinkData();
+    }
+
+    private void checkSinkData() {
         try {
             assertHasData(sourceDB, TABLE);
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_2pc_false.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_2pc_false.conf
new file mode 100644
index 0000000000..d8836ff2d5
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_2pc_false.conf
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env{
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source{
+  Doris {
+      fenodes = "doris_e2e:8030"
+      username = root
+      password = ""
+      database = "e2e_source"
+      table = "doris_e2e_table"
+      doris.read.field = 
"F_ID,F_INT,F_BIGINT,F_TINYINT,F_SMALLINT,F_DECIMAL,F_LARGEINT,F_BOOLEAN,F_DOUBLE,F_FLOAT,F_CHAR,F_VARCHAR_11,F_STRING,F_DATETIME_P,F_DATETIME,F_DATE"
+      doris.filter.query = "F_ID > 50"
+  }
+}
+
+transform {}
+
+sink{
+  Doris {
+          fenodes = "doris_e2e:8030"
+          username = root
+          password = ""
+          table.identifier = "e2e_sink.doris_e2e_table"
+          sink.enable-2pc = "false"
+          sink.label-prefix = "test_json"
+          doris.config = {
+              format="json"
+              read_json_by_line="true"
+          }
+      }
+  }
\ No newline at end of file

Reply via email to