This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 007a9940e3 [Improve] Improve doris stream load client side error message (#6688)
007a9940e3 is described below

commit 007a9940e37c1cd89a4a8444d4aee60caf438881
Author: Jia Fan <[email protected]>
AuthorDate: Sun Apr 28 10:34:28 2024 +0800

    [Improve] Improve doris stream load client side error message (#6688)
---
 .../doris/sink/writer/DorisSinkWriter.java         | 33 ++-----------
 .../doris/sink/writer/DorisStreamLoad.java         | 23 ++++++++-
 .../connectors/doris/sink/writer/RecordBuffer.java | 56 ++++++++++++++--------
 .../connectors/doris/sink/writer/RecordStream.java |  4 ++
 .../e2e/connector/doris/DorisErrorIT.java          | 11 +++++
 .../fake_source_and_doris_sink_timeout_error.conf  |  8 +++-
 6 files changed, 83 insertions(+), 52 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index 4443d7a0a4..496b91b25b 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -58,7 +58,6 @@ public class DorisSinkWriter
             new ArrayList<>(Arrays.asList(LoadStatus.SUCCESS, 
LoadStatus.PUBLISH_TIMEOUT));
     private long lastCheckpointId;
     private DorisStreamLoad dorisStreamLoad;
-    volatile boolean loading;
     private final DorisConfig dorisConfig;
     private final String labelPrefix;
     private final LabelGenerator labelGenerator;
@@ -66,7 +65,6 @@ public class DorisSinkWriter
     private final DorisSerializer serializer;
     private final CatalogTable catalogTable;
     private final ScheduledExecutorService scheduledExecutorService;
-    private Thread executorThread;
     private volatile Exception loadException = null;
 
     public DorisSinkWriter(
@@ -94,7 +92,6 @@ public class DorisSinkWriter
                         1, new 
ThreadFactoryBuilder().setNameFormat("stream-load-check").build());
         this.serializer = createSerializer(dorisConfig, 
catalogTable.getSeaTunnelRowType());
         this.intervalTime = dorisConfig.getCheckInterval();
-        this.loading = false;
         this.initializeLoad();
     }
 
@@ -123,7 +120,7 @@ public class DorisSinkWriter
 
     @Override
     public void write(SeaTunnelRow element) throws IOException {
-        checkLoadExceptionAndResetThread();
+        checkLoadException();
         byte[] serialize =
                 serializer.serialize(
                         dorisConfig.isNeedsUnsupportedTypeCasting()
@@ -154,7 +151,6 @@ public class DorisSinkWriter
 
     private RespContent flush() throws IOException {
         // disable exception checker before stop load.
-        loading = false;
         checkState(dorisStreamLoad != null);
         RespContent respContent = dorisStreamLoad.stopLoad();
         if (respContent != null && 
!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
@@ -177,7 +173,6 @@ public class DorisSinkWriter
 
     private void startLoad(String label) {
         this.dorisStreamLoad.startLoad(label);
-        this.loading = true;
     }
 
     @Override
@@ -194,37 +189,19 @@ public class DorisSinkWriter
     private void checkDone() {
         // the load future is done and checked in prepareCommit().
         // this will check error while loading.
+        String errorMsg;
         log.debug("start timer checker, interval {} ms", intervalTime);
-        if (dorisStreamLoad.getPendingLoadFuture() != null
-                && dorisStreamLoad.getPendingLoadFuture().isDone()) {
-            if (!loading) {
-                log.debug("not loading, skip timer checker");
-                return;
-            }
-            String errorMsg;
-            try {
-                RespContent content =
-                        dorisStreamLoad.handlePreCommitResponse(
-                                dorisStreamLoad.getPendingLoadFuture().get());
-                errorMsg = content.getMessage();
-            } catch (Exception e) {
-                errorMsg = e.getMessage();
-            }
-
+        if ((errorMsg = dorisStreamLoad.getLoadFailedMsg()) != null) {
+            log.error("stream load finished unexpectedly: {}", errorMsg);
             loadException =
                     new DorisConnectorException(
                             DorisConnectorErrorCode.STREAM_LOAD_FAILED, 
errorMsg);
-            log.error("stream load finished unexpectedly, interrupt worker 
thread! {}", errorMsg);
-            // set the executor thread interrupted in case blocking in write 
data.
-            executorThread.interrupt();
         }
     }
 
-    private void checkLoadExceptionAndResetThread() {
+    private void checkLoadException() {
         if (loadException != null) {
             throw new RuntimeException("error while loading data.", 
loadException);
-        } else {
-            executorThread = Thread.currentThread();
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
index bf2136091d..eadcf94cd5 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
@@ -78,6 +78,7 @@ public class DorisStreamLoad implements Serializable {
     private final CloseableHttpClient httpClient;
     private final ExecutorService executorService;
     private volatile boolean loadBatchFirstRecord;
+    private volatile boolean loading = false;
     private String label;
     private long recordCount = 0;
 
@@ -199,7 +200,25 @@ public class DorisStreamLoad implements Serializable {
         return recordCount;
     }
 
-    public RespContent handlePreCommitResponse(CloseableHttpResponse response) 
throws Exception {
+    public String getLoadFailedMsg() {
+        if (!loading) {
+            return null;
+        }
+        if (this.getPendingLoadFuture() != null && 
this.getPendingLoadFuture().isDone()) {
+            String errorMessage;
+            try {
+                errorMessage = 
handlePreCommitResponse(pendingLoadFuture.get()).getMessage();
+            } catch (Exception e) {
+                errorMessage = e.getMessage();
+            }
+            recordStream.setErrorMessageByStreamLoad(errorMessage);
+            return errorMessage;
+        } else {
+            return null;
+        }
+    }
+
+    private RespContent handlePreCommitResponse(CloseableHttpResponse 
response) throws Exception {
         final int statusCode = response.getStatusLine().getStatusCode();
         if (statusCode == HTTP_TEMPORARY_REDIRECT && response.getEntity() != 
null) {
             String loadResult = EntityUtils.toString(response.getEntity());
@@ -211,6 +230,7 @@ public class DorisStreamLoad implements Serializable {
     }
 
     public RespContent stopLoad() throws IOException {
+        loading = false;
         if (pendingLoadFuture != null) {
             log.info("stream load stopped.");
             recordStream.endInput();
@@ -230,6 +250,7 @@ public class DorisStreamLoad implements Serializable {
         loadBatchFirstRecord = true;
         recordCount = 0;
         this.label = label;
+        this.loading = true;
     }
 
     private void startStreamLoad() {
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java
index a7043886ca..183227fa6c 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java
@@ -17,6 +17,10 @@
 
 package org.apache.seatunnel.connectors.doris.sink.writer;
 
+import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
+import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
+
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
@@ -25,18 +29,21 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkState;
 
 /** Channel of record stream and HTTP data stream. */
 @Slf4j
 public class RecordBuffer {
-    BlockingQueue<ByteBuffer> writeQueue;
-    BlockingQueue<ByteBuffer> readQueue;
-    int bufferCapacity;
-    int queueSize;
-    ByteBuffer currentWriteBuffer;
-    ByteBuffer currentReadBuffer;
+    private final BlockingQueue<ByteBuffer> writeQueue;
+    private final BlockingQueue<ByteBuffer> readQueue;
+    private final int bufferCapacity;
+    private final int queueSize;
+    private ByteBuffer currentWriteBuffer;
+    private ByteBuffer currentReadBuffer;
+    // used to check stream load error by stream load thread
+    @Setter private volatile String errorMessageByStreamLoad;
 
     public RecordBuffer(int capacity, int queueSize) {
         log.info("init RecordBuffer capacity {}, count {}", capacity, 
queueSize);
@@ -76,7 +83,11 @@ public class RecordBuffer {
                 currentWriteBuffer = null;
             }
             if (!isEmpty) {
-                ByteBuffer byteBuffer = writeQueue.take();
+                ByteBuffer byteBuffer = null;
+                while (byteBuffer == null) {
+                    checkErrorMessageByStreamLoad();
+                    byteBuffer = writeQueue.poll(100, TimeUnit.MILLISECONDS);
+                }
                 ((Buffer) byteBuffer).flip();
                 checkState(byteBuffer.limit() == 0);
                 readQueue.put(byteBuffer);
@@ -89,8 +100,9 @@ public class RecordBuffer {
     public void write(byte[] buf) throws InterruptedException {
         int wPos = 0;
         do {
-            if (currentWriteBuffer == null) {
-                currentWriteBuffer = writeQueue.take();
+            while (currentWriteBuffer == null) {
+                checkErrorMessageByStreamLoad();
+                currentWriteBuffer = writeQueue.poll(100, 
TimeUnit.MILLISECONDS);
             }
             int available = currentWriteBuffer.remaining();
             int nWrite = Math.min(available, buf.length - wPos);
@@ -105,14 +117,15 @@ public class RecordBuffer {
     }
 
     public int read(byte[] buf) throws InterruptedException {
-        if (currentReadBuffer == null) {
-            currentReadBuffer = readQueue.take();
+        while (currentReadBuffer == null) {
+            checkErrorMessageByStreamLoad();
+            currentReadBuffer = readQueue.poll(100, TimeUnit.MILLISECONDS);
         }
         // add empty buffer as end flag
         if (currentReadBuffer.limit() == 0) {
             recycleBuffer(currentReadBuffer);
             currentReadBuffer = null;
-            checkState(readQueue.size() == 0);
+            checkState(readQueue.isEmpty());
             return -1;
         }
         int available = currentReadBuffer.remaining();
@@ -125,16 +138,17 @@ public class RecordBuffer {
         return nRead;
     }
 
-    private void recycleBuffer(ByteBuffer buffer) throws InterruptedException {
-        ((Buffer) buffer).clear();
-        writeQueue.put(buffer);
-    }
-
-    public int getWriteQueueSize() {
-        return writeQueue.size();
+    private void checkErrorMessageByStreamLoad() {
+        if (errorMessageByStreamLoad != null) {
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.STREAM_LOAD_FAILED, 
errorMessageByStreamLoad);
+        }
     }
 
-    public int getReadQueueSize() {
-        return readQueue.size();
+    private void recycleBuffer(ByteBuffer buffer) throws InterruptedException {
+        ((Buffer) buffer).clear();
+        while (!writeQueue.offer(buffer, 100, TimeUnit.MILLISECONDS)) {
+            checkErrorMessageByStreamLoad();
+        }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordStream.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordStream.java
index 73d33e3dd1..01d373e1fa 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordStream.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordStream.java
@@ -57,4 +57,8 @@ public class RecordStream extends InputStream {
             throw new RuntimeException(e);
         }
     }
+
+    public void setErrorMessageByStreamLoad(String errorMessageByStreamLoad) {
+        recordBuffer.setErrorMessageByStreamLoad(errorMessageByStreamLoad);
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisErrorIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisErrorIT.java
index 77936f230b..be0416d79e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisErrorIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisErrorIT.java
@@ -79,6 +79,17 @@ public class DorisErrorIT extends AbstractDorisIT {
         Thread.sleep(10 * 1000);
         super.container.stop();
         Assertions.assertNotEquals(0, future.get().getExitCode());
+        Assertions.assertTrue(
+                future.get()
+                        .getStderr()
+                        .contains(
+                                "Caused by: 
org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: 
ErrorCode:[Doris-01], ErrorDescription:[stream load error]"));
+        Assertions.assertTrue(
+                future.get()
+                        .getStderr()
+                        .contains(
+                                "at 
org.apache.seatunnel.connectors.doris.sink.writer.RecordBuffer.checkErrorMessageByStreamLoad"));
+        log.info("doris error log: \n" + future.get().getStderr());
         super.container.start();
         // wait for the container to restart
         given().ignoreExceptions()
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf
index 53e0eadaa4..88bd3a92ca 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf
@@ -18,13 +18,13 @@
 env{
   parallelism = 1
   job.mode = "BATCH"
+  job.retry.times = 0
 }
 
 source{
   FakeSource {
-    row.num = 100
+    row.num = 1000
     split.num = 10
-    split.read-interval = 10000
     string.length = 1
     schema = {
       fields {
@@ -58,6 +58,10 @@ sink{
           password = ""
           table.identifier = "e2e_sink.doris_e2e_table"
           sink.enable-2pc = "true"
+          // stuck in get RecordBuffer
+          sink.buffer-size = 2
+          sink.buffer-count = 2
+
           sink.label-prefix = "test_json"
           doris.config = {
               format="json"

Reply via email to