This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new bf306f1  [fix] Add log in abort phase (#78)
bf306f1 is described below

commit bf306f19746fb0109e979283cc4bc7d8d7e1a33d
Author: DongLiang-0 <[email protected]>
AuthorDate: Mon Nov 21 10:30:58 2022 +0800

    [fix] Add log in abort phase (#78)
    
    * [fix] Add log in abort phase
    * fix unit test
---
 .../org/apache/doris/flink/sink/LoadStatus.java    |  1 +
 .../doris/flink/sink/writer/DorisStreamLoad.java   |  9 +++----
 .../org/apache/doris/flink/sink/HttpTestUtil.java  |  5 ++++
 .../flink/sink/writer/TestDorisStreamLoad.java     | 28 +++++++++++++++++++---
 4 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/LoadStatus.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/LoadStatus.java
index a9b0d75..d1815b8 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/LoadStatus.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/LoadStatus.java
@@ -25,4 +25,5 @@ public class LoadStatus {
     public static final String PUBLISH_TIMEOUT = "Publish Timeout";
     public static final String LABEL_ALREADY_EXIST = "Label Already Exists";
     public static final String FAIL = "Fail";
+
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index bd29d34..296c66a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -27,6 +27,8 @@ import org.apache.doris.flink.exception.StreamLoadException;
 import org.apache.doris.flink.rest.models.RespContent;
 import org.apache.doris.flink.sink.HttpPutBuilder;
 import org.apache.doris.flink.sink.ResponseUtil;
+
+import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -49,7 +51,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 
-import static org.apache.doris.flink.sink.LoadStatus.FAIL;
 import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST;
 import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
@@ -253,7 +254,7 @@ public class DorisStreamLoad implements Serializable {
         }
     }
 
-    private void abortTransaction(long txnID) throws Exception {
+    public void abortTransaction(long txnID) throws Exception {
         HttpPutBuilder builder = new HttpPutBuilder();
         builder.setUrl(abortUrlStr)
                 .baseAuth(user, passwd)
@@ -272,12 +273,12 @@ public class DorisStreamLoad implements Serializable {
         ObjectMapper mapper = new ObjectMapper();
         String loadResult = EntityUtils.toString(response.getEntity());
         Map<String, String> res = mapper.readValue(loadResult, new TypeReference<HashMap<String, String>>(){});
-        if (FAIL.equals(res.get("status"))) {
+        if (!SUCCESS.equals(res.get("status"))) {
             if (ResponseUtil.isCommitted(res.get("msg"))) {
                 throw new DorisException("try abort committed transaction, " +
                         "do you recover from old savepoint?");
             }
-            LOG.warn("Fail to abort transaction. error: {}", res.get("msg"));
+            LOG.warn("Fail to abort transaction. txnId: {}, error: {}", txnID, res.get("msg"));
         }
     }
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java
index 1cf05a7..67e8ba3 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java
@@ -76,6 +76,11 @@ public class HttpTestUtil {
             "\n" +
             "}";
 
+    public static final String ABORT_FAILED_RESPONSE = "{\n"
+            + "    \"status\": \"Internal error\",\n"
+            + "    \"msg\": \"transaction operation should be 'commit' or 'abort'\"\n"
+            + "}";
+
     public static StatusLine normalLine = new BasicStatusLine(new ProtocolVersion("http", 1, 0), 200, "");
     public static StatusLine abnormalLine = new BasicStatusLine(new ProtocolVersion("http", 1, 0), 404, "");
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index b4532ee..6a182bd 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -32,7 +32,10 @@ import java.nio.charset.StandardCharsets;
 import java.util.Properties;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 /**
@@ -54,13 +57,32 @@ public class TestDorisStreamLoad {
     public void testAbortPreCommit() throws Exception {
         CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
         CloseableHttpResponse existLabelResponse = HttpTestUtil.getResponse(HttpTestUtil.LABEL_EXIST_PRE_COMMIT_RESPONSE, true);
-        CloseableHttpResponse abortSuccessResponse = HttpTestUtil.getResponse(HttpTestUtil.ABORT_SUCCESS_RESPONSE, true);
         CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
-        when(httpClient.execute(any())).thenReturn(existLabelResponse, abortSuccessResponse, preCommitResponse);
-        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001_0", true), httpClient);
+        when(httpClient.execute(any())).thenReturn(existLabelResponse, preCommitResponse);
+        DorisStreamLoad dorisStreamLoad = spy(new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001_0", true), httpClient));
+
+        doNothing().when(dorisStreamLoad).abortTransaction(anyLong());
         dorisStreamLoad.abortPreCommit("test001_0", 1);
     }
 
+    @Test
+    public void  testAbortTransaction() throws Exception{
+        CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+        CloseableHttpResponse abortSuccessResponse = HttpTestUtil.getResponse(HttpTestUtil.ABORT_SUCCESS_RESPONSE, true);
+        when(httpClient.execute(any())).thenReturn(abortSuccessResponse);
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001_0", true), httpClient);
+        dorisStreamLoad.abortTransaction(anyLong());
+    }
+
+    @Test
+    public void  testAbortTransactionFailed() throws Exception{
+        CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+        CloseableHttpResponse abortFailedResponse = HttpTestUtil.getResponse(HttpTestUtil.ABORT_FAILED_RESPONSE, true);
+        when(httpClient.execute(any())).thenReturn(abortFailedResponse);
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001_0", true), httpClient);
+        dorisStreamLoad.abortTransaction(anyLong());
+    }
+
     @Test
     public void testWriteOneRecordInCsv() throws Exception{
         CloseableHttpClient httpClient = mock(CloseableHttpClient.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to