This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new cf877c1 disable checker thread default (#366)
cf877c1 is described below
commit cf877c167baa9961bf0c9ab5a31918f5d455c900
Author: wudi <[email protected]>
AuthorDate: Mon Apr 15 16:18:26 2024 +0800
disable checker thread default (#366)
---
.../doris/flink/cfg/DorisExecutionOptions.java | 3 ++-
.../doris/flink/sink/writer/DorisStreamLoad.java | 2 +-
.../doris/flink/sink/writer/DorisWriter.java | 31 +++++++++++++---------
.../doris/flink/table/DorisConfigOptions.java | 5 ++--
4 files changed, 25 insertions(+), 16 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index b1b49d1..c5a9f66 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -32,7 +32,8 @@ import static
org.apache.doris.flink.sink.writer.LoadConstants.READ_JSON_BY_LINE
public class DorisExecutionOptions implements Serializable {
private static final long serialVersionUID = 1L;
- public static final int DEFAULT_CHECK_INTERVAL = 10000;
+ // 0 means disable checker thread
+ public static final int DEFAULT_CHECK_INTERVAL = 0;
public static final int DEFAULT_MAX_RETRY_TIMES = 3;
private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
private static final int DEFAULT_BUFFER_COUNT = 3;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index cbfa8ba..2680698 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -69,7 +69,7 @@ public class DorisStreamLoad implements Serializable {
private final byte[] lineDelimiter;
private static final String LOAD_URL_PATTERN =
"http://%s/api/%s/%s/_stream_load";
private static final String ABORT_URL_PATTERN =
"http://%s/api/%s/_stream_load_2pc";
- private static final String JOB_EXIST_FINISHED = "FINISHED";
+ public static final String JOB_EXIST_FINISHED = "FINISHED";
private String loadUrlStr;
private String hostPort;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index db6094e..08edfc7 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -54,6 +54,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
+import static
org.apache.doris.flink.sink.writer.DorisStreamLoad.JOB_EXIST_FINISHED;
/**
* Doris Writer will load data to doris.
@@ -131,10 +132,15 @@ public class DorisWriter<IN>
}
// get main work thread.
executorThread = Thread.currentThread();
- // when uploading data in streaming mode, we need to regularly detect whether there are
- // exceptions.
- scheduledExecutorService.scheduleWithFixedDelay(
- this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
+ // todo: When writing to multiple tables,
+ // the checkdone thread may cause problems.
+ if (!multiTableLoad && intervalTime > 1000) {
+ // when uploading data in streaming mode, we need to regularly detect whether there are
+ // exceptions.
+ LOG.info("start stream load checkdone thread.");
+ scheduledExecutorService.scheduleWithFixedDelay(
+ this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
+ }
}
private void abortLingeringTransactions(Collection<DorisWriterState>
recoveredStates)
@@ -248,8 +254,13 @@ public class DorisWriter<IN>
}
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
if (executionOptions.enabled2PC()
- &&
LoadStatus.LABEL_ALREADY_EXIST.equals(respContent.getStatus())) {
- LOG.info("try to abort {} cause Label Already Exists",
respContent.getLabel());
+ &&
LoadStatus.LABEL_ALREADY_EXIST.equals(respContent.getStatus())
+ &&
!JOB_EXIST_FINISHED.equals(respContent.getExistingJobStatus())) {
+ LOG.info(
+ "try to abort {} cause status {}, exist job status {} ",
+ respContent.getLabel(),
+ respContent.getStatus(),
+ respContent.getExistingJobStatus());
dorisStreamLoad.abortLabelExistTransaction(respContent);
throw new LabelAlreadyExistsException("Exist label abort finished, retry");
} else {
@@ -338,12 +349,8 @@ public class DorisWriter<IN>
/** Check the streamload http request regularly. */
private void checkDone() {
- // todo: When writing to multiple tables,
- // the checkdone thread may cause problems. Disable it first.
- if (!multiTableLoad) {
- for (Map.Entry<String, DorisStreamLoad> streamLoadMap :
dorisStreamLoadMap.entrySet()) {
- checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue());
- }
+ for (Map.Entry<String, DorisStreamLoad> streamLoadMap :
dorisStreamLoadMap.entrySet()) {
+ checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue());
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 6d74bf8..63e550a 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -195,8 +195,9 @@ public class DorisConfigOptions {
public static final ConfigOption<Duration> SINK_CHECK_INTERVAL =
ConfigOptions.key("sink.check-interval")
.durationType()
- .defaultValue(Duration.ofMillis(10000))
- .withDescription("check exception with the interval while loading");
+ .defaultValue(Duration.ofMillis(0))
+ .withDescription(
+ "check exception with the interval while loading, The default is 0, disabling the checker thread");
public static final ConfigOption<Integer> SINK_MAX_RETRIES =
ConfigOptions.key("sink.max-retries")
.intType()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]