This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new e10a47cb [Fix] prevent label setting during group commit retries (#635)
e10a47cb is described below

commit e10a47cb118a67fe7db7c51f47bb488b3dafc97e
Author: Zach <[email protected]>
AuthorDate: Fri Feb 27 16:55:57 2026 +0800

    [Fix] prevent label setting during group commit retries (#635)
---
 .../flink/sink/batch/DorisBatchStreamLoad.java     |  4 +-
 .../flink/sink/batch/TestDorisBatchStreamLoad.java | 75 ++++++++++++++++++++++
 2 files changed, 78 insertions(+), 1 deletion(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index ee1d9be6..4bc0c19d 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -547,7 +547,9 @@ public class DorisBatchStreamLoad implements Serializable {
                 // get available backend retry
                 refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
                 putBuilder.setUrl(loadUrl);
-                putBuilder.setLabel(label + "_" + retry);
+                if (!enableGroupCommit && label != null) {
+                    putBuilder.setLabel(label + "_" + retry);
+                }
 
                 try {
                     Thread.sleep(retry * 1000);
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
index a7998397..f45e69ac 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
@@ -28,6 +28,7 @@ import org.apache.doris.flink.sink.TestUtil;
 import org.apache.doris.flink.sink.writer.LabelGenerator;
 import org.apache.doris.flink.sink.writer.LoadConstants;
 import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.junit.After;
@@ -38,6 +39,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runners.MethodSorters;
+import org.mockito.ArgumentCaptor;
 import org.mockito.MockedStatic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +52,7 @@ import java.util.Properties;
 
 import static 
org.apache.doris.flink.sink.batch.TestBatchBufferStream.mergeByteArrays;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
@@ -175,6 +178,78 @@ public class TestDorisBatchStreamLoad {
         loader.checkpointFlush();
     }
 
+    @Test
+    public void testGroupCommitRetryShouldNotSetLabel() throws Exception {
+        LOG.info("testGroupCommitRetryShouldNotSetLabel start");
+        DorisReadOptions readOptions = DorisReadOptions.builder().build();
+        Properties streamProperties = new Properties();
+        streamProperties.setProperty(LoadConstants.GROUP_COMMIT, "sync_mode");
+        DorisExecutionOptions executionOptions =
+                DorisExecutionOptions.builder()
+                        .setBufferFlushIntervalMs(1000)
+                        .setMaxRetries(1)
+                        .setStreamLoadProp(streamProperties)
+                        .build();
+        DorisOptions options =
+                DorisOptions.builder()
+                        .setFenodes("127.0.0.1:1")
+                        .setBenodes("127.0.0.1:1")
+                        .setTableIdentifier("db.tbl")
+                        .build();
+
+        DorisBatchStreamLoad loader =
+                new DorisBatchStreamLoad(
+                        options,
+                        readOptions,
+                        executionOptions,
+                        new LabelGenerator("label", false),
+                        0);
+
+        try {
+            TestUtil.waitUntilCondition(
+                    () -> loader.isLoadThreadAlive(),
+                    Deadline.fromNow(Duration.ofSeconds(10)),
+                    100L,
+                    "testGroupCommitRetryShouldNotSetLabel wait loader start 
failed.");
+            Assert.assertTrue(loader.isLoadThreadAlive());
+
+            BackendUtil backendUtil = mock(BackendUtil.class);
+            HttpClientBuilder httpClientBuilder = 
mock(HttpClientBuilder.class);
+            CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+            CloseableHttpResponse failResponse =
+                    HttpTestUtil.getResponse("server error 404", false);
+            CloseableHttpResponse successResponse =
+                    
HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_TABLE_RESPONSE, true);
+            ArgumentCaptor<HttpUriRequest> requestCaptor =
+                    ArgumentCaptor.forClass(HttpUriRequest.class);
+
+            loader.setBackendUtil(backendUtil);
+            loader.setHttpClientBuilder(httpClientBuilder);
+            
when(backendUtil.getAvailableBackend(anyInt())).thenReturn("127.0.0.1:1");
+            when(httpClientBuilder.build()).thenReturn(httpClient);
+            when(httpClient.execute(requestCaptor.capture()))
+                    .thenReturn(failResponse, successResponse);
+
+            loader.writeRecord("db", "tbl", 
"1,data".getBytes(StandardCharsets.UTF_8));
+            loader.checkpointFlush();
+
+            List<HttpUriRequest> requests = requestCaptor.getAllValues();
+            Assert.assertEquals(2, requests.size());
+            Assert.assertNull(requests.get(0).getFirstHeader("label"));
+            Assert.assertNull(requests.get(1).getFirstHeader("label"));
+            
Assert.assertNotNull(requests.get(0).getFirstHeader(LoadConstants.GROUP_COMMIT));
+            
Assert.assertNotNull(requests.get(1).getFirstHeader(LoadConstants.GROUP_COMMIT));
+            Assert.assertEquals(
+                    "sync_mode",
+                    
requests.get(0).getFirstHeader(LoadConstants.GROUP_COMMIT).getValue());
+            Assert.assertEquals(
+                    "sync_mode",
+                    
requests.get(1).getFirstHeader(LoadConstants.GROUP_COMMIT).getValue());
+        } finally {
+            loader.close();
+        }
+    }
+
     @After
     public void after() {
         if (backendUtilMockedStatic != null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to