This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 869417660e [Improve] Improve doris sink to random use be (#6132)
869417660e is described below

commit 869417660edc32cfe99891ba256495105a94bbd9
Author: Jia Fan <[email protected]>
AuthorDate: Tue Jan 9 20:06:44 2024 +0800

    [Improve] Improve doris sink to random use be (#6132)
---
 .../doris/sink/writer/DorisSinkWriter.java         | 19 +++++---------
 .../doris/sink/writer/DorisStreamLoad.java         | 29 +++++++++++++++-------
 2 files changed, 26 insertions(+), 22 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index 0abdc6269c..40e0bc3a2f 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -35,9 +35,7 @@ import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
 import org.apache.seatunnel.connectors.doris.util.HttpUtil;
 import org.apache.seatunnel.connectors.doris.util.UnsupportedTypeConverterUtils;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
@@ -76,7 +74,6 @@ public class DorisSinkWriter
     private transient Thread executorThread;
     private transient volatile Exception loadException = null;
     private List<BackendV2.BackendRowV2> backends;
-    private long pos;
 
     public DorisSinkWriter(
             SinkWriter.Context context,
@@ -156,7 +153,7 @@ public class DorisSinkWriter
     @Override
     public Optional<DorisCommitInfo> prepareCommit() throws IOException {
         RespContent respContent = flush();
-        if (!dorisConfig.getEnable2PC()) {
+        if (!dorisConfig.getEnable2PC() || respContent == null) {
             return Optional.empty();
         }
         long txnId = respContent.getTxnId();
@@ -165,12 +162,12 @@ public class DorisSinkWriter
                 new DorisCommitInfo(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
     }
 
-    @NonNull private RespContent flush() throws IOException {
+    private RespContent flush() throws IOException {
         // disable exception checker before stop load.
         loading = false;
         checkState(dorisStreamLoad != null);
         RespContent respContent = dorisStreamLoad.stopLoad();
-        if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+        if (respContent != null && !DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
             String errMsg =
                     String.format(
                             "stream load error: %s, see more in %s",
@@ -253,14 +250,11 @@ public class DorisSinkWriter
         }
     }
 
-    @VisibleForTesting
-    public String getAvailableBackend() {
-        long tmp = pos + backends.size();
-        while (pos < tmp) {
-            BackendV2.BackendRowV2 backend = backends.get((int) (pos % backends.size()));
+    private String getAvailableBackend() {
+        Collections.shuffle(backends);
+        for (BackendV2.BackendRowV2 backend : backends) {
             String res = backend.toBackendString();
             if (tryHttpConnection(res)) {
-                pos++;
                 return res;
             }
         }
@@ -279,7 +273,6 @@ public class DorisSinkWriter
             return true;
         } catch (Exception ex) {
             log.warn("Failed to connect to backend:{}", backend, ex);
-            pos++;
             return false;
         }
     }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
index 00f21e9ae5..c2a8fc3bb5 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
@@ -78,7 +78,8 @@ public class DorisStreamLoad implements Serializable {
     private Future<CloseableHttpResponse> pendingLoadFuture;
     private final CloseableHttpClient httpClient;
     private final ExecutorService executorService;
-    private boolean loadBatchFirstRecord;
+    private volatile boolean loadBatchFirstRecord;
+    private String label;
     private long recordCount = 0;
 
     public DorisStreamLoad(
@@ -191,6 +192,8 @@ public class DorisStreamLoad implements Serializable {
     public void writeRecord(byte[] record) throws IOException {
         if (loadBatchFirstRecord) {
             loadBatchFirstRecord = false;
+            recordStream.startInput();
+            startStreamLoad();
         } else {
             recordStream.write(lineDelimiter);
         }
@@ -214,21 +217,29 @@ public class DorisStreamLoad implements Serializable {
     }
 
     public RespContent stopLoad() throws IOException {
-        recordStream.endInput();
-        log.info("stream load stopped.");
-        checkState(pendingLoadFuture != null);
-        try {
-            return handlePreCommitResponse(pendingLoadFuture.get());
-        } catch (Exception e) {
-            throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
+        if (pendingLoadFuture != null) {
+            log.info("stream load stopped.");
+            recordStream.endInput();
+            try {
+                return handlePreCommitResponse(pendingLoadFuture.get());
+            } catch (Exception e) {
+                throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
+            } finally {
+                pendingLoadFuture = null;
+            }
+        } else {
+            return null;
         }
     }
 
     public void startLoad(String label) throws IOException {
         loadBatchFirstRecord = true;
         recordCount = 0;
+        this.label = label;
+    }
+
+    private void startStreamLoad() {
         HttpPutBuilder putBuilder = new HttpPutBuilder();
-        recordStream.startInput();
         log.info("stream load started for {}", label);
         try {
             InputStreamEntity entity = new InputStreamEntity(recordStream);

Reply via email to