This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 16b01d0e [improvement] fix batch streamload when label already exist
(#470)
16b01d0e is described below
commit 16b01d0e3cd85f10a7aeedd962b00d14e9bed80d
Author: wudi <[email protected]>
AuthorDate: Thu Aug 15 15:21:57 2024 +0800
[improvement] fix batch streamload when label already exist (#470)
---
.../apache/doris/flink/sink/HttpPutBuilder.java | 4 +++
.../flink/sink/batch/DorisBatchStreamLoad.java | 34 +++++++++++++---------
2 files changed, 25 insertions(+), 13 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
index 44f6c9fe..da1a7999 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
@@ -117,6 +117,10 @@ public class HttpPutBuilder {
return this;
}
+ public String getLabel() {
+ return header.get("label");
+ }
+
public HttpPut build() {
Preconditions.checkNotNull(url);
Preconditions.checkNotNull(httpEntity);
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 3240dafe..42b83207 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -33,6 +33,7 @@ import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.EscapeHandler;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
+import org.apache.doris.flink.sink.LoadStatus;
import org.apache.doris.flink.sink.writer.LabelGenerator;
import org.apache.http.client.entity.GzipCompressingEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -450,25 +451,22 @@ public class DorisBatchStreamLoad implements Serializable
{
if (enableGroupCommit) {
LOG.info("stream load started with group commit on host
{}", hostPort);
} else {
- LOG.info("stream load started for {} on host {}", label,
hostPort);
+ LOG.info(
+ "stream load started for {} on host {}",
+ putBuilder.getLabel(),
+ hostPort);
}
try (CloseableHttpClient httpClient =
httpClientBuilder.build()) {
try (CloseableHttpResponse response =
httpClient.execute(putBuilder.build())) {
int statusCode =
response.getStatusLine().getStatusCode();
+ String reason = response.getStatusLine().toString();
if (statusCode == 200 && response.getEntity() != null)
{
String loadResult =
EntityUtils.toString(response.getEntity());
LOG.info("load Result {}", loadResult);
RespContent respContent =
OBJECT_MAPPER.readValue(loadResult,
RespContent.class);
- if
(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
- String errMsg =
- String.format(
- "stream load error: %s, see
more in %s",
- respContent.getMessage(),
- respContent.getErrorURL());
- throw new DorisBatchLoadException(errMsg);
- } else {
+ if
(DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
long cacheByteBeforeFlush =
currentCacheBytes.getAndAdd(-respContent.getLoadBytes());
LOG.info(
@@ -482,16 +480,26 @@ public class DorisBatchStreamLoad implements Serializable
{
lock.unlock();
}
return;
+ } else if (LoadStatus.LABEL_ALREADY_EXIST.equals(
+ respContent.getStatus())) {
+ // todo: need to abort transaction when
JobStatus not finished
+ putBuilder.setLabel(label + "_" + retry);
+ reason = respContent.getMessage();
+ } else {
+ String errMsg =
+ String.format(
+ "stream load error: %s, see
more in %s",
+ respContent.getMessage(),
+ respContent.getErrorURL());
+ throw new DorisBatchLoadException(errMsg);
}
}
LOG.error(
"stream load failed with {}, reason {}, to
retry",
hostPort,
- response.getStatusLine().toString());
+ reason);
if (retry == executionOptions.getMaxRetries()) {
- resEx =
- new DorisRuntimeException(
- "stream load failed with: " +
response.getStatusLine());
+ resEx = new DorisRuntimeException("stream load
failed with: " + reason);
}
} catch (Exception ex) {
resEx = ex;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]