This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 869417660e [Improve] Improve doris sink to random use be (#6132)
869417660e is described below
commit 869417660edc32cfe99891ba256495105a94bbd9
Author: Jia Fan <[email protected]>
AuthorDate: Tue Jan 9 20:06:44 2024 +0800
[Improve] Improve doris sink to random use be (#6132)
---
.../doris/sink/writer/DorisSinkWriter.java | 19 +++++---------
.../doris/sink/writer/DorisStreamLoad.java | 29 +++++++++++++++-------
2 files changed, 26 insertions(+), 22 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index 0abdc6269c..40e0bc3a2f 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -35,9 +35,7 @@ import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
import org.apache.seatunnel.connectors.doris.util.HttpUtil;
import org.apache.seatunnel.connectors.doris.util.UnsupportedTypeConverterUtils;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@@ -76,7 +74,6 @@ public class DorisSinkWriter
private transient Thread executorThread;
private transient volatile Exception loadException = null;
private List<BackendV2.BackendRowV2> backends;
- private long pos;
public DorisSinkWriter(
SinkWriter.Context context,
@@ -156,7 +153,7 @@ public class DorisSinkWriter
@Override
public Optional<DorisCommitInfo> prepareCommit() throws IOException {
RespContent respContent = flush();
- if (!dorisConfig.getEnable2PC()) {
+ if (!dorisConfig.getEnable2PC() || respContent == null) {
return Optional.empty();
}
long txnId = respContent.getTxnId();
@@ -165,12 +162,12 @@ public class DorisSinkWriter
new DorisCommitInfo(dorisStreamLoad.getHostPort(),
dorisStreamLoad.getDb(), txnId));
}
- @NonNull private RespContent flush() throws IOException {
+ private RespContent flush() throws IOException {
// disable exception checker before stop load.
loading = false;
checkState(dorisStreamLoad != null);
RespContent respContent = dorisStreamLoad.stopLoad();
- if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+ if (respContent != null && !DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
String errMsg =
String.format(
"stream load error: %s, see more in %s",
@@ -253,14 +250,11 @@ public class DorisSinkWriter
}
}
- @VisibleForTesting
- public String getAvailableBackend() {
- long tmp = pos + backends.size();
- while (pos < tmp) {
- BackendV2.BackendRowV2 backend = backends.get((int) (pos % backends.size()));
+ private String getAvailableBackend() {
+ Collections.shuffle(backends);
+ for (BackendV2.BackendRowV2 backend : backends) {
String res = backend.toBackendString();
if (tryHttpConnection(res)) {
- pos++;
return res;
}
}
@@ -279,7 +273,6 @@ public class DorisSinkWriter
return true;
} catch (Exception ex) {
log.warn("Failed to connect to backend:{}", backend, ex);
- pos++;
return false;
}
}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
index 00f21e9ae5..c2a8fc3bb5 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
@@ -78,7 +78,8 @@ public class DorisStreamLoad implements Serializable {
private Future<CloseableHttpResponse> pendingLoadFuture;
private final CloseableHttpClient httpClient;
private final ExecutorService executorService;
- private boolean loadBatchFirstRecord;
+ private volatile boolean loadBatchFirstRecord;
+ private String label;
private long recordCount = 0;
public DorisStreamLoad(
@@ -191,6 +192,8 @@ public class DorisStreamLoad implements Serializable {
public void writeRecord(byte[] record) throws IOException {
if (loadBatchFirstRecord) {
loadBatchFirstRecord = false;
+ recordStream.startInput();
+ startStreamLoad();
} else {
recordStream.write(lineDelimiter);
}
@@ -214,21 +217,29 @@ public class DorisStreamLoad implements Serializable {
}
public RespContent stopLoad() throws IOException {
- recordStream.endInput();
- log.info("stream load stopped.");
- checkState(pendingLoadFuture != null);
- try {
- return handlePreCommitResponse(pendingLoadFuture.get());
- } catch (Exception e) {
- throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
+ if (pendingLoadFuture != null) {
+ log.info("stream load stopped.");
+ recordStream.endInput();
+ try {
+ return handlePreCommitResponse(pendingLoadFuture.get());
+ } catch (Exception e) {
+ throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
+ } finally {
+ pendingLoadFuture = null;
+ }
+ } else {
+ return null;
}
}
public void startLoad(String label) throws IOException {
loadBatchFirstRecord = true;
recordCount = 0;
+ this.label = label;
+ }
+
+ private void startStreamLoad() {
HttpPutBuilder putBuilder = new HttpPutBuilder();
- recordStream.startInput();
log.info("stream load started for {}", label);
try {
InputStreamEntity entity = new InputStreamEntity(recordStream);