This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new e10a47cb [Fix] prevent label setting during group commit retries (#635)
e10a47cb is described below
commit e10a47cb118a67fe7db7c51f47bb488b3dafc97e
Author: Zach <[email protected]>
AuthorDate: Fri Feb 27 16:55:57 2026 +0800
[Fix] prevent label setting during group commit retries (#635)
---
.../flink/sink/batch/DorisBatchStreamLoad.java | 4 +-
.../flink/sink/batch/TestDorisBatchStreamLoad.java | 75 ++++++++++++++++++++++
2 files changed, 78 insertions(+), 1 deletion(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index ee1d9be6..4bc0c19d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -547,7 +547,9 @@ public class DorisBatchStreamLoad implements Serializable {
// get available backend retry
refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
putBuilder.setUrl(loadUrl);
- putBuilder.setLabel(label + "_" + retry);
+ if (!enableGroupCommit && label != null) {
+ putBuilder.setLabel(label + "_" + retry);
+ }
try {
Thread.sleep(retry * 1000);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
index a7998397..f45e69ac 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
@@ -28,6 +28,7 @@ import org.apache.doris.flink.sink.TestUtil;
import org.apache.doris.flink.sink.writer.LabelGenerator;
import org.apache.doris.flink.sink.writer.LoadConstants;
import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.junit.After;
@@ -38,6 +39,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runners.MethodSorters;
+import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +52,7 @@ import java.util.Properties;
import static org.apache.doris.flink.sink.batch.TestBatchBufferStream.mergeByteArrays;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
@@ -175,6 +178,78 @@ public class TestDorisBatchStreamLoad {
loader.checkpointFlush();
}
+ @Test
+ public void testGroupCommitRetryShouldNotSetLabel() throws Exception {
+ LOG.info("testGroupCommitRetryShouldNotSetLabel start");
+ DorisReadOptions readOptions = DorisReadOptions.builder().build();
+ Properties streamProperties = new Properties();
+ streamProperties.setProperty(LoadConstants.GROUP_COMMIT, "sync_mode");
+ DorisExecutionOptions executionOptions =
+ DorisExecutionOptions.builder()
+ .setBufferFlushIntervalMs(1000)
+ .setMaxRetries(1)
+ .setStreamLoadProp(streamProperties)
+ .build();
+ DorisOptions options =
+ DorisOptions.builder()
+ .setFenodes("127.0.0.1:1")
+ .setBenodes("127.0.0.1:1")
+ .setTableIdentifier("db.tbl")
+ .build();
+
+ DorisBatchStreamLoad loader =
+ new DorisBatchStreamLoad(
+ options,
+ readOptions,
+ executionOptions,
+ new LabelGenerator("label", false),
+ 0);
+
+ try {
+ TestUtil.waitUntilCondition(
+ () -> loader.isLoadThreadAlive(),
+ Deadline.fromNow(Duration.ofSeconds(10)),
+ 100L,
+ "testGroupCommitRetryShouldNotSetLabel wait loader start failed.");
+ Assert.assertTrue(loader.isLoadThreadAlive());
+
+ BackendUtil backendUtil = mock(BackendUtil.class);
+ HttpClientBuilder httpClientBuilder = mock(HttpClientBuilder.class);
+ CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+ CloseableHttpResponse failResponse =
+ HttpTestUtil.getResponse("server error 404", false);
+ CloseableHttpResponse successResponse =
+ HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_TABLE_RESPONSE, true);
+ ArgumentCaptor<HttpUriRequest> requestCaptor =
+ ArgumentCaptor.forClass(HttpUriRequest.class);
+
+ loader.setBackendUtil(backendUtil);
+ loader.setHttpClientBuilder(httpClientBuilder);
+ when(backendUtil.getAvailableBackend(anyInt())).thenReturn("127.0.0.1:1");
+ when(httpClientBuilder.build()).thenReturn(httpClient);
+ when(httpClient.execute(requestCaptor.capture()))
+ .thenReturn(failResponse, successResponse);
+
+ loader.writeRecord("db", "tbl", "1,data".getBytes(StandardCharsets.UTF_8));
+ loader.checkpointFlush();
+
+ List<HttpUriRequest> requests = requestCaptor.getAllValues();
+ Assert.assertEquals(2, requests.size());
+ Assert.assertNull(requests.get(0).getFirstHeader("label"));
+ Assert.assertNull(requests.get(1).getFirstHeader("label"));
+ Assert.assertNotNull(requests.get(0).getFirstHeader(LoadConstants.GROUP_COMMIT));
+ Assert.assertNotNull(requests.get(1).getFirstHeader(LoadConstants.GROUP_COMMIT));
+ Assert.assertEquals(
+ "sync_mode",
+ requests.get(0).getFirstHeader(LoadConstants.GROUP_COMMIT).getValue());
+ Assert.assertEquals(
+ "sync_mode",
+ requests.get(1).getFirstHeader(LoadConstants.GROUP_COMMIT).getValue());
+ } finally {
+ loader.close();
+ }
+ }
+
@After
public void after() {
if (backendUtilMockedStatic != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]