This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2c5b48e907 [Improve] Add batch flush in doris sink (#6024)
2c5b48e907 is described below

commit 2c5b48e9078718cb31464e850f2a62da137c8b4d
Author: Jia Fan <[email protected]>
AuthorDate: Sat Dec 23 10:35:46 2023 +0800

    [Improve] Add batch flush in doris sink (#6024)
---
 docs/en/connector-v2/sink/Doris.md                 |   1 +
 .../connectors/doris/config/DorisConfig.java       |   2 +-
 .../connectors/doris/config/DorisOptions.java      |  25 ++--
 .../connectors/doris/config/DorisSinkFactory.java  |  48 --------
 .../connectors/doris/rest/RestService.java         |   3 +-
 .../seatunnel/connectors/doris/sink/DorisSink.java |  81 +------------
 .../doris/sink/writer/DorisSinkWriter.java         |  63 +++++-----
 .../doris/sink/writer/DorisStreamLoad.java         |   9 +-
 .../doris/sink/writer/LabelGenerator.java          |   4 +-
 .../connectors/doris/sink/writer/RecordBuffer.java |   2 +-
 .../e2e/connector/doris/AbstractDorisIT.java       | 132 +++++++++++++++++++++
 .../e2e/connector/doris/DorisCDCSinkIT.java        |  85 +------------
 .../e2e/connector/doris/DorisCatalogIT.java        | 103 +---------------
 13 files changed, 198 insertions(+), 360 deletions(-)

diff --git a/docs/en/connector-v2/sink/Doris.md 
b/docs/en/connector-v2/sink/Doris.md
index 6bf8dc5369..de0c47453a 100644
--- a/docs/en/connector-v2/sink/Doris.md
+++ b/docs/en/connector-v2/sink/Doris.md
@@ -45,6 +45,7 @@ Version Supported
 | sink.max-retries    | int    | No       | 3          | the max retry times 
if writing records to database failed                                           
                                                                                
                                                                                
                         |
 | sink.buffer-size    | int    | No       | 256 * 1024 | the buffer size to 
cache data for stream load.                                                     
                                                                                
                                                                                
                          |
 | sink.buffer-count   | int    | No       | 3          | the buffer count to 
cache data for stream load.                                                     
                                                                                
                                                                                
                         |
+| doris.batch.size    | int    | No       | 1024       | the batch size of each 
HTTP write request to Doris; when the row count reaches this size or a checkpoint 
is executed, the cached data is written to the server.                          
                                                                                
                         |
 | doris.config        | map    | yes      | -          | This option is used 
to support operations such as `insert`, `delete`, and `update` when 
automatically generate sql,and supported formats.                               
                                                                                
                                     |
 
 ## Data Type Mapping
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
index 5aa3ce13ff..98a43c7b8e 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
@@ -65,6 +65,7 @@ public class DorisConfig implements Serializable {
     private String password;
     private Integer queryPort;
     private String tableIdentifier;
+    private int batchSize;
 
     // source option
     private String readField;
@@ -76,7 +77,6 @@ public class DorisConfig implements Serializable {
     private Integer requestRetries;
     private boolean deserializeArrowAsync;
     private int deserializeQueueSize;
-    private int batchSize;
     private int execMemLimit;
     private boolean useOldApi;
 
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
index 270cc03b21..04a4f02851 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
@@ -90,6 +90,11 @@ public interface DorisOptions {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("the doris password.");
+    Option<Integer> DORIS_BATCH_SIZE =
+            Options.key("doris.batch.size")
+                    .intType()
+                    .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
+                    .withDescription("the batch size of the doris 
read/write.");
 
     // source config options
     Option<String> DORIS_READ_FIELD =
@@ -139,22 +144,6 @@ public interface DorisOptions {
                     .intType()
                     .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
                     .withDescription("");
-    Option<Integer> DORIS_BATCH_SIZE =
-            Options.key("doris.batch.size")
-                    .intType()
-                    .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
-                    .withDescription("");
-    Option<Long> DORIS_EXEC_MEM_LIMIT =
-            Options.key("doris.exec.mem.limit")
-                    .longType()
-                    .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
-                    .withDescription("");
-    Option<Boolean> SOURCE_USE_OLD_API =
-            Options.key("source.use-old-api")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "Whether to read data using the new interface 
defined according to the FLIP-27 specification,default false");
 
     // sink config options
     Option<Boolean> SINK_ENABLE_2PC =
@@ -224,7 +213,9 @@ public interface DorisOptions {
                     .withDescription("Create table statement template, used to 
create Doris table");
 
     OptionRule.Builder SINK_RULE =
-            OptionRule.builder().required(FENODES, USERNAME, PASSWORD, 
TABLE_IDENTIFIER);
+            OptionRule.builder()
+                    .required(FENODES, USERNAME, PASSWORD, TABLE_IDENTIFIER)
+                    .optional(DORIS_BATCH_SIZE);
 
     OptionRule.Builder CATALOG_RULE =
             OptionRule.builder().required(FENODES, QUERY_PORT, USERNAME, 
PASSWORD);
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkFactory.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkFactory.java
deleted file mode 100644
index e3b56c86ac..0000000000
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkFactory.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.config;
-
-import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(Factory.class)
-public class DorisSinkFactory implements TableSinkFactory {
-
-    public static final String IDENTIFIER = "Doris";
-
-    @Override
-    public String factoryIdentifier() {
-        return IDENTIFIER;
-    }
-
-    @Override
-    public OptionRule optionRule() {
-        return OptionRule.builder()
-                .required(
-                        DorisOptions.FENODES,
-                        DorisOptions.USERNAME,
-                        DorisOptions.PASSWORD,
-                        DorisOptions.SINK_LABEL_PREFIX,
-                        DorisOptions.DORIS_SINK_CONFIG_PREFIX)
-                .optional(DorisOptions.SINK_ENABLE_2PC, 
DorisOptions.SINK_ENABLE_DELETE)
-                .build();
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
index 8af81603e7..e1c83a0700 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
@@ -372,8 +372,7 @@ public class RestService implements Serializable {
                 HttpGet httpGet = new HttpGet(beUrl);
                 String response = send(dorisConfig, httpGet, logger);
                 logger.info("Backend Info:{}", response);
-                List<BackendV2.BackendRowV2> backends = 
parseBackendV2(response, logger);
-                return backends;
+                return parseBackendV2(response, logger);
             } catch (DorisConnectorException e) {
                 logger.info(
                         "Doris FE node {} is unavailable: {}, Request the next 
Doris FE node",
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
index 92dade722f..8a45e34cef 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
@@ -17,29 +17,18 @@
 
 package org.apache.seatunnel.connectors.doris.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.doris.config.DorisConfig;
-import org.apache.seatunnel.connectors.doris.config.DorisOptions;
-import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
 import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
 import 
org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfoSerializer;
 import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitter;
@@ -47,29 +36,21 @@ import 
org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkState;
 import 
org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkStateSerializer;
 import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter;
 
-import com.google.auto.service.AutoService;
-
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
-@AutoService(SeaTunnelSink.class)
 public class DorisSink
         implements SeaTunnelSink<SeaTunnelRow, DorisSinkState, 
DorisCommitInfo, DorisCommitInfo>,
                 SupportSaveMode {
 
-    private DorisConfig dorisConfig;
-    private SeaTunnelRowType seaTunnelRowType;
+    private final DorisConfig dorisConfig;
+    private final SeaTunnelRowType seaTunnelRowType;
     private String jobId;
 
-    private CatalogTable catalogTable;
-
-    public DorisSink() {}
-
     public DorisSink(ReadonlyConfig config, CatalogTable catalogTable) {
         this.dorisConfig = DorisConfig.of(config);
-        this.catalogTable = catalogTable;
         this.seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
     }
 
@@ -78,63 +59,22 @@ public class DorisSink
         return "Doris";
     }
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        this.dorisConfig = DorisConfig.of(pluginConfig);
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        DorisOptions.FENODES.key(),
-                        DorisOptions.USERNAME.key(),
-                        DorisOptions.TABLE_IDENTIFIER.key());
-        if (!result.isSuccess()) {
-            throw new DorisConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
-        if (dorisConfig.getTableIdentifier().isEmpty() && catalogTable != 
null) {
-            String tableIdentifier =
-                    catalogTable.getTableId().getDatabaseName()
-                            + "."
-                            + catalogTable.getTableId().getTableName();
-            dorisConfig.setTableIdentifier(tableIdentifier);
-        }
-    }
-
     @Override
     public void setJobContext(JobContext jobContext) {
         this.jobId = jobContext.getJobId();
     }
 
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
-    }
-
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> 
createWriter(
             SinkWriter.Context context) throws IOException {
-        DorisSinkWriter dorisSinkWriter =
-                new DorisSinkWriter(
-                        context, Collections.emptyList(), seaTunnelRowType, 
dorisConfig, jobId);
-        dorisSinkWriter.initializeLoad(Collections.emptyList());
-        return dorisSinkWriter;
+        return new DorisSinkWriter(
+                context, Collections.emptyList(), seaTunnelRowType, 
dorisConfig, jobId);
     }
 
     @Override
     public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> 
restoreWriter(
             SinkWriter.Context context, List<DorisSinkState> states) throws 
IOException {
-        DorisSinkWriter dorisWriter =
-                new DorisSinkWriter(context, states, seaTunnelRowType, 
dorisConfig, jobId);
-        dorisWriter.initializeLoad(states);
-        return dorisWriter;
+        return new DorisSinkWriter(context, states, seaTunnelRowType, 
dorisConfig, jobId);
     }
 
     @Override
@@ -152,17 +92,6 @@ public class DorisSink
         return Optional.of(new DorisCommitInfoSerializer());
     }
 
-    @Override
-    public Optional<SinkAggregatedCommitter<DorisCommitInfo, DorisCommitInfo>>
-            createAggregatedCommitter() throws IOException {
-        return Optional.empty();
-    }
-
-    @Override
-    public Optional<Serializer<DorisCommitInfo>> 
getAggregatedCommitInfoSerializer() {
-        return Optional.empty();
-    }
-
     @Override
     public Optional<SaveModeHandler> getSaveModeHandler() {
         return Optional.empty();
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index 8f0a63d289..2f35357e5b 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -34,6 +34,7 @@ import org.apache.seatunnel.connectors.doris.util.HttpUtil;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
@@ -64,7 +65,6 @@ public class DorisSinkWriter implements 
SinkWriter<SeaTunnelRow, DorisCommitInfo
     private final String labelPrefix;
     private final LabelGenerator labelGenerator;
     private final int intervalTime;
-    private final DorisSinkState dorisSinkState;
     private final DorisSerializer serializer;
     private final transient ScheduledExecutorService scheduledExecutorService;
     private transient Thread executorThread;
@@ -77,12 +77,12 @@ public class DorisSinkWriter implements 
SinkWriter<SeaTunnelRow, DorisCommitInfo
             List<DorisSinkState> state,
             SeaTunnelRowType seaTunnelRowType,
             DorisConfig dorisConfig,
-            String jobId) {
+            String jobId)
+            throws IOException {
         this.dorisConfig = dorisConfig;
-        this.lastCheckpointId = state.size() != 0 ? 
state.get(0).getCheckpointId() : 0;
+        this.lastCheckpointId = !state.isEmpty() ? 
state.get(0).getCheckpointId() : 0;
         log.info("restore checkpointId {}", lastCheckpointId);
         log.info("labelPrefix " + dorisConfig.getLabelPrefix());
-        this.dorisSinkState = new DorisSinkState(dorisConfig.getLabelPrefix(), 
lastCheckpointId);
         this.labelPrefix =
                 dorisConfig.getLabelPrefix() + "_" + jobId + "_" + 
context.getIndexOfSubtask();
         this.labelGenerator = new LabelGenerator(labelPrefix, 
dorisConfig.getEnable2PC());
@@ -92,9 +92,10 @@ public class DorisSinkWriter implements 
SinkWriter<SeaTunnelRow, DorisCommitInfo
         this.serializer = createSerializer(dorisConfig, seaTunnelRowType);
         this.intervalTime = dorisConfig.getCheckInterval();
         this.loading = false;
+        this.initializeLoad();
     }
 
-    public void initializeLoad(List<DorisSinkState> state) throws IOException {
+    private void initializeLoad() throws IOException {
         this.backends = RestService.getBackendsV2(dorisConfig, log);
         String backend = getAvailableBackend();
         try {
@@ -124,10 +125,26 @@ public class DorisSinkWriter implements 
SinkWriter<SeaTunnelRow, DorisCommitInfo
             return;
         }
         dorisStreamLoad.writeRecord(serialize);
+        if (!dorisConfig.getEnable2PC()
+                && dorisStreamLoad.getRecordCount() >= 
dorisConfig.getBatchSize()) {
+            flush();
+            startLoad(labelGenerator.generateLabel(lastCheckpointId));
+        }
     }
 
     @Override
     public Optional<DorisCommitInfo> prepareCommit() throws IOException {
+        RespContent respContent = flush();
+        if (!dorisConfig.getEnable2PC()) {
+            return Optional.empty();
+        }
+        long txnId = respContent.getTxnId();
+
+        return Optional.of(
+                new DorisCommitInfo(dorisStreamLoad.getHostPort(), 
dorisStreamLoad.getDb(), txnId));
+    }
+
+    @NonNull private RespContent flush() throws IOException {
         // disable exception checker before stop load.
         loading = false;
         checkState(dorisStreamLoad != null);
@@ -139,25 +156,23 @@ public class DorisSinkWriter implements 
SinkWriter<SeaTunnelRow, DorisCommitInfo
                             respContent.getMessage(), 
respContent.getErrorURL());
             throw new 
DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, errMsg);
         }
-        if (!dorisConfig.getEnable2PC()) {
-            return Optional.empty();
-        }
-        long txnId = respContent.getTxnId();
-
-        return Optional.of(
-                new DorisCommitInfo(dorisStreamLoad.getHostPort(), 
dorisStreamLoad.getDb(), txnId));
+        return respContent;
     }
 
     @Override
     public List<DorisSinkState> snapshotState(long checkpointId) throws 
IOException {
         checkState(dorisStreamLoad != null);
-        this.dorisStreamLoad.setHostPort(getAvailableBackend());
-        
this.dorisStreamLoad.startLoad(labelGenerator.generateLabel(checkpointId + 1));
-        this.loading = true;
+        startLoad(labelGenerator.generateLabel(checkpointId + 1));
         this.lastCheckpointId = checkpointId;
         return Collections.singletonList(new DorisSinkState(labelPrefix, 
lastCheckpointId));
     }
 
+    private void startLoad(String label) throws IOException {
+        this.dorisStreamLoad.setHostPort(getAvailableBackend());
+        this.dorisStreamLoad.startLoad(label);
+        this.loading = true;
+    }
+
     @Override
     public void abortPrepare() {
         if (dorisConfig.getEnable2PC()) {
@@ -204,23 +219,11 @@ public class DorisSinkWriter implements 
SinkWriter<SeaTunnelRow, DorisCommitInfo
         }
     }
 
-    @VisibleForTesting
-    public boolean isLoading() {
-        return this.loading;
-    }
-
-    @VisibleForTesting
-    public void setDorisStreamLoad(DorisStreamLoad streamLoad) {
-        this.dorisStreamLoad = streamLoad;
-    }
-
-    @VisibleForTesting
-    public void setBackends(List<BackendV2.BackendRowV2> backends) {
-        this.backends = backends;
-    }
-
     @Override
     public void close() throws IOException {
+        if (!dorisConfig.getEnable2PC()) {
+            flush();
+        }
         if (scheduledExecutorService != null) {
             scheduledExecutorService.shutdownNow();
         }
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
index 696d5c4095..892c1da954 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
@@ -32,7 +32,6 @@ import org.apache.http.util.EntityUtils;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import lombok.extern.slf4j.Slf4j;
 
@@ -79,6 +78,7 @@ public class DorisStreamLoad implements Serializable {
     private final CloseableHttpClient httpClient;
     private final ExecutorService executorService;
     private boolean loadBatchFirstRecord;
+    private long recordCount = 0;
 
     public DorisStreamLoad(
             String hostPort,
@@ -194,11 +194,11 @@ public class DorisStreamLoad implements Serializable {
             recordStream.write(lineDelimiter);
         }
         recordStream.write(record);
+        recordCount++;
     }
 
-    @VisibleForTesting
-    public RecordStream getRecordStream() {
-        return recordStream;
+    public long getRecordCount() {
+        return recordCount;
     }
 
     public RespContent handlePreCommitResponse(CloseableHttpResponse response) 
throws Exception {
@@ -225,6 +225,7 @@ public class DorisStreamLoad implements Serializable {
 
     public void startLoad(String label) throws IOException {
         loadBatchFirstRecord = true;
+        recordCount = 0;
         HttpPutBuilder putBuilder = new HttpPutBuilder();
         recordStream.startInput();
         log.info("stream load started for {}", label);
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/LabelGenerator.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/LabelGenerator.java
index 1f4fdbb45a..df13d9b22a 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/LabelGenerator.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/LabelGenerator.java
@@ -19,8 +19,8 @@ package org.apache.seatunnel.connectors.doris.sink.writer;
 
 /** Generator label for stream load. */
 public class LabelGenerator {
-    private String labelPrefix;
-    private boolean enable2PC;
+    private final String labelPrefix;
+    private final boolean enable2PC;
 
     public LabelGenerator(String labelPrefix, boolean enable2PC) {
         this.labelPrefix = labelPrefix;
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java
index 7c861658f8..26125e96ee 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java
@@ -55,7 +55,7 @@ public class RecordBuffer {
                 "start buffer data, read queue size {}, write queue size {}",
                 readQueue.size(),
                 writeQueue.size());
-        checkState(readQueue.size() == 0);
+        checkState(readQueue.isEmpty());
         checkState(writeQueue.size() == queueSize);
         for (ByteBuffer byteBuffer : writeQueue) {
             checkState(byteBuffer.position() == 0);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
new file mode 100644
index 0000000000..229cb04c79
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.doris;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+@Slf4j
+public abstract class AbstractDorisIT extends TestSuiteBase implements 
TestResource {
+
+    protected GenericContainer<?> container;
+
+    // use image adamlee489/doris:1.2.7.1_arm when running this test on mac
+    private static final String DOCKER_IMAGE = "adamlee489/doris:1.2.7.1_x86";
+    protected static final String HOST = "doris_e2e";
+    protected static final int QUERY_PORT = 9030;
+    protected static final int HTTP_PORT = 8030;
+    protected static final int BE_HTTP_PORT = 8040;
+    protected static final String URL = "jdbc:mysql://%s:" + QUERY_PORT;
+    protected static final String USERNAME = "root";
+    protected static final String PASSWORD = "";
+    protected Connection jdbcConnection;
+    private static final String SET_SQL =
+            "ADMIN SET FRONTEND CONFIG (\"enable_batch_delete_by_default\" = 
\"true\")";
+    private static final String SHOW_BE = "SHOW BACKENDS";
+
+    @BeforeAll
+    @Override
+    public void startUp() {
+        container =
+                new GenericContainer<>(DOCKER_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(HOST)
+                        .withEnv("FE_SERVERS", "fe1:127.0.0.1:9010")
+                        .withEnv("FE_ID", "1")
+                        .withEnv("CURRENT_BE_IP", "127.0.0.1")
+                        .withEnv("CURRENT_BE_PORT", "9050")
+                        .withCommand("ulimit -n 65536")
+                        .withCreateContainerCmdModifier(
+                                cmd -> cmd.getHostConfig().withMemorySwap(0L))
+                        .withPrivilegedMode(true)
+                        .withLogConsumer(
+                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
+        container.setPortBindings(
+                Lists.newArrayList(
+                        String.format("%s:%s", QUERY_PORT, QUERY_PORT),
+                        String.format("%s:%s", HTTP_PORT, HTTP_PORT),
+                        String.format("%s:%s", BE_HTTP_PORT, BE_HTTP_PORT)));
+
+        Startables.deepStart(Stream.of(container)).join();
+        log.info("doris container started");
+        given().ignoreExceptions()
+                .await()
+                .atMost(10000, TimeUnit.SECONDS)
+                .untilAsserted(this::initializeJdbcConnection);
+    }
+
+    private void initializeJdbcConnection() throws SQLException {
+        Properties props = new Properties();
+        props.put("user", USERNAME);
+        props.put("password", PASSWORD);
+
+        jdbcConnection =
+                DriverManager.getConnection(String.format(URL, 
container.getHost()), props);
+        try (Statement statement = jdbcConnection.createStatement()) {
+            statement.execute(SET_SQL);
+            ResultSet resultSet;
+            do {
+                resultSet = statement.executeQuery(SHOW_BE);
+            } while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
+        }
+    }
+
+    private boolean isBeReady(ResultSet rs, Duration duration) throws 
SQLException {
+        if (rs.next()) {
+            String isAlive = rs.getString(10).trim();
+            String totalCap = rs.getString(16).trim();
+            LockSupport.parkNanos(duration.toNanos());
+            return "true".equalsIgnoreCase(isAlive) && 
!"0.000".equalsIgnoreCase(totalCap);
+        }
+        return false;
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (container != null) {
+            container.close();
+        }
+        if (jdbcConnection != null) {
+            jdbcConnection.close();
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
index 8983269a19..9afa91d4e8 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
@@ -17,60 +17,32 @@
 
 package org.apache.seatunnel.e2e.connector.doris;
 
-import org.apache.seatunnel.e2e.common.TestResource;
-import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 
-import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerLoggerFactory;
 
-import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.sql.Connection;
-import java.sql.Driver;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.awaitility.Awaitility.given;
-
 @Slf4j
-@Disabled
-public class DorisCDCSinkIT extends TestSuiteBase implements TestResource {
-    private static final String DOCKER_IMAGE = 
"zykkk/doris:1.2.2.1-avx2-x86_84";
-    private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
-    private static final String HOST = "doris_cdc_e2e";
-    private static final int DOCKER_PORT = 9030;
-    private static final int PORT = 8961;
-    private static final String URL = "jdbc:mysql://%s:" + PORT;
-    private static final String USERNAME = "root";
-    private static final String PASSWORD = "";
+@Disabled("we need resolve the issue of network between containers")
+public class DorisCDCSinkIT extends AbstractDorisIT {
+
     private static final String DATABASE = "test";
     private static final String SINK_TABLE = "e2e_table_sink";
-    private static final String DRIVER_JAR =
-            
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";;
-    private static final String SET_SQL =
-            "ADMIN SET FRONTEND CONFIG (\"enable_batch_delete_by_default\" = 
\"true\")";
     private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT 
EXISTS " + DATABASE;
     private static final String DDL_SINK =
             "CREATE TABLE IF NOT EXISTS "
@@ -88,43 +60,13 @@ public class DorisCDCSinkIT extends TestSuiteBase 
implements TestResource {
                     + "\"replication_allocation\" = \"tag.location.default: 
1\""
                     + ")";
 
-    private Connection jdbcConnection;
-    private GenericContainer<?> dorisServer;
-
     @BeforeAll
-    @Override
-    public void startUp() {
-        dorisServer =
-                new GenericContainer<>(DOCKER_IMAGE)
-                        .withNetwork(NETWORK)
-                        .withNetworkAliases(HOST)
-                        .withPrivilegedMode(true)
-                        .withLogConsumer(
-                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
-        dorisServer.setPortBindings(Lists.newArrayList(String.format("%s:%s", 
PORT, DOCKER_PORT)));
-        Startables.deepStart(Stream.of(dorisServer)).join();
-        log.info("doris container started");
-        // wait for doris fully start
-        given().ignoreExceptions()
-                .await()
-                .atMost(10000, TimeUnit.SECONDS)
-                .untilAsserted(this::initializeJdbcConnection);
+    public void init() {
         initializeJdbcTable();
     }
 
-    @AfterAll
-    @Override
-    public void tearDown() throws Exception {
-        if (jdbcConnection != null) {
-            jdbcConnection.close();
-        }
-        if (dorisServer != null) {
-            dorisServer.close();
-        }
-    }
-
     @TestTemplate
-    public void testDorisSink(TestContainer container) throws Exception {
+    public void testDorisCDCSink(TestContainer container) throws Exception {
         Container.ExecResult execResult =
                 container.executeJob("/write-cdc-changelog-to-doris.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
@@ -148,23 +90,6 @@ public class DorisCDCSinkIT extends TestSuiteBase 
implements TestResource {
         Assertions.assertIterableEquals(expected, actual);
     }
 
-    private void initializeJdbcConnection()
-            throws SQLException, ClassNotFoundException, 
InstantiationException,
-                    IllegalAccessException, MalformedURLException {
-        URLClassLoader urlClassLoader =
-                new URLClassLoader(
-                        new URL[] {new URL(DRIVER_JAR)}, 
DorisCDCSinkIT.class.getClassLoader());
-        Thread.currentThread().setContextClassLoader(urlClassLoader);
-        Driver driver = (Driver) 
urlClassLoader.loadClass(DRIVER_CLASS).newInstance();
-        Properties props = new Properties();
-        props.put("user", USERNAME);
-        props.put("password", PASSWORD);
-        jdbcConnection = driver.connect(String.format(URL, 
dorisServer.getHost()), props);
-        try (Statement statement = jdbcConnection.createStatement()) {
-            statement.execute(SET_SQL);
-        }
-    }
-
     private void initializeJdbcTable() {
         try (Statement statement = jdbcConnection.createStatement()) {
             // create databases
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
index 035ea47544..4722b84025 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
@@ -29,92 +29,30 @@ import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.connectors.doris.catalog.DorisCatalog;
 import org.apache.seatunnel.connectors.doris.catalog.DorisCatalogFactory;
 import org.apache.seatunnel.connectors.doris.config.DorisOptions;
-import org.apache.seatunnel.e2e.common.TestResource;
-import org.apache.seatunnel.e2e.common.TestSuiteBase;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerLoggerFactory;
 
-import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.LockSupport;
-import java.util.stream.Stream;
-
-import static org.awaitility.Awaitility.given;
 
 @Slf4j
-public class DorisCatalogIT extends TestSuiteBase implements TestResource {
-
-    // use image adamlee489/doris:1.2.7.1_arm when running this test on mac
-    private static final String DOCKER_IMAGE = "adamlee489/doris:1.2.7.1_x86";
-    private static final String HOST = "doris_catalog_e2e";
-    private static final int DOCKER_QUERY_PORT = 9030;
-    private static final int DOCKER_HTTP_PORT = 8030;
-    private static final int QUERY_PORT = 19030;
-    private static final int HTTP_PORT = 18030;
-    private static final String URL = "jdbc:mysql://%s:" + QUERY_PORT;
-    private static final String USERNAME = "root";
-    private static final String PASSWORD = "";
+public class DorisCatalogIT extends AbstractDorisIT {
+
     private static final String DATABASE = "test";
     private static final String SINK_TABLE = "doris_catalog_e2e";
-    private static final String SET_SQL =
-            "ADMIN SET FRONTEND CONFIG (\"enable_batch_delete_by_default\" = 
\"true\")";
-    private static final String SHOW_BE = "SHOW BACKENDS";
-
-    private GenericContainer<?> container;
-    private Connection jdbcConnection;
     private DorisCatalogFactory factory;
     private DorisCatalog catalog;
 
     @BeforeAll
-    @Override
-    public void startUp() throws Exception {
-
-        container =
-                new GenericContainer<>(DOCKER_IMAGE)
-                        .withNetwork(NETWORK)
-                        .withNetworkAliases(HOST)
-                        .withEnv("FE_SERVERS", "fe1:127.0.0.1:9010")
-                        .withEnv("FE_ID", "1")
-                        .withEnv("CURRENT_BE_IP", "127.0.0.1")
-                        .withEnv("CURRENT_BE_PORT", "9050")
-                        .withCommand("ulimit -n 65536")
-                        .withCreateContainerCmdModifier(
-                                cmd -> cmd.getHostConfig().withMemorySwap(0L))
-                        .withPrivilegedMode(true)
-                        .withLogConsumer(
-                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
-        container.setPortBindings(
-                Lists.newArrayList(
-                        String.format("%s:%s", QUERY_PORT, DOCKER_QUERY_PORT),
-                        String.format("%s:%s", HTTP_PORT, DOCKER_HTTP_PORT)));
-
-        Startables.deepStart(Stream.of(container)).join();
-        log.info("doris container started");
-        given().ignoreExceptions()
-                .await()
-                .atMost(10000, TimeUnit.SECONDS)
-                .untilAsserted(this::initializeJdbcConnection);
+    public void init() {
         initCatalogFactory();
         initCatalog();
     }
@@ -203,42 +141,9 @@ public class DorisCatalogIT extends TestSuiteBase 
implements TestResource {
     }
 
     @AfterAll
-    @Override
-    public void tearDown() throws Exception {
-        if (container != null) {
-            container.close();
-        }
-        if (jdbcConnection != null) {
-            jdbcConnection.close();
-        }
+    public void close() {
         if (catalog != null) {
             catalog.close();
         }
     }
-
-    private void initializeJdbcConnection() throws SQLException {
-        Properties props = new Properties();
-        props.put("user", USERNAME);
-        props.put("password", PASSWORD);
-
-        jdbcConnection =
-                DriverManager.getConnection(String.format(URL, 
container.getHost()), props);
-        try (Statement statement = jdbcConnection.createStatement()) {
-            statement.execute(SET_SQL);
-            ResultSet resultSet;
-            do {
-                resultSet = statement.executeQuery(SHOW_BE);
-            } while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
-        }
-    }
-
-    private boolean isBeReady(ResultSet rs, Duration duration) throws 
SQLException {
-        if (rs.next()) {
-            String isAlive = rs.getString(10).trim();
-            String totalCap = rs.getString(16).trim();
-            LockSupport.parkNanos(duration.toNanos());
-            return "true".equalsIgnoreCase(isAlive) && 
!"0.000".equalsIgnoreCase(totalCap);
-        }
-        return false;
-    }
 }


Reply via email to