This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new bf306f1 [fix] Add log in abort phase (#78)
bf306f1 is described below
commit bf306f19746fb0109e979283cc4bc7d8d7e1a33d
Author: DongLiang-0 <[email protected]>
AuthorDate: Mon Nov 21 10:30:58 2022 +0800
[fix] Add log in abort phase (#78)
* [fix] Add log in abort phase
* fix unit test
---
.../org/apache/doris/flink/sink/LoadStatus.java | 1 +
.../doris/flink/sink/writer/DorisStreamLoad.java | 9 +++----
.../org/apache/doris/flink/sink/HttpTestUtil.java | 5 ++++
.../flink/sink/writer/TestDorisStreamLoad.java | 28 +++++++++++++++++++---
4 files changed, 36 insertions(+), 7 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/LoadStatus.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/LoadStatus.java
index a9b0d75..d1815b8 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/LoadStatus.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/LoadStatus.java
@@ -25,4 +25,5 @@ public class LoadStatus {
public static final String PUBLISH_TIMEOUT = "Publish Timeout";
public static final String LABEL_ALREADY_EXIST = "Label Already Exists";
public static final String FAIL = "Fail";
+
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index bd29d34..296c66a 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -27,6 +27,8 @@ import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.models.RespContent;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.ResponseUtil;
+
+import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -49,7 +51,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
-import static org.apache.doris.flink.sink.LoadStatus.FAIL;
import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST;
import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
import static
org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
@@ -253,7 +254,7 @@ public class DorisStreamLoad implements Serializable {
}
}
- private void abortTransaction(long txnID) throws Exception {
+ public void abortTransaction(long txnID) throws Exception {
HttpPutBuilder builder = new HttpPutBuilder();
builder.setUrl(abortUrlStr)
.baseAuth(user, passwd)
@@ -272,12 +273,12 @@ public class DorisStreamLoad implements Serializable {
ObjectMapper mapper = new ObjectMapper();
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, String> res = mapper.readValue(loadResult, new
TypeReference<HashMap<String, String>>(){});
- if (FAIL.equals(res.get("status"))) {
+ if (!SUCCESS.equals(res.get("status"))) {
if (ResponseUtil.isCommitted(res.get("msg"))) {
throw new DorisException("try abort committed transaction, " +
"do you recover from old savepoint?");
}
- LOG.warn("Fail to abort transaction. error: {}", res.get("msg"));
+ LOG.warn("Fail to abort transaction. txnId: {}, error: {}", txnID,
res.get("msg"));
}
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java
index 1cf05a7..67e8ba3 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java
@@ -76,6 +76,11 @@ public class HttpTestUtil {
"\n" +
"}";
+ public static final String ABORT_FAILED_RESPONSE = "{\n"
+ + " \"status\": \"Internal error\",\n"
+ + " \"msg\": \"transaction operation should be 'commit' or
'abort'\"\n"
+ + "}";
+
public static StatusLine normalLine = new BasicStatusLine(new
ProtocolVersion("http", 1, 0), 200, "");
public static StatusLine abnormalLine = new BasicStatusLine(new
ProtocolVersion("http", 1, 0), 404, "");
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index b4532ee..6a182bd 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -32,7 +32,10 @@ import java.nio.charset.StandardCharsets;
import java.util.Properties;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
/**
@@ -54,13 +57,32 @@ public class TestDorisStreamLoad {
public void testAbortPreCommit() throws Exception {
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
CloseableHttpResponse existLabelResponse =
HttpTestUtil.getResponse(HttpTestUtil.LABEL_EXIST_PRE_COMMIT_RESPONSE, true);
- CloseableHttpResponse abortSuccessResponse =
HttpTestUtil.getResponse(HttpTestUtil.ABORT_SUCCESS_RESPONSE, true);
CloseableHttpResponse preCommitResponse =
HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
- when(httpClient.execute(any())).thenReturn(existLabelResponse,
abortSuccessResponse, preCommitResponse);
- DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("",
dorisOptions, executionOptions, new LabelGenerator("test001_0", true),
httpClient);
+ when(httpClient.execute(any())).thenReturn(existLabelResponse,
preCommitResponse);
+ DorisStreamLoad dorisStreamLoad = spy(new DorisStreamLoad("",
dorisOptions, executionOptions, new LabelGenerator("test001_0", true),
httpClient));
+
+ doNothing().when(dorisStreamLoad).abortTransaction(anyLong());
dorisStreamLoad.abortPreCommit("test001_0", 1);
}
+ @Test
+ public void testAbortTransaction() throws Exception{
+ CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+ CloseableHttpResponse abortSuccessResponse =
HttpTestUtil.getResponse(HttpTestUtil.ABORT_SUCCESS_RESPONSE, true);
+ when(httpClient.execute(any())).thenReturn(abortSuccessResponse);
+ DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("",
dorisOptions, executionOptions, new LabelGenerator("test001_0", true),
httpClient);
+ dorisStreamLoad.abortTransaction(anyLong());
+ }
+
+ @Test
+ public void testAbortTransactionFailed() throws Exception{
+ CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+ CloseableHttpResponse abortFailedResponse =
HttpTestUtil.getResponse(HttpTestUtil.ABORT_FAILED_RESPONSE, true);
+ when(httpClient.execute(any())).thenReturn(abortFailedResponse);
+ DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("",
dorisOptions, executionOptions, new LabelGenerator("test001_0", true),
httpClient);
+ dorisStreamLoad.abortTransaction(anyLong());
+ }
+
@Test
public void testWriteOneRecordInCsv() throws Exception{
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]