This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 007a9940e3 [Improve] Improve doris stream load client side error message (#6688)
007a9940e3 is described below
commit 007a9940e37c1cd89a4a8444d4aee60caf438881
Author: Jia Fan <[email protected]>
AuthorDate: Sun Apr 28 10:34:28 2024 +0800
[Improve] Improve doris stream load client side error message (#6688)
---
.../doris/sink/writer/DorisSinkWriter.java | 33 ++-----------
.../doris/sink/writer/DorisStreamLoad.java | 23 ++++++++-
.../connectors/doris/sink/writer/RecordBuffer.java | 56 ++++++++++++++--------
.../connectors/doris/sink/writer/RecordStream.java | 4 ++
.../e2e/connector/doris/DorisErrorIT.java | 11 +++++
.../fake_source_and_doris_sink_timeout_error.conf | 8 +++-
6 files changed, 83 insertions(+), 52 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index 4443d7a0a4..496b91b25b 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -58,7 +58,6 @@ public class DorisSinkWriter
new ArrayList<>(Arrays.asList(LoadStatus.SUCCESS,
LoadStatus.PUBLISH_TIMEOUT));
private long lastCheckpointId;
private DorisStreamLoad dorisStreamLoad;
- volatile boolean loading;
private final DorisConfig dorisConfig;
private final String labelPrefix;
private final LabelGenerator labelGenerator;
@@ -66,7 +65,6 @@ public class DorisSinkWriter
private final DorisSerializer serializer;
private final CatalogTable catalogTable;
private final ScheduledExecutorService scheduledExecutorService;
- private Thread executorThread;
private volatile Exception loadException = null;
public DorisSinkWriter(
@@ -94,7 +92,6 @@ public class DorisSinkWriter
1, new
ThreadFactoryBuilder().setNameFormat("stream-load-check").build());
this.serializer = createSerializer(dorisConfig,
catalogTable.getSeaTunnelRowType());
this.intervalTime = dorisConfig.getCheckInterval();
- this.loading = false;
this.initializeLoad();
}
@@ -123,7 +120,7 @@ public class DorisSinkWriter
@Override
public void write(SeaTunnelRow element) throws IOException {
- checkLoadExceptionAndResetThread();
+ checkLoadException();
byte[] serialize =
serializer.serialize(
dorisConfig.isNeedsUnsupportedTypeCasting()
@@ -154,7 +151,6 @@ public class DorisSinkWriter
private RespContent flush() throws IOException {
// disable exception checker before stop load.
- loading = false;
checkState(dorisStreamLoad != null);
RespContent respContent = dorisStreamLoad.stopLoad();
if (respContent != null &&
!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
@@ -177,7 +173,6 @@ public class DorisSinkWriter
private void startLoad(String label) {
this.dorisStreamLoad.startLoad(label);
- this.loading = true;
}
@Override
@@ -194,37 +189,19 @@ public class DorisSinkWriter
private void checkDone() {
// the load future is done and checked in prepareCommit().
// this will check error while loading.
+ String errorMsg;
log.debug("start timer checker, interval {} ms", intervalTime);
- if (dorisStreamLoad.getPendingLoadFuture() != null
- && dorisStreamLoad.getPendingLoadFuture().isDone()) {
- if (!loading) {
- log.debug("not loading, skip timer checker");
- return;
- }
- String errorMsg;
- try {
- RespContent content =
- dorisStreamLoad.handlePreCommitResponse(
- dorisStreamLoad.getPendingLoadFuture().get());
- errorMsg = content.getMessage();
- } catch (Exception e) {
- errorMsg = e.getMessage();
- }
-
+ if ((errorMsg = dorisStreamLoad.getLoadFailedMsg()) != null) {
+ log.error("stream load finished unexpectedly: {}", errorMsg);
loadException =
new DorisConnectorException(
DorisConnectorErrorCode.STREAM_LOAD_FAILED,
errorMsg);
- log.error("stream load finished unexpectedly, interrupt worker
thread! {}", errorMsg);
- // set the executor thread interrupted in case blocking in write
data.
- executorThread.interrupt();
}
}
- private void checkLoadExceptionAndResetThread() {
+ private void checkLoadException() {
if (loadException != null) {
throw new RuntimeException("error while loading data.",
loadException);
- } else {
- executorThread = Thread.currentThread();
}
}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
index bf2136091d..eadcf94cd5 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
@@ -78,6 +78,7 @@ public class DorisStreamLoad implements Serializable {
private final CloseableHttpClient httpClient;
private final ExecutorService executorService;
private volatile boolean loadBatchFirstRecord;
+ private volatile boolean loading = false;
private String label;
private long recordCount = 0;
@@ -199,7 +200,25 @@ public class DorisStreamLoad implements Serializable {
return recordCount;
}
- public RespContent handlePreCommitResponse(CloseableHttpResponse response)
throws Exception {
+ public String getLoadFailedMsg() {
+ if (!loading) {
+ return null;
+ }
+ if (this.getPendingLoadFuture() != null &&
this.getPendingLoadFuture().isDone()) {
+ String errorMessage;
+ try {
+ errorMessage =
handlePreCommitResponse(pendingLoadFuture.get()).getMessage();
+ } catch (Exception e) {
+ errorMessage = e.getMessage();
+ }
+ recordStream.setErrorMessageByStreamLoad(errorMessage);
+ return errorMessage;
+ } else {
+ return null;
+ }
+ }
+
+ private RespContent handlePreCommitResponse(CloseableHttpResponse
response) throws Exception {
final int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == HTTP_TEMPORARY_REDIRECT && response.getEntity() !=
null) {
String loadResult = EntityUtils.toString(response.getEntity());
@@ -211,6 +230,7 @@ public class DorisStreamLoad implements Serializable {
}
public RespContent stopLoad() throws IOException {
+ loading = false;
if (pendingLoadFuture != null) {
log.info("stream load stopped.");
recordStream.endInput();
@@ -230,6 +250,7 @@ public class DorisStreamLoad implements Serializable {
loadBatchFirstRecord = true;
recordCount = 0;
this.label = label;
+ this.loading = true;
}
private void startStreamLoad() {
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java
index a7043886ca..183227fa6c 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java
@@ -17,6 +17,10 @@
package org.apache.seatunnel.connectors.doris.sink.writer;
+import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
+import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
+
+import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@@ -25,18 +29,21 @@ import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkState;
/** Channel of record stream and HTTP data stream. */
@Slf4j
public class RecordBuffer {
- BlockingQueue<ByteBuffer> writeQueue;
- BlockingQueue<ByteBuffer> readQueue;
- int bufferCapacity;
- int queueSize;
- ByteBuffer currentWriteBuffer;
- ByteBuffer currentReadBuffer;
+ private final BlockingQueue<ByteBuffer> writeQueue;
+ private final BlockingQueue<ByteBuffer> readQueue;
+ private final int bufferCapacity;
+ private final int queueSize;
+ private ByteBuffer currentWriteBuffer;
+ private ByteBuffer currentReadBuffer;
+ // used to check stream load error by stream load thread
+ @Setter private volatile String errorMessageByStreamLoad;
public RecordBuffer(int capacity, int queueSize) {
log.info("init RecordBuffer capacity {}, count {}", capacity,
queueSize);
@@ -76,7 +83,11 @@ public class RecordBuffer {
currentWriteBuffer = null;
}
if (!isEmpty) {
- ByteBuffer byteBuffer = writeQueue.take();
+ ByteBuffer byteBuffer = null;
+ while (byteBuffer == null) {
+ checkErrorMessageByStreamLoad();
+ byteBuffer = writeQueue.poll(100, TimeUnit.MILLISECONDS);
+ }
((Buffer) byteBuffer).flip();
checkState(byteBuffer.limit() == 0);
readQueue.put(byteBuffer);
@@ -89,8 +100,9 @@ public class RecordBuffer {
public void write(byte[] buf) throws InterruptedException {
int wPos = 0;
do {
- if (currentWriteBuffer == null) {
- currentWriteBuffer = writeQueue.take();
+ while (currentWriteBuffer == null) {
+ checkErrorMessageByStreamLoad();
+ currentWriteBuffer = writeQueue.poll(100,
TimeUnit.MILLISECONDS);
}
int available = currentWriteBuffer.remaining();
int nWrite = Math.min(available, buf.length - wPos);
@@ -105,14 +117,15 @@ public class RecordBuffer {
}
public int read(byte[] buf) throws InterruptedException {
- if (currentReadBuffer == null) {
- currentReadBuffer = readQueue.take();
+ while (currentReadBuffer == null) {
+ checkErrorMessageByStreamLoad();
+ currentReadBuffer = readQueue.poll(100, TimeUnit.MILLISECONDS);
}
// add empty buffer as end flag
if (currentReadBuffer.limit() == 0) {
recycleBuffer(currentReadBuffer);
currentReadBuffer = null;
- checkState(readQueue.size() == 0);
+ checkState(readQueue.isEmpty());
return -1;
}
int available = currentReadBuffer.remaining();
@@ -125,16 +138,17 @@ public class RecordBuffer {
return nRead;
}
- private void recycleBuffer(ByteBuffer buffer) throws InterruptedException {
- ((Buffer) buffer).clear();
- writeQueue.put(buffer);
- }
-
- public int getWriteQueueSize() {
- return writeQueue.size();
+ private void checkErrorMessageByStreamLoad() {
+ if (errorMessageByStreamLoad != null) {
+ throw new DorisConnectorException(
+ DorisConnectorErrorCode.STREAM_LOAD_FAILED,
errorMessageByStreamLoad);
+ }
}
- public int getReadQueueSize() {
- return readQueue.size();
+ private void recycleBuffer(ByteBuffer buffer) throws InterruptedException {
+ ((Buffer) buffer).clear();
+ while (!writeQueue.offer(buffer, 100, TimeUnit.MILLISECONDS)) {
+ checkErrorMessageByStreamLoad();
+ }
}
}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordStream.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordStream.java
index 73d33e3dd1..01d373e1fa 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordStream.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordStream.java
@@ -57,4 +57,8 @@ public class RecordStream extends InputStream {
throw new RuntimeException(e);
}
}
+
+ public void setErrorMessageByStreamLoad(String errorMessageByStreamLoad) {
+ recordBuffer.setErrorMessageByStreamLoad(errorMessageByStreamLoad);
+ }
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisErrorIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisErrorIT.java
index 77936f230b..be0416d79e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisErrorIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisErrorIT.java
@@ -79,6 +79,17 @@ public class DorisErrorIT extends AbstractDorisIT {
Thread.sleep(10 * 1000);
super.container.stop();
Assertions.assertNotEquals(0, future.get().getExitCode());
+ Assertions.assertTrue(
+ future.get()
+ .getStderr()
+ .contains(
+ "Caused by:
org.apache.seatunnel.connectors.doris.exception.DorisConnectorException:
ErrorCode:[Doris-01], ErrorDescription:[stream load error]"));
+ Assertions.assertTrue(
+ future.get()
+ .getStderr()
+ .contains(
+ "at
org.apache.seatunnel.connectors.doris.sink.writer.RecordBuffer.checkErrorMessageByStreamLoad"));
+ log.info("doris error log: \n" + future.get().getStderr());
super.container.start();
// wait for the container to restart
given().ignoreExceptions()
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf
index 53e0eadaa4..88bd3a92ca 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf
@@ -18,13 +18,13 @@
env{
parallelism = 1
job.mode = "BATCH"
+ job.retry.times = 0
}
source{
FakeSource {
- row.num = 100
+ row.num = 1000
split.num = 10
- split.read-interval = 10000
string.length = 1
schema = {
fields {
@@ -58,6 +58,10 @@ sink{
password = ""
table.identifier = "e2e_sink.doris_e2e_table"
sink.enable-2pc = "true"
+ // stuck in get RecordBuffer
+ sink.buffer-size = 2
+ sink.buffer-count = 2
+
sink.label-prefix = "test_json"
doris.config = {
format="json"