This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 2bcfad1d [Improve](Stream) Optimize the situation where Flink may get
stuck after the streamload thread exits abnormally (#578)
2bcfad1d is described below
commit 2bcfad1dd00552992f6ab397caa95832798b3219
Author: wudi <[email protected]>
AuthorDate: Tue Mar 18 16:34:00 2025 +0800
[Improve](Stream) Optimize the situation where Flink may get stuck after
the streamload thread exits abnormally (#578)
---
.../doris/flink/cfg/DorisExecutionOptions.java | 2 +-
.../exception/LabelAlreadyExistsException.java | 2 +-
.../doris/flink/sink/writer/DorisStreamLoad.java | 118 +++++++++++++++-----
.../doris/flink/sink/writer/DorisWriter.java | 116 +++++---------------
.../doris/flink/sink/writer/LoadConstants.java | 9 ++
.../doris/flink/sink/writer/RecordBuffer.java | 34 +++---
.../doris/flink/sink/writer/RecordStream.java | 7 +-
.../doris/flink/table/DorisConfigOptions.java | 4 +-
.../sink/DorisSinkMultiTblFailoverITCase.java | 121 ++++++++++++++++++++-
.../doris/flink/sink/writer/TestDorisWriter.java | 54 ++++-----
10 files changed, 299 insertions(+), 168 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 371069f5..831a317e 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -34,7 +34,7 @@ public class DorisExecutionOptions implements Serializable {
private static final long serialVersionUID = 1L;
// 0 means disable checker thread
- public static final int DEFAULT_CHECK_INTERVAL = 10000;
+ public static final int DEFAULT_CHECK_INTERVAL = 0;
public static final int DEFAULT_MAX_RETRY_TIMES = 3;
private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
private static final int DEFAULT_BUFFER_COUNT = 3;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/LabelAlreadyExistsException.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/LabelAlreadyExistsException.java
index ea86c60a..9a523b16 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/LabelAlreadyExistsException.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/LabelAlreadyExistsException.java
@@ -17,7 +17,7 @@
package org.apache.doris.flink.exception;
-public class LabelAlreadyExistsException extends RuntimeException {
+public class LabelAlreadyExistsException extends DorisRuntimeException {
public LabelAlreadyExistsException() {
super();
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index c6e39326..18a97959 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -28,10 +28,12 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.exception.LabelAlreadyExistsException;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.models.RespContent;
import org.apache.doris.flink.sink.EscapeHandler;
import org.apache.doris.flink.sink.HttpPutBuilder;
+import org.apache.doris.flink.sink.LoadStatus;
import org.apache.doris.flink.sink.ResponseUtil;
import org.apache.http.client.entity.GzipCompressingEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -47,6 +49,7 @@ import java.net.NoRouteToHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@@ -61,6 +64,7 @@ import static
org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE;
import static
org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE_GZ;
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
+import static
org.apache.doris.flink.sink.writer.LoadConstants.DORIS_SUCCESS_STATUS;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
import static
org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE;
@@ -88,7 +92,7 @@ public class DorisStreamLoad implements Serializable {
private final boolean enableDelete;
private final Properties streamLoadProp;
private final RecordStream recordStream;
- private volatile Future<CloseableHttpResponse> pendingLoadFuture;
+ private volatile Future<RespContent> pendingLoadFuture;
private volatile Exception httpException = null;
private final CloseableHttpClient httpClient;
private final ExecutorService executorService;
@@ -175,7 +179,7 @@ public class DorisStreamLoad implements Serializable {
this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
}
- public Future<CloseableHttpResponse> getPendingLoadFuture() {
+ public Future<RespContent> getPendingLoadFuture() {
return pendingLoadFuture;
}
@@ -254,19 +258,30 @@ public class DorisStreamLoad implements Serializable {
* @param record
* @throws IOException
*/
- public void writeRecord(byte[] record) throws IOException {
+ public void writeRecord(byte[] record) throws InterruptedException {
checkLoadException();
- if (loadBatchFirstRecord) {
- loadBatchFirstRecord = false;
- } else if (lineDelimiter != null) {
- recordStream.write(lineDelimiter);
+ try {
+ if (loadBatchFirstRecord) {
+ loadBatchFirstRecord = false;
+ } else if (lineDelimiter != null) {
+ recordStream.write(lineDelimiter);
+ }
+ recordStream.write(record);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ if (httpException != null) {
+ throw new DorisRuntimeException(httpException.getMessage(),
httpException);
+ } else {
+ LOG.info("write record interrupted, cause " + e.getClass());
+ throw e;
+ }
}
- recordStream.write(record);
}
private void checkLoadException() {
if (httpException != null) {
- throw new RuntimeException("Stream load http request error, ",
httpException);
+ throw new RuntimeException(
+ "Stream load http request error, " +
httpException.getMessage(), httpException);
}
}
@@ -292,26 +307,32 @@ public class DorisStreamLoad implements Serializable {
throw new StreamLoadException("stream load error: " +
response.getStatusLine().toString());
}
- public RespContent stopLoad() throws IOException {
- recordStream.endInput();
- if (enableGroupCommit) {
- LOG.info("table {} stream load stopped with group commit on host
{}", table, hostPort);
- } else {
- LOG.info(
- "table {} stream load stopped for {} on host {}",
- table,
- currentLabel,
- hostPort);
- }
-
- Preconditions.checkState(pendingLoadFuture != null);
+ public RespContent stopLoad() throws InterruptedException {
try {
- return handlePreCommitResponse(pendingLoadFuture.get());
- } catch (NoRouteToHostException nex) {
- LOG.error("Failed to connect, cause ", nex);
- throw new DorisRuntimeException(
- "No Route to Host to " + hostPort + ", exception: " + nex);
- } catch (Exception e) {
+ recordStream.endInput();
+ if (enableGroupCommit) {
+ LOG.info(
+ "table {} stream load stopped with group commit on
host {}",
+ table,
+ hostPort);
+ } else {
+ LOG.info(
+ "table {} stream load stopped for {} on host {}",
+ table,
+ currentLabel,
+ hostPort);
+ }
+
+ Preconditions.checkState(pendingLoadFuture != null);
+ return pendingLoadFuture.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ if (httpException != null) {
+ throw new DorisRuntimeException(httpException.getMessage(),
httpException);
+ } else {
+ throw e;
+ }
+ } catch (ExecutionException e) {
throw new DorisRuntimeException(e);
}
}
@@ -365,7 +386,46 @@ public class DorisStreamLoad implements Serializable {
() -> {
LOG.info(executeMessage);
try {
- return
httpClient.execute(putBuilder.build());
+ CloseableHttpResponse execute =
+
httpClient.execute(putBuilder.build());
+ RespContent respContent =
handlePreCommitResponse(execute);
+
+ if
(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+ if (enable2PC
+ &&
LoadStatus.LABEL_ALREADY_EXIST.equals(
+
respContent.getStatus())
+ && !JOB_EXIST_FINISHED.equals(
+
respContent.getExistingJobStatus())) {
+ LOG.info(
+ "try to abort {} cause
status {}, exist job status {} ",
+ respContent.getLabel(),
+ respContent.getStatus(),
+
respContent.getExistingJobStatus());
+
abortLabelExistTransaction(respContent);
+ throw new
LabelAlreadyExistsException(
+ "Exist label abort
finished, retry");
+ } else {
+ String errMsg =
+ String.format(
+ "table %s.%s
stream load error: %s, see more in %s",
+ getDb(),
+ getTable(),
+
respContent.getMessage(),
+
respContent.getErrorURL());
+ LOG.error("Failed to load, {}",
errMsg);
+ throw new
DorisRuntimeException(errMsg);
+ }
+ }
+ return respContent;
+ } catch (NoRouteToHostException nex) {
+ LOG.error("Failed to connect, cause ",
nex);
+ httpException = nex;
+ mainThread.interrupt();
+ throw new DorisRuntimeException(
+ "No Route to Host to "
+ + hostPort
+ + ", exception: "
+ + nex);
} catch (Exception e) {
LOG.error("Failed to execute load, cause
", e);
httpException = e;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 466b995f..64fbc95f 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -27,7 +27,6 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
-import org.apache.doris.flink.exception.LabelAlreadyExistsException;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.models.RespContent;
import org.apache.doris.flink.sink.BackendUtil;
@@ -41,7 +40,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -49,12 +47,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
-import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
-import static
org.apache.doris.flink.sink.writer.DorisStreamLoad.JOB_EXIST_FINISHED;
/**
* Doris Writer will load data to doris.
@@ -64,8 +56,6 @@ import static
org.apache.doris.flink.sink.writer.DorisStreamLoad.JOB_EXIST_FINIS
public class DorisWriter<IN>
implements DorisAbstractWriter<IN, DorisWriterState, DorisCommittable>
{
private static final Logger LOG =
LoggerFactory.getLogger(DorisWriter.class);
- private static final List<String> DORIS_SUCCESS_STATUS =
- new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
private final long lastCheckpointId;
private long curCheckpointId;
private Map<String, DorisStreamLoad> dorisStreamLoadMap = new
ConcurrentHashMap<>();
@@ -86,7 +76,6 @@ public class DorisWriter<IN>
private SinkWriterMetricGroup sinkMetricGroup;
private Map<String, DorisWriteMetrics> sinkMetricsMap = new
ConcurrentHashMap<>();
private volatile boolean multiTableLoad = false;
- private final ReentrantLock checkLock = new ReentrantLock();
public DorisWriter(
Sink.InitContext initContext,
@@ -139,13 +128,6 @@ public class DorisWriter<IN>
}
// get main work thread.
executorThread = Thread.currentThread();
- if (intervalTime >= 1000) {
- // when uploading data in streaming mode, we need to regularly
detect whether there are
- // exceptions.
- LOG.info("start stream load checkdone thread with interval {} ms",
intervalTime);
- scheduledExecutorService.scheduleWithFixedDelay(
- this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
- }
}
@VisibleForTesting
@@ -242,59 +224,32 @@ public class DorisWriter<IN>
}
// disable exception checker before stop load.
globalLoading = false;
- checkLock.lockInterruptibly();
- try {
- // submit stream load http request
- List<DorisCommittable> committableList = new ArrayList<>();
- for (Map.Entry<String, DorisStreamLoad> streamLoader :
dorisStreamLoadMap.entrySet()) {
- String tableIdentifier = streamLoader.getKey();
- if (!loadingMap.getOrDefault(tableIdentifier, false)) {
- LOG.debug("skip table {}, no data need to load.",
tableIdentifier);
- continue;
- }
- DorisStreamLoad dorisStreamLoad = streamLoader.getValue();
- RespContent respContent = dorisStreamLoad.stopLoad();
- // refresh metrics
- if (sinkMetricsMap.containsKey(tableIdentifier)) {
- DorisWriteMetrics dorisWriteMetrics =
sinkMetricsMap.get(tableIdentifier);
- dorisWriteMetrics.flush(respContent);
- }
- if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
- if (executionOptions.enabled2PC()
- &&
LoadStatus.LABEL_ALREADY_EXIST.equals(respContent.getStatus())
- &&
!JOB_EXIST_FINISHED.equals(respContent.getExistingJobStatus())) {
- LOG.info(
- "try to abort {} cause status {}, exist job
status {} ",
- respContent.getLabel(),
- respContent.getStatus(),
- respContent.getExistingJobStatus());
-
dorisStreamLoad.abortLabelExistTransaction(respContent);
- throw new LabelAlreadyExistsException("Exist label
abort finished, retry");
- } else {
- String errMsg =
- String.format(
- "table %s stream load error: %s, see
more in %s",
- tableIdentifier,
- respContent.getMessage(),
- respContent.getErrorURL());
- LOG.error("Failed to load, {}", errMsg);
- throw new DorisRuntimeException(errMsg);
- }
- }
- if (executionOptions.enabled2PC()) {
- long txnId = respContent.getTxnId();
- committableList.add(
- new DorisCommittable(
- dorisStreamLoad.getHostPort(),
dorisStreamLoad.getDb(), txnId));
- }
+ // submit stream load http request
+ List<DorisCommittable> committableList = new ArrayList<>();
+ for (Map.Entry<String, DorisStreamLoad> streamLoader :
dorisStreamLoadMap.entrySet()) {
+ String tableIdentifier = streamLoader.getKey();
+ if (!loadingMap.getOrDefault(tableIdentifier, false)) {
+ LOG.debug("skip table {}, no data need to load.",
tableIdentifier);
+ continue;
+ }
+ DorisStreamLoad dorisStreamLoad = streamLoader.getValue();
+ RespContent respContent = dorisStreamLoad.stopLoad();
+ // refresh metrics
+ if (sinkMetricsMap.containsKey(tableIdentifier)) {
+ DorisWriteMetrics dorisWriteMetrics =
sinkMetricsMap.get(tableIdentifier);
+ dorisWriteMetrics.flush(respContent);
+ }
+ if (executionOptions.enabled2PC()) {
+ long txnId = respContent.getTxnId();
+ committableList.add(
+ new DorisCommittable(
+ dorisStreamLoad.getHostPort(),
dorisStreamLoad.getDb(), txnId));
}
-
- // clean loadingMap
- loadingMap.clear();
- return committableList;
- } finally {
- checkLock.unlock();
}
+
+ // clean loadingMap
+ loadingMap.clear();
+ return committableList;
}
private void abortPossibleSuccessfulTransaction() {
@@ -358,25 +313,15 @@ public class DorisWriter<IN>
new
HttpUtil(dorisReadOptions).getHttpClient()));
}
- /** Check the streamload http request regularly. */
+ /** The load thread now records HTTP failures and interrupts the main thread
itself, so a periodic check is no longer needed. */
+ @Deprecated
private void checkDone() {
if (!globalLoading) {
return;
}
LOG.debug("start timer checker, interval {} ms", intervalTime);
- if (checkLock.tryLock()) {
- try {
- // double check
- if (!globalLoading) {
- return;
- }
- for (Map.Entry<String, DorisStreamLoad> streamLoadMap :
- dorisStreamLoadMap.entrySet()) {
- checkAllDone(streamLoadMap.getKey(),
streamLoadMap.getValue());
- }
- } finally {
- checkLock.unlock();
- }
+ for (Map.Entry<String, DorisStreamLoad> streamLoadMap :
dorisStreamLoadMap.entrySet()) {
+ checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue());
}
}
@@ -401,6 +346,7 @@ public class DorisWriter<IN>
// use send cached data to new txn, then notify to restart the
stream
if (executionOptions.isUseCache()) {
try {
+
dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend(subtaskId));
if (executionOptions.enabled2PC()) {
dorisStreamLoad.abortPreCommit(labelPrefix,
curCheckpointId);
@@ -419,9 +365,7 @@ public class DorisWriter<IN>
} else {
String errorMsg;
try {
- RespContent content =
- dorisStreamLoad.handlePreCommitResponse(
-
dorisStreamLoad.getPendingLoadFuture().get());
+ RespContent content =
dorisStreamLoad.getPendingLoadFuture().get();
if (executionOptions.enabled2PC()
&&
LoadStatus.LABEL_ALREADY_EXIST.equals(content.getStatus())) {
LOG.info(
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
index 1e026977..54deb4a0 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
@@ -17,6 +17,13 @@
package org.apache.doris.flink.sink.writer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
+import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
+
/** Constants for load. */
public class LoadConstants {
public static final String COLUMNS_KEY = "columns";
@@ -35,4 +42,6 @@ public class LoadConstants {
public static final String GROUP_COMMIT_OFF_MODE = "off_mode";
public static final String COMPRESS_TYPE = "compress_type";
public static final String COMPRESS_TYPE_GZ = "gz";
+ public static final List<String> DORIS_SUCCESS_STATUS =
+ new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java
index c84e1039..aeecaed9 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java
@@ -64,25 +64,21 @@ public class RecordBuffer {
}
}
- public void stopBufferData() throws IOException {
- try {
- // add Empty buffer as finish flag.
- boolean isEmpty = false;
- if (currentWriteBuffer != null) {
- currentWriteBuffer.flip();
- // check if the current write buffer is empty.
- isEmpty = currentWriteBuffer.limit() == 0;
- readQueue.put(currentWriteBuffer);
- currentWriteBuffer = null;
- }
- if (!isEmpty) {
- ByteBuffer byteBuffer = writeQueue.take();
- byteBuffer.flip();
- Preconditions.checkState(byteBuffer.limit() == 0);
- readQueue.put(byteBuffer);
- }
- } catch (Exception e) {
- throw new IOException(e);
+ public void stopBufferData() throws InterruptedException {
+ // add Empty buffer as finish flag.
+ boolean isEmpty = false;
+ if (currentWriteBuffer != null) {
+ currentWriteBuffer.flip();
+ // check if the current write buffer is empty.
+ isEmpty = currentWriteBuffer.limit() == 0;
+ readQueue.put(currentWriteBuffer);
+ currentWriteBuffer = null;
+ }
+ if (!isEmpty) {
+ ByteBuffer byteBuffer = writeQueue.take();
+ byteBuffer.flip();
+ Preconditions.checkState(byteBuffer.limit() == 0);
+ readQueue.put(byteBuffer);
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java
index 693967ac..2fe51734 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java
@@ -45,7 +45,7 @@ public class RecordStream extends InputStream {
recordBuffer.startBufferData();
}
- public void endInput() throws IOException {
+ public void endInput() throws InterruptedException {
recordBuffer.stopBufferData();
}
@@ -54,15 +54,16 @@ public class RecordStream extends InputStream {
try {
return recordBuffer.read(buff);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
- public void write(byte[] buff) throws IOException {
+ public void write(byte[] buff) throws InterruptedException {
try {
recordBuffer.write(buff);
} catch (InterruptedException e) {
- throw new RuntimeException(e);
+ throw e;
}
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index b99710e3..ffd3ec92 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -216,9 +216,9 @@ public class DorisConfigOptions {
public static final ConfigOption<Duration> SINK_CHECK_INTERVAL =
ConfigOptions.key("sink.check-interval")
.durationType()
- .defaultValue(Duration.ofMillis(10000))
+ .defaultValue(Duration.ofMillis(0))
.withDescription(
- "check exception with the interval while loading,
The default is 1s, 0 means disabling the checker thread");
+ "check exception with the interval while loading,
0 means disabling the checker thread");
public static final ConfigOption<Integer> SINK_MAX_RETRIES =
ConfigOptions.key("sink.max-retries")
.intType()
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java
index d5dd6927..f4146990 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
@@ -48,6 +49,7 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import static org.apache.flink.api.common.JobStatus.FINISHED;
import static org.apache.flink.api.common.JobStatus.RUNNING;
/** DorisSink abnormal case of multi-table writing */
@@ -57,6 +59,7 @@ public class DorisSinkMultiTblFailoverITCase extends
AbstractITCaseService {
LoggerFactory.getLogger(DorisSinkMultiTblFailoverITCase.class);
static final String DATABASE = "test_multi_failover_sink";
static final String TABLE_MULTI_CSV = "tbl_multi_csv";
+ static final String TABLE_MULTI_CSV_NO_EXIST_TBL =
"tbl_multi_csv_no_exist";
private final boolean batchMode;
public DorisSinkMultiTblFailoverITCase(boolean batchMode) {
@@ -68,12 +71,121 @@ public class DorisSinkMultiTblFailoverITCase extends
AbstractITCaseService {
return new Object[][] {new Object[] {false}, new Object[] {true}};
}
+ /**
+ * Extreme case: the data written during a single checkpoint is exactly
bufferCount*bufferSize in size
+ */
+ @Test
+ public void testTableNotExistCornerCase() throws Exception {
+ LOG.info("Start to testTableNotExistCornerCase");
+ dropDatabase();
+ dropTable(TABLE_MULTI_CSV_NO_EXIST_TBL);
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getCheckpointConfig().setCheckpointTimeout(300 * 1000);
+ env.setParallelism(1);
+ int checkpointIntervalMs = 10000;
+ env.enableCheckpointing(checkpointIntervalMs);
+
+ Properties properties = new Properties();
+ properties.setProperty("column_separator", ",");
+ properties.setProperty("format", "csv");
+ DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder();
+ DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
+ executionBuilder
+ .setLabelPrefix(UUID.randomUUID().toString())
+ .enable2PC()
+ .setBatchMode(batchMode)
+ .setFlushQueueSize(1)
+ .setBufferSize(1)
+ .setBufferCount(3)
+ .setCheckInterval(0)
+ .setStreamLoadProp(properties);
+ DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+ dorisBuilder
+ .setFenodes(getFenodes())
+ .setTableIdentifier("")
+ .setUsername(getDorisUsername())
+ .setPassword(getDorisPassword());
+
+ builder.setDorisReadOptions(DorisReadOptions.builder().build())
+ .setDorisExecutionOptions(executionBuilder.build())
+ .setSerializer(new RecordWithMetaSerializer())
+ .setDorisOptions(dorisBuilder.build());
+
+ DataStreamSource<RecordWithMeta> mockSource =
+ env.addSource(
+ new SourceFunction<RecordWithMeta>() {
+ @Override
+ public void run(SourceContext<RecordWithMeta> ctx)
throws Exception {
+ RecordWithMeta record3 =
+ new RecordWithMeta(
+ DATABASE,
TABLE_MULTI_CSV_NO_EXIST_TBL, "1,3");
+ ctx.collect(record3);
+ }
+
+ @Override
+ public void cancel() {}
+ });
+
+ mockSource.sinkTo(builder.build());
+ JobClient jobClient = env.executeAsync();
+ CompletableFuture<JobStatus> jobStatus = jobClient.getJobStatus();
+ LOG.info("Job status: {}", jobStatus);
+
+ waitForJobStatus(
+ jobClient,
+ Collections.singletonList(RUNNING),
+ Deadline.fromNow(Duration.ofSeconds(60)));
+ // wait checkpoint failure
+ List<JobStatus> errorStatus =
+ Arrays.asList(
+ JobStatus.FAILING,
+ JobStatus.CANCELLING,
+ JobStatus.CANCELED,
+ JobStatus.FAILED,
+ JobStatus.RESTARTING);
+
+ waitForJobStatus(jobClient, errorStatus,
Deadline.fromNow(Duration.ofSeconds(30)));
+
+ LOG.info("start to create add table");
+ initializeTable(TABLE_MULTI_CSV_NO_EXIST_TBL);
+
+ LOG.info("wait job restart success");
+ // wait table restart success
+ waitForJobStatus(
+ jobClient,
+ Collections.singletonList(RUNNING),
+ Deadline.fromNow(Duration.ofSeconds(60)));
+
+ LOG.info("wait job running finished");
+ waitForJobStatus(
+ jobClient,
+ Collections.singletonList(FINISHED),
+ Deadline.fromNow(Duration.ofSeconds(60)));
+
+ String queryRes =
+ String.format(
+ "select id,task_id from %s.%s ", DATABASE,
TABLE_MULTI_CSV_NO_EXIST_TBL);
+ List<String> expected = Arrays.asList("1,3");
+
+ if (!batchMode) {
+ ContainerUtils.checkResult(
+ getDorisQueryConnection(), LOG, expected, queryRes, 2,
false);
+ } else {
+ List<String> actualResult =
+ ContainerUtils.getResult(getDorisQueryConnection(), LOG,
expected, queryRes, 2);
+ LOG.info("actual size: {}, expected size: {}",
actualResult.size(), expected.size());
+ Assert.assertTrue(
+ actualResult.size() >= expected.size() &&
actualResult.containsAll(expected));
+ }
+ }
+
/**
* Four exceptions are simulated in one job 1. Add a table that does not
exist 2. flink
* checkpoint failed 3. doris cluster restart 4. stream load fail
*/
@Test
- public void testDorisClusterFailoverSink() throws Exception {
+ public void testMultiTblFailoverSink() throws Exception {
+ LOG.info("Start to testMultiTblFailoverSink");
int totalTblNum = 3;
for (int i = 1; i <= totalTblNum; i++) {
String tableName = TABLE_MULTI_CSV + i;
@@ -274,4 +386,11 @@ public class DorisSinkMultiTblFailoverITCase extends
AbstractITCaseService {
LOG,
String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
}
+
+ private void dropDatabase() {
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(),
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
+ }
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
index 3e1ab2c2..c1f59c74 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
@@ -26,20 +26,18 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
-import org.apache.doris.flink.exception.LabelAlreadyExistsException;
import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpTestUtil;
import org.apache.doris.flink.sink.OptionUtils;
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.mockito.MockedStatic;
import java.io.IOException;
@@ -51,7 +49,7 @@ import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.contains;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
@@ -61,9 +59,7 @@ public class TestDorisWriter {
DorisOptions dorisOptions;
DorisReadOptions readOptions;
DorisExecutionOptions executionOptions;
-
private MockedStatic<BackendUtil> backendUtilMockedStatic;
- @Rule public ExpectedException thrown = ExpectedException.none();
@Before
public void setUp() {
@@ -79,7 +75,7 @@ public class TestDorisWriter {
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
CloseableHttpResponse preCommitResponse =
HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE,
true);
- when(httpClient.execute(any())).thenReturn(preCommitResponse);
+
when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(preCommitResponse);
DorisWriter<String> dorisWriter = initWriter(httpClient);
dorisWriter.write("doris,1", null);
Collection<DorisCommittable> committableList =
dorisWriter.prepareCommit();
@@ -96,12 +92,14 @@ public class TestDorisWriter {
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
CloseableHttpResponse preCommitResponse =
HttpTestUtil.getResponse(HttpTestUtil.LABEL_EXIST_PRE_COMMIT_TABLE_RESPONSE,
true);
- when(httpClient.execute(any())).thenReturn(preCommitResponse);
+
when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(preCommitResponse);
DorisWriter<String> dorisWriter = initWriter(httpClient);
dorisWriter.write("doris,1", null);
- thrown.expect(LabelAlreadyExistsException.class);
- thrown.expectMessage("Exist label abort finished, retry");
- dorisWriter.prepareCommit();
+ try {
+ dorisWriter.prepareCommit();
+ } catch (DorisRuntimeException e) {
+ Assert.assertTrue(e.getMessage().contains("Exist label abort
finished, retry"));
+ }
}
@Test
@@ -110,12 +108,14 @@ public class TestDorisWriter {
CloseableHttpResponse preCommitResponse =
HttpTestUtil.getResponse(
HttpTestUtil.LABEL_EXIST_PRE_COMMIT_TABLE_FINISH_RESPONSE, true);
- when(httpClient.execute(any())).thenReturn(preCommitResponse);
+
when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(preCommitResponse);
DorisWriter<String> dorisWriter = initWriter(httpClient);
dorisWriter.write("doris,1", null);
- thrown.expect(DorisRuntimeException.class);
- thrown.expectMessage(contains("stream load error"));
- dorisWriter.prepareCommit();
+ try {
+ dorisWriter.prepareCommit();
+ } catch (DorisRuntimeException e) {
+ Assert.assertTrue(e.getMessage().contains("stream load error"));
+ }
}
@Test
@@ -124,7 +124,7 @@ public class TestDorisWriter {
CloseableHttpResponse preCommitResponse =
HttpTestUtil.getResponse(
HttpTestUtil.LABEL_EXIST_PRE_COMMIT_TABLE_FINISH_RESPONSE, true);
- when(httpClient.execute(any())).thenReturn(preCommitResponse);
+
when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(preCommitResponse);
Map<String, DorisStreamLoad> dorisStreamLoadMap = new
ConcurrentHashMap<>();
Map<String, DorisWriteMetrics> dorisWriteMetricsMap = new
ConcurrentHashMap<>();
Sink.InitContext initContext = mock(Sink.InitContext.class);
@@ -154,13 +154,14 @@ public class TestDorisWriter {
new LabelGenerator("", true),
httpClient);
dorisStreamLoadMap.put(dorisOptions.getTableIdentifier(),
dorisStreamLoad);
- dorisStreamLoad.startLoad("", false);
dorisWriter.setDorisStreamLoadMap(dorisStreamLoadMap);
dorisWriter.setDorisMetricsMap(dorisWriteMetricsMap);
dorisWriter.write("doris,1", null);
- thrown.expect(DorisRuntimeException.class);
- thrown.expectMessage(contains("stream load error"));
- dorisWriter.prepareCommit();
+ try {
+ dorisWriter.prepareCommit();
+ } catch (DorisRuntimeException e) {
+ Assert.assertTrue(e.getMessage().contains("stream load error"));
+ }
}
@Test
@@ -171,9 +172,11 @@ public class TestDorisWriter {
when(httpClient.execute(any())).thenReturn(preCommitResponse);
DorisWriter<String> dorisWriter = initWriter(httpClient);
dorisWriter.write("doris,1", null);
- thrown.expect(DorisRuntimeException.class);
- thrown.expectMessage(contains("stream load error"));
- dorisWriter.prepareCommit();
+ try {
+ dorisWriter.prepareCommit();
+ } catch (DorisRuntimeException e) {
+ Assert.assertTrue(e.getMessage().contains("stream load error"));
+ }
}
private DorisWriter<String> initWriter(CloseableHttpClient httpClient)
throws IOException {
@@ -205,7 +208,6 @@ public class TestDorisWriter {
new LabelGenerator("", true),
httpClient);
dorisStreamLoadMap.put(dorisOptions.getTableIdentifier(),
dorisStreamLoad);
- dorisStreamLoad.startLoad("", false);
dorisWriter.setDorisStreamLoadMap(dorisStreamLoadMap);
dorisWriter.setDorisMetricsMap(dorisWriteMetricsMap);
return dorisWriter;
@@ -219,13 +221,13 @@ public class TestDorisWriter {
when(httpClient.execute(any())).thenReturn(preCommitResponse);
DorisWriter<String> dorisWriter = initWriter(httpClient);
BackendUtil mock = mock(BackendUtil.class);
- when(mock.tryHttpConnection(any())).thenReturn(true);
+ when(mock.tryHttpConnection(anyString())).thenReturn(true);
dorisWriter.setBackendUtil(mock);
+ dorisWriter.write("doris,1", null);
List<DorisWriterState> writerStates = dorisWriter.snapshotState(1);
Assert.assertEquals(1, writerStates.size());
Assert.assertEquals("doris", writerStates.get(0).getLabelPrefix());
- Assert.assertTrue(!dorisWriter.isLoading());
}
public DorisWriteMetrics getMockWriteMetrics(SinkWriterMetricGroup
sinkWriterMetricGroup) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]