This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new cf877c1  disable checker thread default (#366)
cf877c1 is described below

commit cf877c167baa9961bf0c9ab5a31918f5d455c900
Author: wudi <[email protected]>
AuthorDate: Mon Apr 15 16:18:26 2024 +0800

    disable checker thread default (#366)
---
 .../doris/flink/cfg/DorisExecutionOptions.java     |  3 ++-
 .../doris/flink/sink/writer/DorisStreamLoad.java   |  2 +-
 .../doris/flink/sink/writer/DorisWriter.java       | 31 +++++++++++++---------
 .../doris/flink/table/DorisConfigOptions.java      |  5 ++--
 4 files changed, 25 insertions(+), 16 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index b1b49d1..c5a9f66 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -32,7 +32,8 @@ import static 
org.apache.doris.flink.sink.writer.LoadConstants.READ_JSON_BY_LINE
 public class DorisExecutionOptions implements Serializable {
 
     private static final long serialVersionUID = 1L;
-    public static final int DEFAULT_CHECK_INTERVAL = 10000;
+    // 0 means disable checker thread
+    public static final int DEFAULT_CHECK_INTERVAL = 0;
     public static final int DEFAULT_MAX_RETRY_TIMES = 3;
     private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
     private static final int DEFAULT_BUFFER_COUNT = 3;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index cbfa8ba..2680698 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -69,7 +69,7 @@ public class DorisStreamLoad implements Serializable {
     private final byte[] lineDelimiter;
     private static final String LOAD_URL_PATTERN = 
"http://%s/api/%s/%s/_stream_load";;
     private static final String ABORT_URL_PATTERN = 
"http://%s/api/%s/_stream_load_2pc";;
-    private static final String JOB_EXIST_FINISHED = "FINISHED";
+    public static final String JOB_EXIST_FINISHED = "FINISHED";
 
     private String loadUrlStr;
     private String hostPort;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index db6094e..08edfc7 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -54,6 +54,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
 import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
+import static 
org.apache.doris.flink.sink.writer.DorisStreamLoad.JOB_EXIST_FINISHED;
 
 /**
  * Doris Writer will load data to doris.
@@ -131,10 +132,15 @@ public class DorisWriter<IN>
         }
         // get main work thread.
         executorThread = Thread.currentThread();
-        // when uploading data in streaming mode, we need to regularly detect 
whether there are
-        // exceptions.
-        scheduledExecutorService.scheduleWithFixedDelay(
-                this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
+        // todo: When writing to multiple tables,
+        //  the checkdone thread may cause problems.
+        if (!multiTableLoad && intervalTime > 1000) {
+            // when uploading data in streaming mode, we need to regularly 
detect whether there are
+            // exceptions.
+            LOG.info("start stream load checkdone thread.");
+            scheduledExecutorService.scheduleWithFixedDelay(
+                    this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
+        }
     }
 
     private void abortLingeringTransactions(Collection<DorisWriterState> 
recoveredStates)
@@ -248,8 +254,13 @@ public class DorisWriter<IN>
             }
             if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
                 if (executionOptions.enabled2PC()
-                        && 
LoadStatus.LABEL_ALREADY_EXIST.equals(respContent.getStatus())) {
-                    LOG.info("try to abort {} cause Label Already Exists", 
respContent.getLabel());
+                        && 
LoadStatus.LABEL_ALREADY_EXIST.equals(respContent.getStatus())
+                        && 
!JOB_EXIST_FINISHED.equals(respContent.getExistingJobStatus())) {
+                    LOG.info(
+                            "try to abort {} cause status {}, exist job status 
{} ",
+                            respContent.getLabel(),
+                            respContent.getStatus(),
+                            respContent.getExistingJobStatus());
                     dorisStreamLoad.abortLabelExistTransaction(respContent);
                     throw new LabelAlreadyExistsException("Exist label abort 
finished, retry");
                 } else {
@@ -338,12 +349,8 @@ public class DorisWriter<IN>
 
     /** Check the streamload http request regularly. */
     private void checkDone() {
-        // todo: When writing to multiple tables,
-        //  the checkdone thread may cause problems. Disable it first.
-        if (!multiTableLoad) {
-            for (Map.Entry<String, DorisStreamLoad> streamLoadMap : 
dorisStreamLoadMap.entrySet()) {
-                checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue());
-            }
+        for (Map.Entry<String, DorisStreamLoad> streamLoadMap : 
dorisStreamLoadMap.entrySet()) {
+            checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue());
         }
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 6d74bf8..63e550a 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -195,8 +195,9 @@ public class DorisConfigOptions {
     public static final ConfigOption<Duration> SINK_CHECK_INTERVAL =
             ConfigOptions.key("sink.check-interval")
                     .durationType()
-                    .defaultValue(Duration.ofMillis(10000))
-                    .withDescription("check exception with the interval while 
loading");
+                    .defaultValue(Duration.ofMillis(0))
+                    .withDescription(
+                            "check exception with the interval while loading, 
The default is 0, disabling the checker thread");
     public static final ConfigOption<Integer> SINK_MAX_RETRIES =
             ConfigOptions.key("sink.max-retries")
                     .intType()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to