This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new a8decd94 add multi table case (#577)
a8decd94 is described below

commit a8decd9443de8afaf2ff764ca86b13ab1f0cb252
Author: wudi <[email protected]>
AuthorDate: Mon Mar 17 10:07:54 2025 +0800

    add multi table case (#577)
---
 .../doris/flink/cfg/DorisExecutionOptions.java     |   2 +-
 .../doris/flink/sink/writer/DorisWriter.java       | 138 +++++-----
 .../doris/flink/table/DorisConfigOptions.java      |   4 +-
 .../flink/container/AbstractITCaseService.java     |  76 +++++-
 .../doris/flink/sink/DorisSinkFailoverITCase.java  |  91 ++-----
 .../sink/DorisSinkMultiTblFailoverITCase.java      | 277 +++++++++++++++++++++
 .../{MockSource.java => MockMultiTableSource.java} |  64 +++--
 .../org/apache/doris/flink/utils/MockSource.java   |   3 +
 8 files changed, 507 insertions(+), 148 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 831a317e..371069f5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -34,7 +34,7 @@ public class DorisExecutionOptions implements Serializable {
 
     private static final long serialVersionUID = 1L;
     // 0 means disable checker thread
-    public static final int DEFAULT_CHECK_INTERVAL = 0;
+    public static final int DEFAULT_CHECK_INTERVAL = 10000;
     public static final int DEFAULT_MAX_RETRY_TIMES = 3;
     private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
     private static final int DEFAULT_BUFFER_COUNT = 3;
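
With this change the error-checker thread is enabled by default (10000 ms) instead of disabled. A minimal sketch of overriding the interval from the DataStream API, assuming only the builder methods used by the tests below:

    DorisExecutionOptions executionOptions =
            DorisExecutionOptions.builder()
                    .setLabelPrefix("doris-sink")   // label prefix for stream load
                    .setCheckInterval(1000)         // probe for load errors every 1s; 0 disables
                    .build();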
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 8e28213d..466b995f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -21,7 +21,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -51,6 +50,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
 import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
@@ -86,6 +86,7 @@ public class DorisWriter<IN>
     private SinkWriterMetricGroup sinkMetricGroup;
     private Map<String, DorisWriteMetrics> sinkMetricsMap = new ConcurrentHashMap<>();
     private volatile boolean multiTableLoad = false;
+    private final ReentrantLock checkLock = new ReentrantLock();
 
     public DorisWriter(
             Sink.InitContext initContext,
@@ -104,7 +105,13 @@ public class DorisWriter<IN>
         this.labelPrefix = executionOptions.getLabelPrefix();
         this.subtaskId = initContext.getSubtaskId();
         this.scheduledExecutorService =
-                new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-check"));
+                new ScheduledThreadPoolExecutor(
+                        1,
+                        r -> {
+                            Thread t = new Thread(r, "stream-load-check-" + subtaskId);
+                            t.setPriority(Thread.MIN_PRIORITY);
+                            return t;
+                        });
         this.serializer = serializer;
         if (StringUtils.isBlank(dorisOptions.getTableIdentifier())) {
             this.multiTableLoad = true;
@@ -132,12 +139,10 @@ public class DorisWriter<IN>
         }
         // get main work thread.
         executorThread = Thread.currentThread();
-        // todo: When writing to multiple tables,
-        //  the checkdone thread may cause problems.
-        if (!multiTableLoad && intervalTime > 1000) {
+        if (intervalTime >= 1000) {
             // when uploading data in streaming mode, we need to regularly detect whether there are
             // exceptions.
-            LOG.info("start stream load checkdone thread.");
+            LOG.info("start stream load checkdone thread with interval {} ms", intervalTime);
             scheduledExecutorService.scheduleWithFixedDelay(
                     this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
         }
@@ -235,57 +240,61 @@ public class DorisWriter<IN>
         if (!globalLoading && loadingMap.values().stream().noneMatch(Boolean::booleanValue)) {
             return Collections.emptyList();
         }
-
         // disable exception checker before stop load.
         globalLoading = false;
-        // submit stream load http request
-        List<DorisCommittable> committableList = new ArrayList<>();
-        for (Map.Entry<String, DorisStreamLoad> streamLoader : dorisStreamLoadMap.entrySet()) {
-            String tableIdentifier = streamLoader.getKey();
-            if (!loadingMap.getOrDefault(tableIdentifier, false)) {
-                LOG.debug("skip table {}, no data need to load.", tableIdentifier);
-                continue;
-            }
-            DorisStreamLoad dorisStreamLoad = streamLoader.getValue();
-            RespContent respContent = dorisStreamLoad.stopLoad();
-            // refresh metrics
-            if (sinkMetricsMap.containsKey(tableIdentifier)) {
-                DorisWriteMetrics dorisWriteMetrics = sinkMetricsMap.get(tableIdentifier);
-                dorisWriteMetrics.flush(respContent);
-            }
-            if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
-                if (executionOptions.enabled2PC()
-                        && LoadStatus.LABEL_ALREADY_EXIST.equals(respContent.getStatus())
-                        && !JOB_EXIST_FINISHED.equals(respContent.getExistingJobStatus())) {
-                    LOG.info(
-                            "try to abort {} cause status {}, exist job status {} ",
-                            respContent.getLabel(),
-                            respContent.getStatus(),
-                            respContent.getExistingJobStatus());
-                    dorisStreamLoad.abortLabelExistTransaction(respContent);
-                    throw new LabelAlreadyExistsException("Exist label abort finished, retry");
-                } else {
-                    String errMsg =
-                            String.format(
-                                    "table %s stream load error: %s, see more in %s",
-                                    tableIdentifier,
-                                    respContent.getMessage(),
-                                    respContent.getErrorURL());
-                    LOG.error("Failed to load, {}", errMsg);
-                    throw new DorisRuntimeException(errMsg);
+        checkLock.lockInterruptibly();
+        try {
+            // submit stream load http request
+            List<DorisCommittable> committableList = new ArrayList<>();
+            for (Map.Entry<String, DorisStreamLoad> streamLoader : dorisStreamLoadMap.entrySet()) {
+                String tableIdentifier = streamLoader.getKey();
+                if (!loadingMap.getOrDefault(tableIdentifier, false)) {
+                    LOG.debug("skip table {}, no data need to load.", tableIdentifier);
+                    continue;
+                }
+                DorisStreamLoad dorisStreamLoad = streamLoader.getValue();
+                RespContent respContent = dorisStreamLoad.stopLoad();
+                // refresh metrics
+                if (sinkMetricsMap.containsKey(tableIdentifier)) {
+                    DorisWriteMetrics dorisWriteMetrics = sinkMetricsMap.get(tableIdentifier);
+                    dorisWriteMetrics.flush(respContent);
+                }
+                if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+                    if (executionOptions.enabled2PC()
+                            && LoadStatus.LABEL_ALREADY_EXIST.equals(respContent.getStatus())
+                            && !JOB_EXIST_FINISHED.equals(respContent.getExistingJobStatus())) {
+                        LOG.info(
+                                "try to abort {} cause status {}, exist job status {} ",
+                                respContent.getLabel(),
+                                respContent.getStatus(),
+                                respContent.getExistingJobStatus());
+                        dorisStreamLoad.abortLabelExistTransaction(respContent);
+                        throw new LabelAlreadyExistsException("Exist label abort finished, retry");
+                    } else {
+                        String errMsg =
+                                String.format(
+                                        "table %s stream load error: %s, see more in %s",
+                                        tableIdentifier,
+                                        respContent.getMessage(),
+                                        respContent.getErrorURL());
+                        LOG.error("Failed to load, {}", errMsg);
+                        throw new DorisRuntimeException(errMsg);
+                    }
+                }
+                if (executionOptions.enabled2PC()) {
+                    long txnId = respContent.getTxnId();
+                    committableList.add(
+                            new DorisCommittable(
+                                    dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
                 }
             }
-            if (executionOptions.enabled2PC()) {
-                long txnId = respContent.getTxnId();
-                committableList.add(
-                        new DorisCommittable(
-                                dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
-            }
-        }
 
-        // clean loadingMap
-        loadingMap.clear();
-        return committableList;
+            // clean loadingMap
+            loadingMap.clear();
+            return committableList;
+        } finally {
+            checkLock.unlock();
+        }
     }
 
     private void abortPossibleSuccessfulTransaction() {
@@ -351,19 +360,36 @@ public class DorisWriter<IN>
 
     /** Check the streamload http request regularly. */
     private void checkDone() {
-        for (Map.Entry<String, DorisStreamLoad> streamLoadMap : dorisStreamLoadMap.entrySet()) {
-            checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue());
+        if (!globalLoading) {
+            return;
+        }
+        LOG.debug("start timer checker, interval {} ms", intervalTime);
+        if (checkLock.tryLock()) {
+            try {
+                // double check
+                if (!globalLoading) {
+                    return;
+                }
+                for (Map.Entry<String, DorisStreamLoad> streamLoadMap :
+                        dorisStreamLoadMap.entrySet()) {
+                    checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue());
+                }
+            } finally {
+                checkLock.unlock();
+            }
         }
     }
 
     private void checkAllDone(String tableIdentifier, DorisStreamLoad dorisStreamLoad) {
         // the load future is done and checked in prepareCommit().
         // this will check error while loading.
-        LOG.debug("start timer checker, interval {} ms", intervalTime);
         if (dorisStreamLoad.getPendingLoadFuture() != null
                 && dorisStreamLoad.getPendingLoadFuture().isDone()) {
             if (!globalLoading || !loadingMap.get(tableIdentifier)) {
-                LOG.debug("not loading, skip timer checker for table {}", tableIdentifier);
+                LOG.debug(
+                        "not loading, skip timer checker for table {}, {}",
+                        tableIdentifier,
+                        globalLoading);
                 return;
             }
 
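The new lock coordinates the commit path with the timer: prepareCommit() takes the lock with lockInterruptibly() and waits for an in-flight check, while checkDone() uses tryLock() and skips a round rather than delaying a commit. A minimal standalone sketch of that pattern, not the connector's code:

    import java.util.concurrent.locks.ReentrantLock;

    class LoadChecker {
        private final ReentrantLock checkLock = new ReentrantLock();

        void commitPath() throws InterruptedException {
            checkLock.lockInterruptibly(); // the commit must run; wait for a running check
            try {
                // flush pending loads and collect committables
            } finally {
                checkLock.unlock();
            }
        }

        void timerPath() {
            if (checkLock.tryLock()) { // skip this round instead of blocking a commit
                try {
                    // probe pending load futures for errors
                } finally {
                    checkLock.unlock();
                }
            }
        }
    }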
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index b0cb2316..b99710e3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -216,9 +216,9 @@ public class DorisConfigOptions {
     public static final ConfigOption<Duration> SINK_CHECK_INTERVAL =
             ConfigOptions.key("sink.check-interval")
                     .durationType()
-                    .defaultValue(Duration.ofMillis(0))
+                    .defaultValue(Duration.ofMillis(10000))
                     .withDescription(
-                            "check exception with the interval while loading, The default is 0, disabling the checker thread");
+                            "check exception with the interval while loading, the default is 10s, 0 means disabling the checker thread");
     public static final ConfigOption<Integer> SINK_MAX_RETRIES =
             ConfigOptions.key("sink.max-retries")
                     .intType()
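
The same default now applies to the SQL connector. A minimal sketch of setting the option explicitly in a DDL, with placeholder connection values:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    tEnv.executeSql(
            "CREATE TABLE doris_sink (id INT, task_id INT) WITH ("
                    + " 'connector' = 'doris',"
                    + " 'fenodes' = '127.0.0.1:8030',"      // placeholder FE address
                    + " 'table.identifier' = 'db.tbl',"     // placeholder table
                    + " 'username' = 'root',"
                    + " 'password' = '',"
                    + " 'sink.label-prefix' = 'doris-sink',"
                    + " 'sink.check-interval' = '10s'"      // new default; '0' disables the checker
                    + ")");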
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
index e6b8e163..e3d709f5 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
@@ -25,9 +25,18 @@ import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipCont
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.util.function.SupplierWithException;
 
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -39,7 +48,14 @@ public abstract class AbstractITCaseService extends AbstractContainerTestBase {
             JobClient client, List<JobStatus> expectedStatus, Deadline deadline) throws Exception {
         waitUntilCondition(
                 () -> {
-                    JobStatus currentStatus = (JobStatus) client.getJobStatus().get();
+                    JobStatus currentStatus;
+                    try {
+                        currentStatus = (JobStatus) client.getJobStatus().get();
+                    } catch (IllegalStateException e) {
+                        LOG.warn("Failed to get state, cause " + e.getMessage());
+                        currentStatus = JobStatus.FINISHED;
+                    }
+
                     if (expectedStatus.contains(currentStatus)) {
                         return true;
                     } else if (currentStatus.isTerminalState()) {
@@ -138,11 +154,67 @@ public abstract class AbstractITCaseService extends AbstractContainerTestBase {
         try {
             jobStatus = jobClient.getJobStatus().get();
         } catch (IllegalStateException e) {
-            LOG.info("Failed to get state, cause " + e.getMessage());
+            LOG.warn("Failed to get state, cause " + e.getMessage());
             jobStatus = JobStatus.FINISHED;
         } catch (InterruptedException | ExecutionException e) {
             throw new RuntimeException(e);
         }
         return jobStatus;
     }
+
+    protected void faultInjectionOpen() throws IOException {
+        String pointName = "FlushToken.submit_flush_error";
+        String apiUrl =
+                String.format(
+                        "http://%s/api/debug_point/add/%s",
+                        dorisContainerService.getBenodes(), pointName);
+        HttpPost httpPost = new HttpPost(apiUrl);
+        httpPost.addHeader(
+                HttpHeaders.AUTHORIZATION,
+                auth(dorisContainerService.getUsername(), dorisContainerService.getPassword()));
+        try (CloseableHttpClient httpClient = HttpClients.custom().build()) {
+            try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
+                int statusCode = response.getStatusLine().getStatusCode();
+                String reason = response.getStatusLine().toString();
+                if (statusCode == 200 && response.getEntity() != null) {
+                    LOG.info("Debug point response {}", EntityUtils.toString(response.getEntity()));
+                } else {
+                    LOG.info("Debug point failed, statusCode: {}, reason: {}", statusCode, reason);
+                }
+            }
+        }
+    }
+
+    protected void faultInjectionClear() throws IOException {
+        String apiUrl =
+                String.format(
+                        "http://%s/api/debug_point/clear", dorisContainerService.getBenodes());
+        HttpPost httpPost = new HttpPost(apiUrl);
+        httpPost.addHeader(
+                HttpHeaders.AUTHORIZATION,
+                auth(dorisContainerService.getUsername(), dorisContainerService.getPassword()));
+        try (CloseableHttpClient httpClient = HttpClients.custom().build()) {
+            try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
+                int statusCode = response.getStatusLine().getStatusCode();
+                String reason = response.getStatusLine().toString();
+                if (statusCode == 200 && response.getEntity() != null) {
+                    LOG.info("Debug point response {}", EntityUtils.toString(response.getEntity()));
+                } else {
+                    LOG.info("Debug point failed, statusCode: {}, reason: {}", statusCode, reason);
+                }
+            }
+        }
+    }
+
+    protected String auth(String user, String password) {
+        final String authInfo = user + ":" + password;
+        byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
+        return "Basic " + new String(encoded);
+    }
+
+    protected enum FaultType {
+        RESTART_FAILURE,
+        STREAM_LOAD_FAILURE,
+        CHECKPOINT_FAILURE
+    }
 }
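
The two helpers toggle a backend debug point over HTTP, so a failover test can force stream load failures for a window and then let the sink recover. The usage in the tests below follows this shape (the sleep length is arbitrary):

    faultInjectionOpen();   // enable FlushToken.submit_flush_error on the BE nodes
    Thread.sleep(20_000);   // let the sink hit the fault and retry
    faultInjectionClear();  // clear all debug points so loads can succeed again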
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java
index 95156d38..72811803 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
-import org.apache.commons.codec.binary.Base64;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -30,12 +29,6 @@ import org.apache.doris.flink.container.AbstractITCaseService;
 import org.apache.doris.flink.container.ContainerUtils;
 import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
 import org.apache.doris.flink.utils.MockSource;
-import org.apache.http.HttpHeaders;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -43,8 +36,6 @@ import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
@@ -109,6 +100,7 @@ public class DorisSinkFailoverITCase extends AbstractITCaseService {
         executionBuilder
                 .setLabelPrefix(UUID.randomUUID().toString())
                 .enable2PC()
+                .setCheckInterval(1000)
                 .setBatchMode(batchMode)
                 .setFlushQueueSize(4)
                 .setStreamLoadProp(properties);
@@ -140,7 +132,15 @@ public class DorisSinkFailoverITCase extends AbstractITCaseService {
         int maxRestart = 5;
         Random random = new Random();
         while (true) {
-            result = ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, query, 2);
+            try {
+                // a restart may make the query fail
+                result =
+                        ContainerUtils.executeSQLStatement(
+                                getDorisQueryConnection(), LOG, query, 2);
+            } catch (Exception ex) {
+                LOG.error("Failed to query result, cause " + ex.getMessage());
+                continue;
+            }
 
             if (result.size() >= totalRecords * DEFAULT_PARALLELISM
                     && getFlinkJobStatus(jobClient).equals(JobStatus.FINISHED)) {
@@ -151,21 +151,21 @@ public class DorisSinkFailoverITCase extends AbstractITCaseService {
             // Wait until write is successful, then trigger error
             if (result.size() > 1 && maxRestart-- >= 0) {
                 // trigger error random
-                int randomSleepMs = random.nextInt(30);
+                int randomSleepSec = random.nextInt(30);
                 if (FaultType.STREAM_LOAD_FAILURE.equals(faultType)) {
                     faultInjectionOpen();
-                    randomSleepMs = randomSleepMs + 20;
-                    LOG.info("Injecting fault, sleep {}s before recover", randomSleepMs);
-                    Thread.sleep(randomSleepMs * 1000);
+                    randomSleepSec = randomSleepSec + 20;
+                    LOG.info("Injecting fault, sleep {}s before recover", randomSleepSec);
+                    Thread.sleep(randomSleepSec * 1000);
                     faultInjectionClear();
                 } else if (FaultType.RESTART_FAILURE.equals(faultType)) {
                     // docker image restart time is about 60s
-                    randomSleepMs = randomSleepMs + 60;
+                    randomSleepSec = randomSleepSec + 60;
                     dorisContainerService.restartContainer();
                     LOG.info(
                             "Restarting doris cluster, sleep {}s before next restart",
-                            randomSleepMs);
-                    Thread.sleep(randomSleepMs * 1000);
+                            randomSleepSec);
+                    Thread.sleep(randomSleepSec * 1000);
                 }
             } else {
                 // Avoid frequent queries
@@ -185,61 +185,12 @@ public class DorisSinkFailoverITCase extends AbstractITCaseService {
         } else {
             List<String> actualResult =
                     ContainerUtils.getResult(getDorisQueryConnection(), LOG, expected, query, 2);
+            LOG.info("actual size: {}, expected size: {}", actualResult.size(), expected.size());
             Assert.assertTrue(
                     actualResult.size() >= expected.size() && actualResult.containsAll(expected));
         }
     }
 
-    public void faultInjectionOpen() throws IOException {
-        String pointName = "FlushToken.submit_flush_error";
-        String apiUrl =
-                String.format(
-                        "http://%s/api/debug_point/add/%s",
-                        dorisContainerService.getBenodes(), pointName);
-        HttpPost httpPost = new HttpPost(apiUrl);
-        httpPost.addHeader(
-                HttpHeaders.AUTHORIZATION,
-                auth(dorisContainerService.getUsername(), dorisContainerService.getPassword()));
-        try (CloseableHttpClient httpClient = HttpClients.custom().build()) {
-            try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
-                int statusCode = response.getStatusLine().getStatusCode();
-                String reason = response.getStatusLine().toString();
-                if (statusCode == 200 && response.getEntity() != null) {
-                    LOG.info("Debug point response {}", EntityUtils.toString(response.getEntity()));
-                } else {
-                    LOG.info("Debug point failed, statusCode: {}, reason: {}", statusCode, reason);
-                }
-            }
-        }
-    }
-
-    public void faultInjectionClear() throws IOException {
-        String apiUrl =
-                String.format(
-                        "http://%s/api/debug_point/clear", dorisContainerService.getBenodes());
-        HttpPost httpPost = new HttpPost(apiUrl);
-        httpPost.addHeader(
-                HttpHeaders.AUTHORIZATION,
-                auth(dorisContainerService.getUsername(), dorisContainerService.getPassword()));
-        try (CloseableHttpClient httpClient = HttpClients.custom().build()) {
-            try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
-                int statusCode = response.getStatusLine().getStatusCode();
-                String reason = response.getStatusLine().toString();
-                if (statusCode == 200 && response.getEntity() != null) {
-                    LOG.info("Debug point response {}", EntityUtils.toString(response.getEntity()));
-                } else {
-                    LOG.info("Debug point failed, statusCode: {}, reason: {}", statusCode, reason);
-                }
-            }
-        }
-    }
-
-    private String auth(String user, String password) {
-        final String authInfo = user + ":" + password;
-        byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
-        return "Basic " + new String(encoded);
-    }
-
     private void initializeTable(String table) {
         ContainerUtils.executeSQLStatement(
                 getDorisQueryConnection(),
@@ -256,10 +207,4 @@ public class DorisSinkFailoverITCase extends AbstractITCaseService {
                                 + ")\n",
                         DATABASE, table));
     }
-
-    enum FaultType {
-        RESTART_FAILURE,
-        STREAM_LOAD_FAILURE,
-        CHECKPOINT_FAILURE
-    }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java
new file mode 100644
index 00000000..d5dd6927
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java
@@ -0,0 +1,277 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.container.AbstractITCaseService;
+import org.apache.doris.flink.container.ContainerUtils;
+import org.apache.doris.flink.sink.batch.RecordWithMeta;
+import org.apache.doris.flink.sink.writer.serializer.RecordWithMetaSerializer;
+import org.apache.doris.flink.utils.MockMultiTableSource;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+
+/** Failure cases for DorisSink multi-table writing. */
+@RunWith(Parameterized.class)
+public class DorisSinkMultiTblFailoverITCase extends AbstractITCaseService {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DorisSinkMultiTblFailoverITCase.class);
+    static final String DATABASE = "test_multi_failover_sink";
+    static final String TABLE_MULTI_CSV = "tbl_multi_csv";
+    private final boolean batchMode;
+
+    public DorisSinkMultiTblFailoverITCase(boolean batchMode) {
+        this.batchMode = batchMode;
+    }
+
+    @Parameterized.Parameters(name = "batchMode: {0}")
+    public static Object[] parameters() {
+        return new Object[][] {new Object[] {false}, new Object[] {true}};
+    }
+
+    /**
+     * Four failures are simulated in one job: 1. writing to a table that does not exist yet; 2.
+     * Flink checkpoint failure; 3. Doris cluster restart; 4. stream load failure.
+     */
+    @Test
+    public void testDorisClusterFailoverSink() throws Exception {
+        int totalTblNum = 3;
+        for (int i = 1; i <= totalTblNum; i++) {
+            String tableName = TABLE_MULTI_CSV + i;
+            initializeTable(tableName);
+        }
+        int newTableIndex = totalTblNum + 1;
+        dropTable(TABLE_MULTI_CSV + newTableIndex);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(DEFAULT_PARALLELISM);
+        int checkpointIntervalMs = 5000;
+        env.enableCheckpointing(checkpointIntervalMs);
+
+        Properties properties = new Properties();
+        properties.setProperty("column_separator", ",");
+        properties.setProperty("format", "csv");
+        DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder();
+        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
+        executionBuilder
+                .setLabelPrefix(UUID.randomUUID().toString())
+                .enable2PC()
+                .setBatchMode(batchMode)
+                .setFlushQueueSize(4)
+                .setBufferSize(1024)
+                .setCheckInterval(1000)
+                .setStreamLoadProp(properties);
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder
+                .setFenodes(getFenodes())
+                .setTableIdentifier("")
+                .setUsername(getDorisUsername())
+                .setPassword(getDorisPassword());
+
+        builder.setDorisReadOptions(DorisReadOptions.builder().build())
+                .setDorisExecutionOptions(executionBuilder.build())
+                .setSerializer(new RecordWithMetaSerializer())
+                .setDorisOptions(dorisBuilder.build());
+
+        int triggerCkptError = 7;
+        int totalRecords = 20;
+        int addTblCheckpointId = 4;
+        DataStreamSource<RecordWithMeta> mockSource =
+                env.addSource(
+                        new MockMultiTableSource(
+                                totalRecords,
+                                triggerCkptError,
+                                DATABASE,
+                                TABLE_MULTI_CSV,
+                                totalTblNum,
+                                addTblCheckpointId));
+
+        mockSource.sinkTo(builder.build());
+        JobClient jobClient = env.executeAsync();
+        CompletableFuture<JobStatus> jobStatus = jobClient.getJobStatus();
+        LOG.info("Job status: {}", jobStatus);
+
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(RUNNING),
+                Deadline.fromNow(Duration.ofSeconds(120)));
+
+        // wait checkpoint failure
+        List<JobStatus> errorStatus =
+                Arrays.asList(
+                        JobStatus.FAILING,
+                        JobStatus.CANCELLING,
+                        JobStatus.CANCELED,
+                        JobStatus.FAILED,
+                        JobStatus.RESTARTING);
+
+        waitForJobStatus(jobClient, errorStatus, Deadline.fromNow(Duration.ofSeconds(300)));
+
+        Random random = new Random();
+        LOG.info("start to create add table");
+        // sleep for a random time before creating the table
+        Thread.sleep((checkpointIntervalMs / 1000 * random.nextInt(3)) * 1000);
+        // create the newly added table
+        initializeTable(TABLE_MULTI_CSV + newTableIndex);
+
+        LOG.info("wait job restart success");
+        // wait for the job to restart successfully
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(RUNNING),
+                Deadline.fromNow(Duration.ofSeconds(300)));
+
+        LOG.info("wait job running finished");
+
+        List<String> result = new ArrayList<>();
+        boolean restart = false;
+        boolean faultInjection = false;
+        while (true) {
+            try {
+                // a restart may make the query fail
+                String query =
+                        String.format(
+                                "select * from %s",
+                                DATABASE + "." + TABLE_MULTI_CSV + newTableIndex);
+                result =
+                        ContainerUtils.executeSQLStatement(
+                                getDorisQueryConnection(), LOG, query, 2);
+            } catch (Exception ex) {
+                LOG.error("Failed to query result, cause " + ex.getMessage());
+                continue;
+            }
+
+            if (getFlinkJobStatus(jobClient).equals(JobStatus.FINISHED)) {
+                // Batch mode can only achieve at least once
+                break;
+            }
+
+            int randomSleepSec = random.nextInt(10);
+            if (result.size() > 2 && !faultInjection) {
+                faultInjectionOpen();
+                randomSleepSec = randomSleepSec + checkpointIntervalMs / 1000;
+                LOG.info("Injecting fault, sleep {}s before recover", randomSleepSec);
+                Thread.sleep(randomSleepSec * 1000);
+                faultInjectionClear();
+                LOG.info("Injecting fault recover");
+                faultInjection = true;
+            }
+
+            if (result.size() > 6 && !restart) {
+                LOG.info(
+                        "Restarting doris cluster, sleep {}s before next restart",
+                        randomSleepSec + 30);
+                dorisContainerService.restartContainer();
+                Thread.sleep((randomSleepSec + 30) * 1000);
+                restart = true;
+            }
+            Thread.sleep(500);
+        }
+
+        // concat expect value
+        List<String> expected = new ArrayList<>();
+        for (int tb = 1; tb <= newTableIndex; tb++) {
+            for (int i = 1; i <= totalRecords; i++) {
+                if (tb == newTableIndex && i <= addTblCheckpointId) {
+                    continue;
+                }
+                for (int j = 0; j < DEFAULT_PARALLELISM; j++) {
+                    expected.add(TABLE_MULTI_CSV + tb + "," + i + "," + j);
+                }
+            }
+        }
+
+        String queryRes =
+                String.format(
+                        "select * from (select \"%s\" as tbl,id,task_id from %s.%s "
+                                + "union all select \"%s\" as tbl,id,task_id from %s.%s "
+                                + "union all select \"%s\" as tbl,id,task_id from %s.%s "
+                                + "union all select \"%s\" as tbl,id,task_id from %s.%s ) a"
+                                + " order by tbl,id,task_id",
+                        TABLE_MULTI_CSV + 1,
+                        DATABASE,
+                        TABLE_MULTI_CSV + 1,
+                        TABLE_MULTI_CSV + 2,
+                        DATABASE,
+                        TABLE_MULTI_CSV + 2,
+                        TABLE_MULTI_CSV + 3,
+                        DATABASE,
+                        TABLE_MULTI_CSV + 3,
+                        TABLE_MULTI_CSV + 4,
+                        DATABASE,
+                        TABLE_MULTI_CSV + 4);
+
+        if (!batchMode) {
+            ContainerUtils.checkResult(
+                    getDorisQueryConnection(), LOG, expected, queryRes, 3, false);
+        } else {
+            List<String> actualResult =
+                    ContainerUtils.getResult(getDorisQueryConnection(), LOG, expected, queryRes, 3);
+            LOG.info("actual size: {}, expected size: {}", actualResult.size(), expected.size());
+            Assert.assertTrue(
+                    actualResult.size() >= expected.size() && actualResult.containsAll(expected));
+        }
+    }
+
+    private void initializeTable(String table) {
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+                String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+                String.format(
+                        "CREATE TABLE %s.%s ( \n"
+                                + "`id` int,\n"
+                                + "`task_id` int\n"
+                                + ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
+                                + "PROPERTIES (\n"
+                                + "\"replication_num\" = \"1\"\n"
+                                + ")\n",
+                        DATABASE, table));
+    }
+
+    private void dropTable(String table) {
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
+    }
+}
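
Multi-table mode is driven entirely by the record itself: the sink is built with an empty table identifier and each RecordWithMeta carries its own target database and table. A minimal sketch of that wiring, assuming only the builder calls used in the test above and placeholder connection values:

    DorisOptions dorisOptions =
            DorisOptions.builder()
                    .setFenodes("127.0.0.1:8030") // placeholder FE address
                    .setTableIdentifier("")       // empty identifier enables multi-table load
                    .setUsername("root")
                    .setPassword("")
                    .build();

    DorisSink<RecordWithMeta> sink =
            DorisSink.<RecordWithMeta>builder()
                    .setDorisReadOptions(DorisReadOptions.builder().build())
                    .setDorisExecutionOptions(DorisExecutionOptions.builder().build())
                    .setSerializer(new RecordWithMetaSerializer())
                    .setDorisOptions(dorisOptions)
                    .build();

    // each record routes itself: database, table, csv payload
    RecordWithMeta record = new RecordWithMeta("test_multi_failover_sink", "tbl_multi_csv1", "1,0");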
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockMultiTableSource.java
similarity index 64%
copy from flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java
copy to flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockMultiTableSource.java
index fbbc0863..287275ea 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockMultiTableSource.java
@@ -27,40 +27,59 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.util.SerializableObject;
 
+import org.apache.doris.flink.sink.batch.RecordWithMeta;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.Iterator;
 
-public class MockSource extends RichParallelSourceFunction<String>
+/*
+ * MockMultiTableSource is a mock source for testing multi-table writes.
+ */
+public class MockMultiTableSource extends RichParallelSourceFunction<RecordWithMeta>
         implements CheckpointedFunction, CheckpointListener {
-    private static final Logger LOG = LoggerFactory.getLogger(MockSource.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MockMultiTableSource.class);
     private final Object blocker = new SerializableObject();
     private transient ListState<Long> state;
+    private transient ListState<Integer> tableNumState;
     private Long id = 0L;
     private int numEventsTotal;
     private int failCheckpointId = -1;
     private volatile boolean running = true;
     private volatile long waitNextCheckpoint = 0L;
     private volatile long lastCheckpointConfirmed = 0L;
+    private String database;
+    private String tableName;
+    private int tableNum;
+    private int addTableCheckpointId;
 
-    public MockSource(int numEventsTotal) {
-        this.numEventsTotal = numEventsTotal;
-    }
-
-    public MockSource(int numEventsTotal, int failCheckpointId) {
+    public MockMultiTableSource(
+            int numEventsTotal,
+            int failCheckpointId,
+            String database,
+            String tableName,
+            int tableNum,
+            int addTableCheckpointId) {
         this.numEventsTotal = numEventsTotal;
         this.failCheckpointId = failCheckpointId;
+        this.database = database;
+        this.tableName = tableName;
+        this.tableNum = tableNum;
+        this.addTableCheckpointId = addTableCheckpointId;
     }
 
     @Override
-    public void run(SourceContext<String> ctx) throws Exception {
-
+    public void run(SourceContext<RecordWithMeta> ctx) throws Exception {
         int taskId = getRuntimeContext().getIndexOfThisSubtask();
         while (this.running && id < this.numEventsTotal) {
-            String record = ++id + "," + taskId;
-            ctx.collect(record);
+            id = id + 1;
+            for (int i = 1; i <= tableNum; i++) {
+                String record = id + "," + taskId;
+                RecordWithMeta output = new RecordWithMeta(database, tableName + i, record);
+                ctx.collect(output);
+            }
+
             // Wait for the checkpoint to complete before sending the next record
             waitNextCheckpoint = lastCheckpointConfirmed + 1;
             synchronized (this.blocker) {
@@ -79,11 +98,19 @@ public class MockSource extends RichParallelSourceFunction<String>
     @Override
     public void snapshotState(FunctionSnapshotContext context) throws Exception {
         state.update(Collections.singletonList(id));
-        if (failCheckpointId > 0 && context.getCheckpointId() % failCheckpointId == 0) {
+        if (context.getCheckpointId() == addTableCheckpointId) {
+            tableNum++;
+        }
+        tableNumState.update(Collections.singletonList(tableNum));
+        if (failCheckpointId > 0 && context.getCheckpointId() == failCheckpointId) {
             throw new RuntimeException(
                     "Trigger fail for testing, checkpointId = " + context.getCheckpointId());
         }
-        LOG.info("snapshot state to {} for checkpoint {}", id, context.getCheckpointId());
+        LOG.info(
+                "snapshot state to id={}, tableNum={} for checkpoint {}",
+                id,
+                tableNum,
+                context.getCheckpointId());
     }
 
     @Override
@@ -92,13 +119,22 @@ public class MockSource extends RichParallelSourceFunction<String>
                 context.getOperatorStateStore()
                         .getListState(
                                 new ListStateDescriptor<>("id", TypeInformation.of(Long.class)));
+        tableNumState =
+                context.getOperatorStateStore()
+                        .getListState(
+                                new ListStateDescriptor<>(
+                                        "tableNum", TypeInformation.of(Integer.class)));
         if (context.isRestored()) {
             Iterator<Long> iterator = state.get().iterator();
             while (iterator.hasNext()) {
                 id += iterator.next();
             }
+            Iterator<Integer> tableNumIterator = tableNumState.get().iterator();
+            while (tableNumIterator.hasNext()) {
+                tableNum = tableNumIterator.next();
+            }
         }
-        LOG.info("restore state from {}", id);
+        LOG.info("restore state from id {}, tableNum {}", id, tableNum);
     }
 
     @Override
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java
index fbbc0863..b7e83196 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java
@@ -33,6 +33,9 @@ import org.slf4j.LoggerFactory;
 import java.util.Collections;
 import java.util.Iterator;
 
+/*
+ * MockSource is a mock source for testing.
+ */
 public class MockSource extends RichParallelSourceFunction<String>
         implements CheckpointedFunction, CheckpointListener {
     private static final Logger LOG = LoggerFactory.getLogger(MockSource.class);

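Both mock sources emit one batch per checkpoint by blocking on a shared monitor until the checkpoint is confirmed. A minimal sketch of that gating pattern; the field names follow the sources above, and the notifyCheckpointComplete body is an assumption inferred from those fields:

    class CheckpointGate {
        private final Object blocker = new Object();
        private volatile boolean running = true;
        private volatile long waitNextCheckpoint = 0L;
        private volatile long lastCheckpointConfirmed = 0L;

        // called from run() after emitting a batch: block until the next checkpoint completes
        void awaitNextCheckpoint() throws InterruptedException {
            waitNextCheckpoint = lastCheckpointConfirmed + 1;
            synchronized (blocker) {
                while (running && lastCheckpointConfirmed < waitNextCheckpoint) {
                    blocker.wait();
                }
            }
        }

        // called from CheckpointListener#notifyCheckpointComplete (assumed wiring)
        void onCheckpointComplete(long checkpointId) {
            synchronized (blocker) {
                lastCheckpointConfirmed = checkpointId; // confirm and wake the producer
                blocker.notifyAll();
            }
        }
    }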

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

