This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new a8decd94 add multi table case (#577)
a8decd94 is described below
commit a8decd9443de8afaf2ff764ca86b13ab1f0cb252
Author: wudi <[email protected]>
AuthorDate: Mon Mar 17 10:07:54 2025 +0800
add multi table case (#577)
---
.../doris/flink/cfg/DorisExecutionOptions.java | 2 +-
.../doris/flink/sink/writer/DorisWriter.java | 138 +++++-----
.../doris/flink/table/DorisConfigOptions.java | 4 +-
.../flink/container/AbstractITCaseService.java | 76 +++++-
.../doris/flink/sink/DorisSinkFailoverITCase.java | 91 ++-----
.../sink/DorisSinkMultiTblFailoverITCase.java | 277 +++++++++++++++++++++
.../{MockSource.java => MockMultiTableSource.java} | 64 +++--
.../org/apache/doris/flink/utils/MockSource.java | 3 +
8 files changed, 507 insertions(+), 148 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 831a317e..371069f5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -34,7 +34,7 @@ public class DorisExecutionOptions implements Serializable {
private static final long serialVersionUID = 1L;
// 0 means disable checker thread
- public static final int DEFAULT_CHECK_INTERVAL = 0;
+ public static final int DEFAULT_CHECK_INTERVAL = 10000;
public static final int DEFAULT_MAX_RETRY_TIMES = 3;
private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
private static final int DEFAULT_BUFFER_COUNT = 3;
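Note: with this change the exception checker is on by default (10000 ms). Jobs that depended on the old behavior must disable it explicitly; a minimal sketch using the connector's execution-options builder (the interval value is illustrative):

    // Sketch: pin the checker interval instead of relying on the new default.
    // 0 keeps the old behavior (no checker thread); values >= 1000 ms schedule
    // the periodic checkDone() probe introduced below.
    DorisExecutionOptions executionOptions =
            DorisExecutionOptions.builder()
                    .setCheckInterval(0) // or e.g. 1000, as the ITCases below do
                    .build();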
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 8e28213d..466b995f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -21,7 +21,6 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -51,6 +50,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
@@ -86,6 +86,7 @@ public class DorisWriter<IN>
private SinkWriterMetricGroup sinkMetricGroup;
private Map<String, DorisWriteMetrics> sinkMetricsMap = new ConcurrentHashMap<>();
private volatile boolean multiTableLoad = false;
+ private final ReentrantLock checkLock = new ReentrantLock();
public DorisWriter(
Sink.InitContext initContext,
@@ -104,7 +105,13 @@ public class DorisWriter<IN>
this.labelPrefix = executionOptions.getLabelPrefix();
this.subtaskId = initContext.getSubtaskId();
this.scheduledExecutorService =
- new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-check"));
+ new ScheduledThreadPoolExecutor(
+ 1,
+ r -> {
+ Thread t = new Thread(r, "stream-load-check-" + subtaskId);
+ t.setPriority(Thread.MIN_PRIORITY);
+ return t;
+ });
this.serializer = serializer;
if (StringUtils.isBlank(dorisOptions.getTableIdentifier())) {
this.multiTableLoad = true;
@@ -132,12 +139,10 @@ public class DorisWriter<IN>
}
// get main work thread.
executorThread = Thread.currentThread();
- // todo: When writing to multiple tables,
- // the checkdone thread may cause problems.
- if (!multiTableLoad && intervalTime > 1000) {
+ if (intervalTime >= 1000) {
// when uploading data in streaming mode, we need to regularly detect whether there are
// exceptions.
- LOG.info("start stream load checkdone thread.");
+ LOG.info("start stream load checkdone thread with interval {} ms",
intervalTime);
scheduledExecutorService.scheduleWithFixedDelay(
this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
}
@@ -235,57 +240,61 @@ public class DorisWriter<IN>
if (!globalLoading && loadingMap.values().stream().noneMatch(Boolean::booleanValue)) {
return Collections.emptyList();
}
-
// disable exception checker before stop load.
globalLoading = false;
- // submit stream load http request
- List<DorisCommittable> committableList = new ArrayList<>();
- for (Map.Entry<String, DorisStreamLoad> streamLoader : dorisStreamLoadMap.entrySet()) {
- String tableIdentifier = streamLoader.getKey();
- if (!loadingMap.getOrDefault(tableIdentifier, false)) {
- LOG.debug("skip table {}, no data need to load.",
tableIdentifier);
- continue;
- }
- DorisStreamLoad dorisStreamLoad = streamLoader.getValue();
- RespContent respContent = dorisStreamLoad.stopLoad();
- // refresh metrics
- if (sinkMetricsMap.containsKey(tableIdentifier)) {
- DorisWriteMetrics dorisWriteMetrics = sinkMetricsMap.get(tableIdentifier);
- dorisWriteMetrics.flush(respContent);
- }
- if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
- if (executionOptions.enabled2PC()
- && LoadStatus.LABEL_ALREADY_EXIST.equals(respContent.getStatus())
- && !JOB_EXIST_FINISHED.equals(respContent.getExistingJobStatus())) {
- LOG.info(
- "try to abort {} cause status {}, exist job status
{} ",
- respContent.getLabel(),
- respContent.getStatus(),
- respContent.getExistingJobStatus());
- dorisStreamLoad.abortLabelExistTransaction(respContent);
- throw new LabelAlreadyExistsException("Exist label abort finished, retry");
- } else {
- String errMsg =
- String.format(
- "table %s stream load error: %s, see more
in %s",
- tableIdentifier,
- respContent.getMessage(),
- respContent.getErrorURL());
- LOG.error("Failed to load, {}", errMsg);
- throw new DorisRuntimeException(errMsg);
+ checkLock.lockInterruptibly();
+ try {
+ // submit stream load http request
+ List<DorisCommittable> committableList = new ArrayList<>();
+ for (Map.Entry<String, DorisStreamLoad> streamLoader : dorisStreamLoadMap.entrySet()) {
+ String tableIdentifier = streamLoader.getKey();
+ if (!loadingMap.getOrDefault(tableIdentifier, false)) {
+ LOG.debug("skip table {}, no data need to load.",
tableIdentifier);
+ continue;
+ }
+ DorisStreamLoad dorisStreamLoad = streamLoader.getValue();
+ RespContent respContent = dorisStreamLoad.stopLoad();
+ // refresh metrics
+ if (sinkMetricsMap.containsKey(tableIdentifier)) {
+ DorisWriteMetrics dorisWriteMetrics = sinkMetricsMap.get(tableIdentifier);
+ dorisWriteMetrics.flush(respContent);
+ }
+ if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+ if (executionOptions.enabled2PC()
+ && LoadStatus.LABEL_ALREADY_EXIST.equals(respContent.getStatus())
+ && !JOB_EXIST_FINISHED.equals(respContent.getExistingJobStatus())) {
+ LOG.info(
+ "try to abort {} cause status {}, exist job
status {} ",
+ respContent.getLabel(),
+ respContent.getStatus(),
+ respContent.getExistingJobStatus());
+ dorisStreamLoad.abortLabelExistTransaction(respContent);
+ throw new LabelAlreadyExistsException("Exist label abort finished, retry");
+ } else {
+ String errMsg =
+ String.format(
+ "table %s stream load error: %s, see
more in %s",
+ tableIdentifier,
+ respContent.getMessage(),
+ respContent.getErrorURL());
+ LOG.error("Failed to load, {}", errMsg);
+ throw new DorisRuntimeException(errMsg);
+ }
+ }
+ if (executionOptions.enabled2PC()) {
+ long txnId = respContent.getTxnId();
+ committableList.add(
+ new DorisCommittable(
+ dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
}
}
- if (executionOptions.enabled2PC()) {
- long txnId = respContent.getTxnId();
- committableList.add(
- new DorisCommittable(
- dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
- }
- }
- // clean loadingMap
- loadingMap.clear();
- return committableList;
+ // clean loadingMap
+ loadingMap.clear();
+ return committableList;
+ } finally {
+ checkLock.unlock();
+ }
}
private void abortPossibleSuccessfulTransaction() {
@@ -351,19 +360,36 @@ public class DorisWriter<IN>
/** Check the streamload http request regularly. */
private void checkDone() {
- for (Map.Entry<String, DorisStreamLoad> streamLoadMap : dorisStreamLoadMap.entrySet()) {
- checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue());
+ if (!globalLoading) {
+ return;
+ }
+ LOG.debug("start timer checker, interval {} ms", intervalTime);
+ if (checkLock.tryLock()) {
+ try {
+ // double check
+ if (!globalLoading) {
+ return;
+ }
+ for (Map.Entry<String, DorisStreamLoad> streamLoadMap :
+ dorisStreamLoadMap.entrySet()) {
+ checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue());
+ }
+ } finally {
+ checkLock.unlock();
+ }
}
}
private void checkAllDone(String tableIdentifier, DorisStreamLoad dorisStreamLoad) {
// the load future is done and checked in prepareCommit().
// this will check error while loading.
- LOG.debug("start timer checker, interval {} ms", intervalTime);
if (dorisStreamLoad.getPendingLoadFuture() != null
&& dorisStreamLoad.getPendingLoadFuture().isDone()) {
if (!globalLoading || !loadingMap.get(tableIdentifier)) {
- LOG.debug("not loading, skip timer checker for table {}",
tableIdentifier);
+ LOG.debug(
+ "not loading, skip timer checker for table {}, {}",
+ tableIdentifier,
+ globalLoading);
return;
}
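Note on the locking above: the new ReentrantLock serializes the two paths that touch the stream loads. prepareCommit() takes the lock with lockInterruptibly() so a flush waits for any in-flight check, while the timer's checkDone() only tryLock()s, so it can never block or deadlock the main work thread; the volatile globalLoading flag is re-read under the lock as a double check. A standalone sketch of the same pattern (class and method names are illustrative, not the connector's API):

    import java.util.concurrent.locks.ReentrantLock;

    // Sketch of the flush/checker handshake used in DorisWriter.
    class LoadCheckCoordinator {
        private final ReentrantLock checkLock = new ReentrantLock();
        private volatile boolean loading = true;

        // Flush path (prepareCommit): block until any running check finishes.
        void flush() throws InterruptedException {
            loading = false; // disable the checker before stopping the load
            checkLock.lockInterruptibly();
            try {
                // stop stream loads, collect committables ...
            } finally {
                checkLock.unlock();
            }
        }

        // Timer path (checkDone): never block; skip this round if a flush holds the lock.
        void check() {
            if (!loading || !checkLock.tryLock()) {
                return;
            }
            try {
                if (!loading) { // double-check under the lock
                    return;
                }
                // probe pending load futures for errors ...
            } finally {
                checkLock.unlock();
            }
        }
    }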
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index b0cb2316..b99710e3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -216,9 +216,9 @@ public class DorisConfigOptions {
public static final ConfigOption<Duration> SINK_CHECK_INTERVAL =
ConfigOptions.key("sink.check-interval")
.durationType()
- .defaultValue(Duration.ofMillis(0))
+ .defaultValue(Duration.ofMillis(10000))
.withDescription(
- "check exception with the interval while loading,
The default is 0, disabling the checker thread");
+ "check exception with the interval while loading,
The default is 1s, 0 means disabling the checker thread");
public static final ConfigOption<Integer> SINK_MAX_RETRIES =
ConfigOptions.key("sink.max-retries")
.intType()
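For Table/SQL jobs the same knob is exposed as 'sink.check-interval'. A sketch that restores the old behavior by disabling the checker (other connector options abbreviated; tableEnv is assumed to be an existing TableEnvironment):

    // Sketch: override the new 10s default from SQL; '0' disables the checker.
    tableEnv.executeSql(
            "CREATE TABLE doris_sink (id INT, task_id INT) WITH (\n"
                    + "  'connector' = 'doris',\n"
                    + "  'fenodes' = '127.0.0.1:8030',\n"
                    + "  'table.identifier' = 'db.tbl',\n"
                    + "  'username' = 'root',\n"
                    + "  'password' = '',\n"
                    + "  'sink.check-interval' = '0'\n"
                    + ")");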
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
index e6b8e163..e3d709f5 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
@@ -25,9 +25,18 @@ import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipCont
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.util.function.SupplierWithException;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
@@ -39,7 +48,14 @@ public abstract class AbstractITCaseService extends AbstractContainerTestBase {
JobClient client, List<JobStatus> expectedStatus, Deadline deadline) throws Exception {
waitUntilCondition(
() -> {
- JobStatus currentStatus = (JobStatus) client.getJobStatus().get();
+ JobStatus currentStatus;
+ try {
+ currentStatus = (JobStatus) client.getJobStatus().get();
+ } catch (IllegalStateException e) {
+ LOG.warn("Failed to get state, cause " +
e.getMessage());
+ currentStatus = JobStatus.FINISHED;
+ }
+
if (expectedStatus.contains(currentStatus)) {
return true;
} else if (currentStatus.isTerminalState()) {
@@ -138,11 +154,67 @@ public abstract class AbstractITCaseService extends AbstractContainerTestBase {
try {
jobStatus = jobClient.getJobStatus().get();
} catch (IllegalStateException e) {
- LOG.info("Failed to get state, cause " + e.getMessage());
+ LOG.warn("Failed to get state, cause " + e.getMessage());
jobStatus = JobStatus.FINISHED;
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return jobStatus;
}
+
+ protected void faultInjectionOpen() throws IOException {
+ String pointName = "FlushToken.submit_flush_error";
+ String apiUrl =
+ String.format(
+ "http://%s/api/debug_point/add/%s",
+ dorisContainerService.getBenodes(), pointName);
+ HttpPost httpPost = new HttpPost(apiUrl);
+ httpPost.addHeader(
+ HttpHeaders.AUTHORIZATION,
+ auth(dorisContainerService.getUsername(), dorisContainerService.getPassword()));
+ try (CloseableHttpClient httpClient = HttpClients.custom().build()) {
+ try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ String reason = response.getStatusLine().toString();
+ if (statusCode == 200 && response.getEntity() != null) {
+ LOG.info("Debug point response {}",
EntityUtils.toString(response.getEntity()));
+ } else {
+ LOG.info("Debug point failed, statusCode: {}, reason: {}",
statusCode, reason);
+ }
+ }
+ }
+ }
+
+ protected void faultInjectionClear() throws IOException {
+ String apiUrl =
+ String.format(
+ "http://%s/api/debug_point/clear",
dorisContainerService.getBenodes());
+ HttpPost httpPost = new HttpPost(apiUrl);
+ httpPost.addHeader(
+ HttpHeaders.AUTHORIZATION,
+ auth(dorisContainerService.getUsername(), dorisContainerService.getPassword()));
+ try (CloseableHttpClient httpClient = HttpClients.custom().build()) {
+ try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ String reason = response.getStatusLine().toString();
+ if (statusCode == 200 && response.getEntity() != null) {
+ LOG.info("Debug point response {}",
EntityUtils.toString(response.getEntity()));
+ } else {
+ LOG.info("Debug point failed, statusCode: {}, reason: {}",
statusCode, reason);
+ }
+ }
+ }
+ }
+
+ protected String auth(String user, String password) {
+ final String authInfo = user + ":" + password;
+ byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
+ return "Basic " + new String(encoded);
+ }
+
+ protected enum FaultType {
+ RESTART_FAILURE,
+ STREAM_LOAD_FAILURE,
+ CHECKPOINT_FAILURE
+ }
}
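With the fault-injection helpers promoted to this base class, any ITCase can bracket a window of stream loads with a BE-side failure. A sketch of the intended usage inside a test body (the sleep duration is illustrative):

    // Sketch: FlushToken.submit_flush_error makes BE flushes fail until cleared.
    faultInjectionOpen();      // enable the debug point on the BE nodes
    try {
        Thread.sleep(20_000);  // let a few stream loads hit the injected failure
    } finally {
        faultInjectionClear(); // always clear, or later tests inherit the fault
    }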
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java
index 95156d38..72811803 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.commons.codec.binary.Base64;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -30,12 +29,6 @@ import org.apache.doris.flink.container.AbstractITCaseService;
import org.apache.doris.flink.container.ContainerUtils;
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
import org.apache.doris.flink.utils.MockSource;
-import org.apache.http.HttpHeaders;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -43,8 +36,6 @@ import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -109,6 +100,7 @@ public class DorisSinkFailoverITCase extends AbstractITCaseService {
executionBuilder
.setLabelPrefix(UUID.randomUUID().toString())
.enable2PC()
+ .setCheckInterval(1000)
.setBatchMode(batchMode)
.setFlushQueueSize(4)
.setStreamLoadProp(properties);
@@ -140,7 +132,15 @@ public class DorisSinkFailoverITCase extends AbstractITCaseService {
int maxRestart = 5;
Random random = new Random();
while (true) {
- result = ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, query, 2);
+ try {
+ // a cluster restart may cause the query to fail
+ result =
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(), LOG, query, 2);
+ } catch (Exception ex) {
+ LOG.error("Failed to query result, cause " + ex.getMessage());
+ continue;
+ }
if (result.size() >= totalRecords * DEFAULT_PARALLELISM
&& getFlinkJobStatus(jobClient).equals(JobStatus.FINISHED)) {
@@ -151,21 +151,21 @@ public class DorisSinkFailoverITCase extends AbstractITCaseService {
// Wait until write is successful, then trigger error
if (result.size() > 1 && maxRestart-- >= 0) {
// trigger error random
- int randomSleepMs = random.nextInt(30);
+ int randomSleepSec = random.nextInt(30);
if (FaultType.STREAM_LOAD_FAILURE.equals(faultType)) {
faultInjectionOpen();
- randomSleepMs = randomSleepMs + 20;
- LOG.info("Injecting fault, sleep {}s before recover",
randomSleepMs);
- Thread.sleep(randomSleepMs * 1000);
+ randomSleepSec = randomSleepSec + 20;
+ LOG.info("Injecting fault, sleep {}s before recover",
randomSleepSec);
+ Thread.sleep(randomSleepSec * 1000);
faultInjectionClear();
} else if (FaultType.RESTART_FAILURE.equals(faultType)) {
// docker image restart time is about 60s
- randomSleepMs = randomSleepMs + 60;
+ randomSleepSec = randomSleepSec + 60;
dorisContainerService.restartContainer();
LOG.info(
"Restarting doris cluster, sleep {}s before next
restart",
- randomSleepMs);
- Thread.sleep(randomSleepMs * 1000);
+ randomSleepSec);
+ Thread.sleep(randomSleepSec * 1000);
}
} else {
// Avoid frequent queries
@@ -185,61 +185,12 @@ public class DorisSinkFailoverITCase extends AbstractITCaseService {
} else {
List<String> actualResult =
ContainerUtils.getResult(getDorisQueryConnection(), LOG, expected, query, 2);
+ LOG.info("actual size: {}, expected size: {}",
actualResult.size(), expected.size());
Assert.assertTrue(
actualResult.size() >= expected.size() && actualResult.containsAll(expected));
}
}
- public void faultInjectionOpen() throws IOException {
- String pointName = "FlushToken.submit_flush_error";
- String apiUrl =
- String.format(
- "http://%s/api/debug_point/add/%s",
- dorisContainerService.getBenodes(), pointName);
- HttpPost httpPost = new HttpPost(apiUrl);
- httpPost.addHeader(
- HttpHeaders.AUTHORIZATION,
- auth(dorisContainerService.getUsername(), dorisContainerService.getPassword()));
- try (CloseableHttpClient httpClient = HttpClients.custom().build()) {
- try (CloseableHttpResponse response =
httpClient.execute(httpPost)) {
- int statusCode = response.getStatusLine().getStatusCode();
- String reason = response.getStatusLine().toString();
- if (statusCode == 200 && response.getEntity() != null) {
- LOG.info("Debug point response {}",
EntityUtils.toString(response.getEntity()));
- } else {
- LOG.info("Debug point failed, statusCode: {}, reason: {}",
statusCode, reason);
- }
- }
- }
- }
-
- public void faultInjectionClear() throws IOException {
- String apiUrl =
- String.format(
- "http://%s/api/debug_point/clear",
dorisContainerService.getBenodes());
- HttpPost httpPost = new HttpPost(apiUrl);
- httpPost.addHeader(
- HttpHeaders.AUTHORIZATION,
- auth(dorisContainerService.getUsername(), dorisContainerService.getPassword()));
- try (CloseableHttpClient httpClient = HttpClients.custom().build()) {
- try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
- int statusCode = response.getStatusLine().getStatusCode();
- String reason = response.getStatusLine().toString();
- if (statusCode == 200 && response.getEntity() != null) {
- LOG.info("Debug point response {}",
EntityUtils.toString(response.getEntity()));
- } else {
- LOG.info("Debug point failed, statusCode: {}, reason: {}",
statusCode, reason);
- }
- }
- }
- }
-
- private String auth(String user, String password) {
- final String authInfo = user + ":" + password;
- byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
- return "Basic " + new String(encoded);
- }
-
private void initializeTable(String table) {
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),
@@ -256,10 +207,4 @@ public class DorisSinkFailoverITCase extends AbstractITCaseService {
+ ")\n",
DATABASE, table));
}
-
- enum FaultType {
- RESTART_FAILURE,
- STREAM_LOAD_FAILURE,
- CHECKPOINT_FAILURE
- }
}
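The try/catch added around the polling query above reflects a pattern both failover tests rely on: while the Doris container restarts, the verification query itself can fail, so the loop retries instead of failing the test. A reduced sketch (query and row threshold are illustrative):

    // Sketch: poll until enough rows arrive, tolerating query failures
    // during a cluster restart.
    List<String> result;
    while (true) {
        try {
            result = ContainerUtils.executeSQLStatement(
                    getDorisQueryConnection(), LOG, "select * from db.tbl", 2);
        } catch (Exception ex) {
            LOG.error("Failed to query result, cause " + ex.getMessage());
            continue; // cluster may still be restarting; try again
        }
        if (result.size() >= 100) {
            break;
        }
        Thread.sleep(500); // avoid frequent queries
    }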
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java
new file mode 100644
index 00000000..d5dd6927
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java
@@ -0,0 +1,277 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.container.AbstractITCaseService;
+import org.apache.doris.flink.container.ContainerUtils;
+import org.apache.doris.flink.sink.batch.RecordWithMeta;
+import org.apache.doris.flink.sink.writer.serializer.RecordWithMetaSerializer;
+import org.apache.doris.flink.utils.MockMultiTableSource;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+
+/** DorisSink failover cases for multi-table writing. */
+@RunWith(Parameterized.class)
+public class DorisSinkMultiTblFailoverITCase extends AbstractITCaseService {
+ private static final Logger LOG = LoggerFactory.getLogger(DorisSinkMultiTblFailoverITCase.class);
+ static final String DATABASE = "test_multi_failover_sink";
+ static final String TABLE_MULTI_CSV = "tbl_multi_csv";
+ private final boolean batchMode;
+
+ public DorisSinkMultiTblFailoverITCase(boolean batchMode) {
+ this.batchMode = batchMode;
+ }
+
+ @Parameterized.Parameters(name = "batchMode: {0}")
+ public static Object[] parameters() {
+ return new Object[][] {new Object[] {false}, new Object[] {true}};
+ }
+
+ /**
+  * Four failures are simulated in one job: 1. writing to a table that does not exist yet, 2. a
+  * flink checkpoint failure, 3. a doris cluster restart, 4. a stream load failure.
+  */
+ @Test
+ public void testDorisClusterFailoverSink() throws Exception {
+ int totalTblNum = 3;
+ for (int i = 1; i <= totalTblNum; i++) {
+ String tableName = TABLE_MULTI_CSV + i;
+ initializeTable(tableName);
+ }
+ int newTableIndex = totalTblNum + 1;
+ dropTable(TABLE_MULTI_CSV + newTableIndex);
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ int checkpointIntervalMs = 5000;
+ env.enableCheckpointing(checkpointIntervalMs);
+
+ Properties properties = new Properties();
+ properties.setProperty("column_separator", ",");
+ properties.setProperty("format", "csv");
+ DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder();
+ DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
+ executionBuilder
+ .setLabelPrefix(UUID.randomUUID().toString())
+ .enable2PC()
+ .setBatchMode(batchMode)
+ .setFlushQueueSize(4)
+ .setBufferSize(1024)
+ .setCheckInterval(1000)
+ .setStreamLoadProp(properties);
+ DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+ dorisBuilder
+ .setFenodes(getFenodes())
+ .setTableIdentifier("")
+ .setUsername(getDorisUsername())
+ .setPassword(getDorisPassword());
+
+ builder.setDorisReadOptions(DorisReadOptions.builder().build())
+ .setDorisExecutionOptions(executionBuilder.build())
+ .setSerializer(new RecordWithMetaSerializer())
+ .setDorisOptions(dorisBuilder.build());
+
+ int triggerCkptError = 7;
+ int totalRecords = 20;
+ int addTblCheckpointId = 4;
+ DataStreamSource<RecordWithMeta> mockSource =
+ env.addSource(
+ new MockMultiTableSource(
+ totalRecords,
+ triggerCkptError,
+ DATABASE,
+ TABLE_MULTI_CSV,
+ totalTblNum,
+ addTblCheckpointId));
+
+ mockSource.sinkTo(builder.build());
+ JobClient jobClient = env.executeAsync();
+ CompletableFuture<JobStatus> jobStatus = jobClient.getJobStatus();
+ LOG.info("Job status: {}", jobStatus);
+
+ waitForJobStatus(
+ jobClient,
+ Collections.singletonList(RUNNING),
+ Deadline.fromNow(Duration.ofSeconds(120)));
+
+ // wait checkpoint failure
+ List<JobStatus> errorStatus =
+ Arrays.asList(
+ JobStatus.FAILING,
+ JobStatus.CANCELLING,
+ JobStatus.CANCELED,
+ JobStatus.FAILED,
+ JobStatus.RESTARTING);
+
+ waitForJobStatus(jobClient, errorStatus, Deadline.fromNow(Duration.ofSeconds(300)));
+
+ Random random = new Random();
+ LOG.info("start to create add table");
+ // random sleep to create table
+ Thread.sleep((checkpointIntervalMs / 1000 * random.nextInt(3)) * 1000);
+ // create new add table
+ initializeTable(TABLE_MULTI_CSV + newTableIndex);
+
+ LOG.info("wait job restart success");
+ // wait table restart success
+ waitForJobStatus(
+ jobClient,
+ Collections.singletonList(RUNNING),
+ Deadline.fromNow(Duration.ofSeconds(300)));
+
+ LOG.info("wait job running finished");
+
+ List<String> result = new ArrayList<>();
+ boolean restart = false;
+ boolean faultInjection = false;
+ while (true) {
+ try {
+ // a cluster restart may cause the query to fail
+ String query =
+ String.format(
+ "select * from %s",
+ DATABASE + "." + TABLE_MULTI_CSV +
newTableIndex);
+ result =
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(), LOG, query, 2);
+ } catch (Exception ex) {
+ LOG.error("Failed to query result, cause " + ex.getMessage());
+ continue;
+ }
+
+ if (getFlinkJobStatus(jobClient).equals(JobStatus.FINISHED)) {
+ // Batch mode can only achieve at-least-once delivery
+ break;
+ }
+
+ int randomSleepSec = random.nextInt(10);
+ if (result.size() > 2 && !faultInjection) {
+ faultInjectionOpen();
+ randomSleepSec = randomSleepSec + checkpointIntervalMs / 1000;
+ LOG.info("Injecting fault, sleep {}s before recover",
randomSleepSec);
+ Thread.sleep(randomSleepSec * 1000);
+ faultInjectionClear();
+ LOG.info("Injecting fault recover");
+ faultInjection = true;
+ }
+
+ if (result.size() > 6 && !restart) {
+ LOG.info(
+ "Restarting doris cluster, sleep {}s before next
restart",
+ randomSleepSec + 30);
+ dorisContainerService.restartContainer();
+ Thread.sleep((randomSleepSec + 30) * 1000);
+ restart = true;
+ }
+ Thread.sleep(500);
+ }
+
+ // build the expected values
+ List<String> expected = new ArrayList<>();
+ for (int tb = 1; tb <= newTableIndex; tb++) {
+ for (int i = 1; i <= totalRecords; i++) {
+ if (tb == newTableIndex && i <= addTblCheckpointId) {
+ continue;
+ }
+ for (int j = 0; j < DEFAULT_PARALLELISM; j++) {
+ expected.add(TABLE_MULTI_CSV + tb + "," + i + "," + j);
+ }
+ }
+ }
+
+ String queryRes =
+ String.format(
+ "select * from (select \"%s\" as tbl,id,task_id from
%s.%s "
+ + "union all select \"%s\" as tbl,id,task_id
from %s.%s "
+ + "union all select \"%s\" as tbl,id,task_id
from %s.%s "
+ + "union all select \"%s\" as tbl,id,task_id
from %s.%s ) a"
+ + " order by tbl,id,task_id",
+ TABLE_MULTI_CSV + 1,
+ DATABASE,
+ TABLE_MULTI_CSV + 1,
+ TABLE_MULTI_CSV + 2,
+ DATABASE,
+ TABLE_MULTI_CSV + 2,
+ TABLE_MULTI_CSV + 3,
+ DATABASE,
+ TABLE_MULTI_CSV + 3,
+ TABLE_MULTI_CSV + 4,
+ DATABASE,
+ TABLE_MULTI_CSV + 4);
+
+ if (!batchMode) {
+ ContainerUtils.checkResult(
+ getDorisQueryConnection(), LOG, expected, queryRes, 3, false);
+ } else {
+ List<String> actualResult =
+ ContainerUtils.getResult(getDorisQueryConnection(), LOG, expected, queryRes, 3);
+ LOG.info("actual size: {}, expected size: {}",
actualResult.size(), expected.size());
+ Assert.assertTrue(
+ actualResult.size() >= expected.size() && actualResult.containsAll(expected));
+ }
+ }
+
+ private void initializeTable(String table) {
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(),
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+ String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+ String.format(
+ "CREATE TABLE %s.%s ( \n"
+ + "`id` int,\n"
+ + "`task_id` int\n"
+ + ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
+ + "PROPERTIES (\n"
+ + "\"replication_num\" = \"1\"\n"
+ + ")\n",
+ DATABASE, table));
+ }
+
+ private void dropTable(String table) {
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(),
+ LOG,
+ String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
+ }
+}
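The multi-table path this test exercises rests on two pieces visible above: an empty table identifier on DorisOptions, and per-record routing through RecordWithMeta with RecordWithMetaSerializer. A condensed sketch of that wiring (endpoint and credentials are illustrative):

    // Sketch: multi-table sink wiring; each record names its own destination.
    DorisOptions.Builder dorisBuilder = DorisOptions.builder();
    dorisBuilder
            .setFenodes("127.0.0.1:8030")
            .setTableIdentifier("") // empty identifier enables multi-table load
            .setUsername("root")
            .setPassword("");

    DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder();
    builder.setDorisReadOptions(DorisReadOptions.builder().build())
            .setDorisExecutionOptions(DorisExecutionOptions.builder().build())
            .setSerializer(new RecordWithMetaSerializer())
            .setDorisOptions(dorisBuilder.build());

    // One element routed to database "demo_db", table "tbl_multi_csv1":
    RecordWithMeta record = new RecordWithMeta("demo_db", "tbl_multi_csv1", "1,0");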
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockMultiTableSource.java
similarity index 64%
copy from flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java
copy to flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockMultiTableSource.java
index fbbc0863..287275ea 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockMultiTableSource.java
@@ -27,40 +27,59 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.util.SerializableObject;
+import org.apache.doris.flink.sink.batch.RecordWithMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Iterator;
-public class MockSource extends RichParallelSourceFunction<String>
+/*
+ * MockMultiTableSource is a mock source for testing multi-table writing.
+ */
+public class MockMultiTableSource extends RichParallelSourceFunction<RecordWithMeta>
implements CheckpointedFunction, CheckpointListener {
- private static final Logger LOG = LoggerFactory.getLogger(MockSource.class);
+ private static final Logger LOG = LoggerFactory.getLogger(MockMultiTableSource.class);
private final Object blocker = new SerializableObject();
private transient ListState<Long> state;
+ private transient ListState<Integer> tableNumState;
private Long id = 0L;
private int numEventsTotal;
private int failCheckpointId = -1;
private volatile boolean running = true;
private volatile long waitNextCheckpoint = 0L;
private volatile long lastCheckpointConfirmed = 0L;
+ private String database;
+ private String tableName;
+ private int tableNum;
+ private int addTableCheckpointId;
- public MockSource(int numEventsTotal) {
- this.numEventsTotal = numEventsTotal;
- }
-
- public MockSource(int numEventsTotal, int failCheckpointId) {
+ public MockMultiTableSource(
+ int numEventsTotal,
+ int failCheckpointId,
+ String database,
+ String tableName,
+ int tableNum,
+ int addTableCheckpointId) {
this.numEventsTotal = numEventsTotal;
this.failCheckpointId = failCheckpointId;
+ this.database = database;
+ this.tableName = tableName;
+ this.tableNum = tableNum;
+ this.addTableCheckpointId = addTableCheckpointId;
}
@Override
- public void run(SourceContext<String> ctx) throws Exception {
-
+ public void run(SourceContext<RecordWithMeta> ctx) throws Exception {
int taskId = getRuntimeContext().getIndexOfThisSubtask();
while (this.running && id < this.numEventsTotal) {
- String record = ++id + "," + taskId;
- ctx.collect(record);
+ id = id + 1;
+ for (int i = 1; i <= tableNum; i++) {
+ String record = id + "," + taskId;
+ RecordWithMeta output = new RecordWithMeta(database, tableName + i, record);
+ ctx.collect(output);
+ }
+
// Wait for the checkpoint to complete before sending the next record
waitNextCheckpoint = lastCheckpointConfirmed + 1;
synchronized (this.blocker) {
@@ -79,11 +98,19 @@ public class MockSource extends RichParallelSourceFunction<String>
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
state.update(Collections.singletonList(id));
- if (failCheckpointId > 0 && context.getCheckpointId() % failCheckpointId == 0) {
+ if (context.getCheckpointId() == addTableCheckpointId) {
+ tableNum++;
+ }
+ tableNumState.update(Collections.singletonList(tableNum));
+ if (failCheckpointId > 0 && context.getCheckpointId() == failCheckpointId) {
throw new RuntimeException(
"Trigger fail for testing, checkpointId = " +
context.getCheckpointId());
}
- LOG.info("snapshot state to {} for checkpoint {}", id,
context.getCheckpointId());
+ LOG.info(
+ "snapshot state to id={}, tableNum={} for checkpoint {}",
+ id,
+ tableNum,
+ context.getCheckpointId());
}
@Override
@@ -92,13 +119,22 @@ public class MockSource extends RichParallelSourceFunction<String>
context.getOperatorStateStore()
.getListState(
new ListStateDescriptor<>("id", TypeInformation.of(Long.class)));
+ tableNumState =
+ context.getOperatorStateStore()
+ .getListState(
+ new ListStateDescriptor<>(
+ "tableNum",
TypeInformation.of(Integer.class)));
if (context.isRestored()) {
Iterator<Long> iterator = state.get().iterator();
while (iterator.hasNext()) {
id += iterator.next();
}
+ Iterator<Integer> tableNumIterator = tableNumState.get().iterator();
+ while (tableNumIterator.hasNext()) {
+ tableNum = tableNumIterator.next();
+ }
}
- LOG.info("restore state from {}", id);
+ LOG.info("restore state from id {}, tableNum {}", id, tableNum);
}
@Override
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java
index fbbc0863..b7e83196 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java
@@ -33,6 +33,9 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Iterator;
+/*
+ * MockSource is a mock source for testing.
+ */
public class MockSource extends RichParallelSourceFunction<String>
implements CheckpointedFunction, CheckpointListener {
private static final Logger LOG = LoggerFactory.getLogger(MockSource.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]