This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 16b01d0e [improvement] fix batch streamload when label already exist (#470)
16b01d0e is described below

commit 16b01d0e3cd85f10a7aeedd962b00d14e9bed80d
Author: wudi <[email protected]>
AuthorDate: Thu Aug 15 15:21:57 2024 +0800

    [improvement] fix batch streamload when label already exist (#470)
---
 .../apache/doris/flink/sink/HttpPutBuilder.java    |  4 +++
 .../flink/sink/batch/DorisBatchStreamLoad.java     | 34 +++++++++++++---------
 2 files changed, 25 insertions(+), 13 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
index 44f6c9fe..da1a7999 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
@@ -117,6 +117,10 @@ public class HttpPutBuilder {
         return this;
     }
 
+    public String getLabel() {
+        return header.get("label");
+    }
+
     public HttpPut build() {
         Preconditions.checkNotNull(url);
         Preconditions.checkNotNull(httpEntity);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 3240dafe..42b83207 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -33,6 +33,7 @@ import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.doris.flink.sink.EscapeHandler;
 import org.apache.doris.flink.sink.HttpPutBuilder;
 import org.apache.doris.flink.sink.HttpUtil;
+import org.apache.doris.flink.sink.LoadStatus;
 import org.apache.doris.flink.sink.writer.LabelGenerator;
 import org.apache.http.client.entity.GzipCompressingEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -450,25 +451,22 @@ public class DorisBatchStreamLoad implements Serializable {
                 if (enableGroupCommit) {
                     LOG.info("stream load started with group commit on host 
{}", hostPort);
                 } else {
-                    LOG.info("stream load started for {} on host {}", label, 
hostPort);
+                    LOG.info(
+                            "stream load started for {} on host {}",
+                            putBuilder.getLabel(),
+                            hostPort);
                 }
 
                 try (CloseableHttpClient httpClient = 
httpClientBuilder.build()) {
                     try (CloseableHttpResponse response = 
httpClient.execute(putBuilder.build())) {
                         int statusCode = 
response.getStatusLine().getStatusCode();
+                        String reason = response.getStatusLine().toString();
                         if (statusCode == 200 && response.getEntity() != null) 
{
                             String loadResult = 
EntityUtils.toString(response.getEntity());
                             LOG.info("load Result {}", loadResult);
                             RespContent respContent =
                                     OBJECT_MAPPER.readValue(loadResult, 
RespContent.class);
-                            if 
(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
-                                String errMsg =
-                                        String.format(
-                                                "stream load error: %s, see 
more in %s",
-                                                respContent.getMessage(),
-                                                respContent.getErrorURL());
-                                throw new DorisBatchLoadException(errMsg);
-                            } else {
+                            if 
(DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
                                 long cacheByteBeforeFlush =
                                         
currentCacheBytes.getAndAdd(-respContent.getLoadBytes());
                                 LOG.info(
@@ -482,16 +480,26 @@ public class DorisBatchStreamLoad implements Serializable {
                                     lock.unlock();
                                 }
                                 return;
+                            } else if (LoadStatus.LABEL_ALREADY_EXIST.equals(
+                                    respContent.getStatus())) {
+                                // todo: need to abort transaction when 
JobStatus not finished
+                                putBuilder.setLabel(label + "_" + retry);
+                                reason = respContent.getMessage();
+                            } else {
+                                String errMsg =
+                                        String.format(
+                                                "stream load error: %s, see 
more in %s",
+                                                respContent.getMessage(),
+                                                respContent.getErrorURL());
+                                throw new DorisBatchLoadException(errMsg);
                             }
                         }
                         LOG.error(
                                 "stream load failed with {}, reason {}, to 
retry",
                                 hostPort,
-                                response.getStatusLine().toString());
+                                reason);
                         if (retry == executionOptions.getMaxRetries()) {
-                            resEx =
-                                    new DorisRuntimeException(
-                                            "stream load failed with: " + 
response.getStatusLine());
+                            resEx = new DorisRuntimeException("stream load 
failed with: " + reason);
                         }
                     } catch (Exception ex) {
                         resEx = ex;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to