This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2c5b48e907 [Improve] Add batch flush in doris sink (#6024)
2c5b48e907 is described below
commit 2c5b48e9078718cb31464e850f2a62da137c8b4d
Author: Jia Fan <[email protected]>
AuthorDate: Sat Dec 23 10:35:46 2023 +0800
[Improve] Add batch flush in doris sink (#6024)
---
docs/en/connector-v2/sink/Doris.md | 1 +
.../connectors/doris/config/DorisConfig.java | 2 +-
.../connectors/doris/config/DorisOptions.java | 25 ++--
.../connectors/doris/config/DorisSinkFactory.java | 48 --------
.../connectors/doris/rest/RestService.java | 3 +-
.../seatunnel/connectors/doris/sink/DorisSink.java | 81 +------------
.../doris/sink/writer/DorisSinkWriter.java | 63 +++++-----
.../doris/sink/writer/DorisStreamLoad.java | 9 +-
.../doris/sink/writer/LabelGenerator.java | 4 +-
.../connectors/doris/sink/writer/RecordBuffer.java | 2 +-
.../e2e/connector/doris/AbstractDorisIT.java | 132 +++++++++++++++++++++
.../e2e/connector/doris/DorisCDCSinkIT.java | 85 +------------
.../e2e/connector/doris/DorisCatalogIT.java | 103 +---------------
13 files changed, 198 insertions(+), 360 deletions(-)
diff --git a/docs/en/connector-v2/sink/Doris.md
b/docs/en/connector-v2/sink/Doris.md
index 6bf8dc5369..de0c47453a 100644
--- a/docs/en/connector-v2/sink/Doris.md
+++ b/docs/en/connector-v2/sink/Doris.md
@@ -45,6 +45,7 @@ Version Supported
| sink.max-retries | int | No | 3 | the max retry times
if writing records to database failed
|
| sink.buffer-size | int | No | 256 * 1024 | the buffer size to
cache data for stream load.
|
| sink.buffer-count | int | No | 3 | the buffer count to
cache data for stream load.
|
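+| doris.batch.size | int | No | 1024 | the maximum number of rows written to Doris in a single HTTP (stream load) request. When two-phase commit is disabled, cached rows are flushed to the server as soon as their count reaches this size; in all modes, cached rows are also flushed when a checkpoint is executed. |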
| doris.config | map | yes | - | This option is used
to support operations such as `insert`, `delete`, and `update` when
automatically generate sql,and supported formats.
|
## Data Type Mapping
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
index 5aa3ce13ff..98a43c7b8e 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
@@ -65,6 +65,7 @@ public class DorisConfig implements Serializable {
private String password;
private Integer queryPort;
private String tableIdentifier;
+ private int batchSize;
// source option
private String readField;
@@ -76,7 +77,6 @@ public class DorisConfig implements Serializable {
private Integer requestRetries;
private boolean deserializeArrowAsync;
private int deserializeQueueSize;
- private int batchSize;
private int execMemLimit;
private boolean useOldApi;
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
index 270cc03b21..04a4f02851 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
@@ -90,6 +90,11 @@ public interface DorisOptions {
.stringType()
.noDefaultValue()
.withDescription("the doris password.");
+ Option<Integer> DORIS_BATCH_SIZE =
+ Options.key("doris.batch.size")
+ .intType()
+ .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
+ .withDescription("the batch size of the doris
read/write.");
// source config options
Option<String> DORIS_READ_FIELD =
@@ -139,22 +144,6 @@ public interface DorisOptions {
.intType()
.defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
.withDescription("");
- Option<Integer> DORIS_BATCH_SIZE =
- Options.key("doris.batch.size")
- .intType()
- .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
- .withDescription("");
- Option<Long> DORIS_EXEC_MEM_LIMIT =
- Options.key("doris.exec.mem.limit")
- .longType()
- .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
- .withDescription("");
- Option<Boolean> SOURCE_USE_OLD_API =
- Options.key("source.use-old-api")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "Whether to read data using the new interface
defined according to the FLIP-27 specification,default false");
// sink config options
Option<Boolean> SINK_ENABLE_2PC =
@@ -224,7 +213,9 @@ public interface DorisOptions {
.withDescription("Create table statement template, used to
create Doris table");
OptionRule.Builder SINK_RULE =
- OptionRule.builder().required(FENODES, USERNAME, PASSWORD,
TABLE_IDENTIFIER);
+ OptionRule.builder()
+ .required(FENODES, USERNAME, PASSWORD, TABLE_IDENTIFIER)
+ .optional(DORIS_BATCH_SIZE);
OptionRule.Builder CATALOG_RULE =
OptionRule.builder().required(FENODES, QUERY_PORT, USERNAME,
PASSWORD);
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkFactory.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkFactory.java
deleted file mode 100644
index e3b56c86ac..0000000000
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkFactory.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.config;
-
-import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(Factory.class)
-public class DorisSinkFactory implements TableSinkFactory {
-
- public static final String IDENTIFIER = "Doris";
-
- @Override
- public String factoryIdentifier() {
- return IDENTIFIER;
- }
-
- @Override
- public OptionRule optionRule() {
- return OptionRule.builder()
- .required(
- DorisOptions.FENODES,
- DorisOptions.USERNAME,
- DorisOptions.PASSWORD,
- DorisOptions.SINK_LABEL_PREFIX,
- DorisOptions.DORIS_SINK_CONFIG_PREFIX)
- .optional(DorisOptions.SINK_ENABLE_2PC,
DorisOptions.SINK_ENABLE_DELETE)
- .build();
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
index 8af81603e7..e1c83a0700 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
@@ -372,8 +372,7 @@ public class RestService implements Serializable {
HttpGet httpGet = new HttpGet(beUrl);
String response = send(dorisConfig, httpGet, logger);
logger.info("Backend Info:{}", response);
- List<BackendV2.BackendRowV2> backends =
parseBackendV2(response, logger);
- return backends;
+ return parseBackendV2(response, logger);
} catch (DorisConnectorException e) {
logger.info(
"Doris FE node {} is unavailable: {}, Request the next
Doris FE node",
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
index 92dade722f..8a45e34cef 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
@@ -17,29 +17,18 @@
package org.apache.seatunnel.connectors.doris.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
-import org.apache.seatunnel.connectors.doris.config.DorisOptions;
-import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
import
org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfoSerializer;
import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitter;
@@ -47,29 +36,21 @@ import
org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkState;
import
org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkStateSerializer;
import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter;
-import com.google.auto.service.AutoService;
-
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
-@AutoService(SeaTunnelSink.class)
public class DorisSink
implements SeaTunnelSink<SeaTunnelRow, DorisSinkState,
DorisCommitInfo, DorisCommitInfo>,
SupportSaveMode {
- private DorisConfig dorisConfig;
- private SeaTunnelRowType seaTunnelRowType;
+ private final DorisConfig dorisConfig;
+ private final SeaTunnelRowType seaTunnelRowType;
private String jobId;
- private CatalogTable catalogTable;
-
- public DorisSink() {}
-
public DorisSink(ReadonlyConfig config, CatalogTable catalogTable) {
this.dorisConfig = DorisConfig.of(config);
- this.catalogTable = catalogTable;
this.seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
}
@@ -78,63 +59,22 @@ public class DorisSink
return "Doris";
}
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- this.dorisConfig = DorisConfig.of(pluginConfig);
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- DorisOptions.FENODES.key(),
- DorisOptions.USERNAME.key(),
- DorisOptions.TABLE_IDENTIFIER.key());
- if (!result.isSuccess()) {
- throw new DorisConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
- if (dorisConfig.getTableIdentifier().isEmpty() && catalogTable !=
null) {
- String tableIdentifier =
- catalogTable.getTableId().getDatabaseName()
- + "."
- + catalogTable.getTableId().getTableName();
- dorisConfig.setTableIdentifier(tableIdentifier);
- }
- }
-
@Override
public void setJobContext(JobContext jobContext) {
this.jobId = jobContext.getJobId();
}
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
- }
-
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState>
createWriter(
SinkWriter.Context context) throws IOException {
- DorisSinkWriter dorisSinkWriter =
- new DorisSinkWriter(
- context, Collections.emptyList(), seaTunnelRowType,
dorisConfig, jobId);
- dorisSinkWriter.initializeLoad(Collections.emptyList());
- return dorisSinkWriter;
+ return new DorisSinkWriter(
+ context, Collections.emptyList(), seaTunnelRowType,
dorisConfig, jobId);
}
@Override
public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState>
restoreWriter(
SinkWriter.Context context, List<DorisSinkState> states) throws
IOException {
- DorisSinkWriter dorisWriter =
- new DorisSinkWriter(context, states, seaTunnelRowType,
dorisConfig, jobId);
- dorisWriter.initializeLoad(states);
- return dorisWriter;
+ return new DorisSinkWriter(context, states, seaTunnelRowType,
dorisConfig, jobId);
}
@Override
@@ -152,17 +92,6 @@ public class DorisSink
return Optional.of(new DorisCommitInfoSerializer());
}
- @Override
- public Optional<SinkAggregatedCommitter<DorisCommitInfo, DorisCommitInfo>>
- createAggregatedCommitter() throws IOException {
- return Optional.empty();
- }
-
- @Override
- public Optional<Serializer<DorisCommitInfo>>
getAggregatedCommitInfoSerializer() {
- return Optional.empty();
- }
-
@Override
public Optional<SaveModeHandler> getSaveModeHandler() {
return Optional.empty();
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index 8f0a63d289..2f35357e5b 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -34,6 +34,7 @@ import org.apache.seatunnel.connectors.doris.util.HttpUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@@ -64,7 +65,6 @@ public class DorisSinkWriter implements
SinkWriter<SeaTunnelRow, DorisCommitInfo
private final String labelPrefix;
private final LabelGenerator labelGenerator;
private final int intervalTime;
- private final DorisSinkState dorisSinkState;
private final DorisSerializer serializer;
private final transient ScheduledExecutorService scheduledExecutorService;
private transient Thread executorThread;
@@ -77,12 +77,12 @@ public class DorisSinkWriter implements
SinkWriter<SeaTunnelRow, DorisCommitInfo
List<DorisSinkState> state,
SeaTunnelRowType seaTunnelRowType,
DorisConfig dorisConfig,
- String jobId) {
+ String jobId)
+ throws IOException {
this.dorisConfig = dorisConfig;
- this.lastCheckpointId = state.size() != 0 ?
state.get(0).getCheckpointId() : 0;
+ this.lastCheckpointId = !state.isEmpty() ?
state.get(0).getCheckpointId() : 0;
log.info("restore checkpointId {}", lastCheckpointId);
log.info("labelPrefix " + dorisConfig.getLabelPrefix());
- this.dorisSinkState = new DorisSinkState(dorisConfig.getLabelPrefix(),
lastCheckpointId);
this.labelPrefix =
dorisConfig.getLabelPrefix() + "_" + jobId + "_" +
context.getIndexOfSubtask();
this.labelGenerator = new LabelGenerator(labelPrefix,
dorisConfig.getEnable2PC());
@@ -92,9 +92,10 @@ public class DorisSinkWriter implements
SinkWriter<SeaTunnelRow, DorisCommitInfo
this.serializer = createSerializer(dorisConfig, seaTunnelRowType);
this.intervalTime = dorisConfig.getCheckInterval();
this.loading = false;
+ this.initializeLoad();
}
- public void initializeLoad(List<DorisSinkState> state) throws IOException {
+ private void initializeLoad() throws IOException {
this.backends = RestService.getBackendsV2(dorisConfig, log);
String backend = getAvailableBackend();
try {
@@ -124,10 +125,26 @@ public class DorisSinkWriter implements
SinkWriter<SeaTunnelRow, DorisCommitInfo
return;
}
dorisStreamLoad.writeRecord(serialize);
+ if (!dorisConfig.getEnable2PC()
+ && dorisStreamLoad.getRecordCount() >=
dorisConfig.getBatchSize()) {
+ flush();
+ startLoad(labelGenerator.generateLabel(lastCheckpointId));
+ }
}
@Override
public Optional<DorisCommitInfo> prepareCommit() throws IOException {
+ RespContent respContent = flush();
+ if (!dorisConfig.getEnable2PC()) {
+ return Optional.empty();
+ }
+ long txnId = respContent.getTxnId();
+
+ return Optional.of(
+ new DorisCommitInfo(dorisStreamLoad.getHostPort(),
dorisStreamLoad.getDb(), txnId));
+ }
+
+ @NonNull private RespContent flush() throws IOException {
// disable exception checker before stop load.
loading = false;
checkState(dorisStreamLoad != null);
@@ -139,25 +156,23 @@ public class DorisSinkWriter implements
SinkWriter<SeaTunnelRow, DorisCommitInfo
respContent.getMessage(),
respContent.getErrorURL());
throw new
DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, errMsg);
}
- if (!dorisConfig.getEnable2PC()) {
- return Optional.empty();
- }
- long txnId = respContent.getTxnId();
-
- return Optional.of(
- new DorisCommitInfo(dorisStreamLoad.getHostPort(),
dorisStreamLoad.getDb(), txnId));
+ return respContent;
}
@Override
public List<DorisSinkState> snapshotState(long checkpointId) throws
IOException {
checkState(dorisStreamLoad != null);
- this.dorisStreamLoad.setHostPort(getAvailableBackend());
-
this.dorisStreamLoad.startLoad(labelGenerator.generateLabel(checkpointId + 1));
- this.loading = true;
+ startLoad(labelGenerator.generateLabel(checkpointId + 1));
this.lastCheckpointId = checkpointId;
return Collections.singletonList(new DorisSinkState(labelPrefix,
lastCheckpointId));
}
+ private void startLoad(String label) throws IOException {
+ this.dorisStreamLoad.setHostPort(getAvailableBackend());
+ this.dorisStreamLoad.startLoad(label);
+ this.loading = true;
+ }
+
@Override
public void abortPrepare() {
if (dorisConfig.getEnable2PC()) {
@@ -204,23 +219,11 @@ public class DorisSinkWriter implements
SinkWriter<SeaTunnelRow, DorisCommitInfo
}
}
- @VisibleForTesting
- public boolean isLoading() {
- return this.loading;
- }
-
- @VisibleForTesting
- public void setDorisStreamLoad(DorisStreamLoad streamLoad) {
- this.dorisStreamLoad = streamLoad;
- }
-
- @VisibleForTesting
- public void setBackends(List<BackendV2.BackendRowV2> backends) {
- this.backends = backends;
- }
-
@Override
public void close() throws IOException {
+ if (!dorisConfig.getEnable2PC()) {
+ flush();
+ }
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdownNow();
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
index 696d5c4095..892c1da954 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
@@ -32,7 +32,6 @@ import org.apache.http.util.EntityUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
@@ -79,6 +78,7 @@ public class DorisStreamLoad implements Serializable {
private final CloseableHttpClient httpClient;
private final ExecutorService executorService;
private boolean loadBatchFirstRecord;
+ private long recordCount = 0;
public DorisStreamLoad(
String hostPort,
@@ -194,11 +194,11 @@ public class DorisStreamLoad implements Serializable {
recordStream.write(lineDelimiter);
}
recordStream.write(record);
+ recordCount++;
}
- @VisibleForTesting
- public RecordStream getRecordStream() {
- return recordStream;
+ public long getRecordCount() {
+ return recordCount;
}
public RespContent handlePreCommitResponse(CloseableHttpResponse response)
throws Exception {
@@ -225,6 +225,7 @@ public class DorisStreamLoad implements Serializable {
public void startLoad(String label) throws IOException {
loadBatchFirstRecord = true;
+ recordCount = 0;
HttpPutBuilder putBuilder = new HttpPutBuilder();
recordStream.startInput();
log.info("stream load started for {}", label);
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/LabelGenerator.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/LabelGenerator.java
index 1f4fdbb45a..df13d9b22a 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/LabelGenerator.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/LabelGenerator.java
@@ -19,8 +19,8 @@ package org.apache.seatunnel.connectors.doris.sink.writer;
/** Generator label for stream load. */
public class LabelGenerator {
- private String labelPrefix;
- private boolean enable2PC;
+ private final String labelPrefix;
+ private final boolean enable2PC;
public LabelGenerator(String labelPrefix, boolean enable2PC) {
this.labelPrefix = labelPrefix;
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java
index 7c861658f8..26125e96ee 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java
@@ -55,7 +55,7 @@ public class RecordBuffer {
"start buffer data, read queue size {}, write queue size {}",
readQueue.size(),
writeQueue.size());
- checkState(readQueue.size() == 0);
+ checkState(readQueue.isEmpty());
checkState(writeQueue.size() == queueSize);
for (ByteBuffer byteBuffer : writeQueue) {
checkState(byteBuffer.position() == 0);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
new file mode 100644
index 0000000000..229cb04c79
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.doris;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+/**
+ * Base class for Doris e2e tests.
+ *
+ * <p>Starts one Doris container (configured through the FE_* and CURRENT_BE_* environment
+ * variables below), waits until a JDBC connection can be opened and {@code SHOW BACKENDS}
+ * reports a ready backend, and exposes that connection to subclasses as {@link #jdbcConnection}.
+ */
+@Slf4j
+public abstract class AbstractDorisIT extends TestSuiteBase implements TestResource {
+
+    protected GenericContainer<?> container;
+
+    // use image adamlee489/doris:1.2.7.1_arm when running this test on mac
+    private static final String DOCKER_IMAGE = "adamlee489/doris:1.2.7.1_x86";
+    protected static final String HOST = "doris_e2e";
+    protected static final int QUERY_PORT = 9030;
+    protected static final int HTTP_PORT = 8030;
+    protected static final int BE_HTTP_PORT = 8040;
+    protected static final String URL = "jdbc:mysql://%s:" + QUERY_PORT;
+    protected static final String USERNAME = "root";
+    protected static final String PASSWORD = "";
+    /** Open only after the backend was observed ready; closed in {@link #tearDown()}. */
+    protected Connection jdbcConnection;
+    private static final String SET_SQL =
+            "ADMIN SET FRONTEND CONFIG (\"enable_batch_delete_by_default\" = \"true\")";
+    private static final String SHOW_BE = "SHOW BACKENDS";
+    /** Pause between two SHOW BACKENDS probes while the backend is not ready yet. */
+    private static final Duration BE_POLL_INTERVAL = Duration.ofSeconds(1L);
+
+    @BeforeAll
+    @Override
+    public void startUp() {
+        container =
+                new GenericContainer<>(DOCKER_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(HOST)
+                        .withEnv("FE_SERVERS", "fe1:127.0.0.1:9010")
+                        .withEnv("FE_ID", "1")
+                        .withEnv("CURRENT_BE_IP", "127.0.0.1")
+                        .withEnv("CURRENT_BE_PORT", "9050")
+                        .withCommand("ulimit -n 65536")
+                        .withCreateContainerCmdModifier(
+                                cmd -> cmd.getHostConfig().withMemorySwap(0L))
+                        .withPrivilegedMode(true)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
+        container.setPortBindings(
+                Lists.newArrayList(
+                        String.format("%s:%s", QUERY_PORT, QUERY_PORT),
+                        String.format("%s:%s", HTTP_PORT, HTTP_PORT),
+                        String.format("%s:%s", BE_HTTP_PORT, BE_HTTP_PORT)));
+
+        Startables.deepStart(Stream.of(container)).join();
+        log.info("doris container started");
+        // The FE may refuse connections or statements for a while after the container starts;
+        // awaitility retries initializeJdbcConnection until it completes without an exception.
+        given().ignoreExceptions()
+                .await()
+                .atMost(10000, TimeUnit.SECONDS)
+                .untilAsserted(this::initializeJdbcConnection);
+    }
+
+    /**
+     * Opens a JDBC connection, applies {@link #SET_SQL} and blocks until a backend is ready.
+     *
+     * <p>This method is retried by awaitility, so a connection opened by a failed attempt is
+     * closed before the exception propagates; {@link #jdbcConnection} is only assigned once the
+     * whole initialization succeeded.
+     *
+     * @throws SQLException if connecting, configuring or probing the FE fails
+     */
+    private void initializeJdbcConnection() throws SQLException {
+        Properties props = new Properties();
+        props.put("user", USERNAME);
+        props.put("password", PASSWORD);
+
+        Connection connection =
+                DriverManager.getConnection(String.format(URL, container.getHost()), props);
+        try (Statement statement = connection.createStatement()) {
+            statement.execute(SET_SQL);
+            while (true) {
+                // Close every probe result; the loop may run many times before the BE is up.
+                try (ResultSet resultSet = statement.executeQuery(SHOW_BE)) {
+                    if (isBeReady(resultSet)) {
+                        break;
+                    }
+                }
+                // Back off on every not-ready probe, including an empty SHOW BACKENDS result.
+                LockSupport.parkNanos(BE_POLL_INTERVAL.toNanos());
+            }
+        } catch (SQLException | RuntimeException e) {
+            // Do not leak the connection of a failed attempt: the next retry opens a new one.
+            try {
+                connection.close();
+            } catch (SQLException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+        jdbcConnection = connection;
+    }
+
+    /**
+     * Checks whether the first row of a {@code SHOW BACKENDS} result describes a usable backend.
+     *
+     * <p>NOTE(review): columns 10 and 16 are assumed to be the "alive" flag and the total
+     * capacity for the Doris version in {@link #DOCKER_IMAGE} — confirm when bumping the image.
+     *
+     * @param rs the {@code SHOW BACKENDS} result; only its first row is inspected
+     * @return {@code true} if the backend is alive and reports a non-zero total capacity
+     */
+    private boolean isBeReady(ResultSet rs) throws SQLException {
+        if (!rs.next()) {
+            return false;
+        }
+        String isAlive = rs.getString(10);
+        String totalCap = rs.getString(16);
+        return isAlive != null
+                && totalCap != null
+                && "true".equalsIgnoreCase(isAlive.trim())
+                && !"0.000".equalsIgnoreCase(totalCap.trim());
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        // Close the client side first, while the server it talks to is still running.
+        if (jdbcConnection != null) {
+            jdbcConnection.close();
+        }
+        if (container != null) {
+            container.close();
+        }
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
index 8983269a19..9afa91d4e8 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
@@ -17,60 +17,32 @@
package org.apache.seatunnel.e2e.connector.doris;
-import org.apache.seatunnel.e2e.common.TestResource;
-import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerLoggerFactory;
-import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.sql.Connection;
-import java.sql.Driver;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
-import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.awaitility.Awaitility.given;
-
@Slf4j
-@Disabled
-public class DorisCDCSinkIT extends TestSuiteBase implements TestResource {
- private static final String DOCKER_IMAGE =
"zykkk/doris:1.2.2.1-avx2-x86_84";
- private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
- private static final String HOST = "doris_cdc_e2e";
- private static final int DOCKER_PORT = 9030;
- private static final int PORT = 8961;
- private static final String URL = "jdbc:mysql://%s:" + PORT;
- private static final String USERNAME = "root";
- private static final String PASSWORD = "";
+@Disabled("we need resolve the issue of network between containers")
+public class DorisCDCSinkIT extends AbstractDorisIT {
+
private static final String DATABASE = "test";
private static final String SINK_TABLE = "e2e_table_sink";
- private static final String DRIVER_JAR =
-
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
- private static final String SET_SQL =
- "ADMIN SET FRONTEND CONFIG (\"enable_batch_delete_by_default\" =
\"true\")";
private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT
EXISTS " + DATABASE;
private static final String DDL_SINK =
"CREATE TABLE IF NOT EXISTS "
@@ -88,43 +60,13 @@ public class DorisCDCSinkIT extends TestSuiteBase
implements TestResource {
+ "\"replication_allocation\" = \"tag.location.default:
1\""
+ ")";
- private Connection jdbcConnection;
- private GenericContainer<?> dorisServer;
-
@BeforeAll
- @Override
- public void startUp() {
- dorisServer =
- new GenericContainer<>(DOCKER_IMAGE)
- .withNetwork(NETWORK)
- .withNetworkAliases(HOST)
- .withPrivilegedMode(true)
- .withLogConsumer(
- new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
- dorisServer.setPortBindings(Lists.newArrayList(String.format("%s:%s",
PORT, DOCKER_PORT)));
- Startables.deepStart(Stream.of(dorisServer)).join();
- log.info("doris container started");
- // wait for doris fully start
- given().ignoreExceptions()
- .await()
- .atMost(10000, TimeUnit.SECONDS)
- .untilAsserted(this::initializeJdbcConnection);
+ public void init() {
initializeJdbcTable();
}
- @AfterAll
- @Override
- public void tearDown() throws Exception {
- if (jdbcConnection != null) {
- jdbcConnection.close();
- }
- if (dorisServer != null) {
- dorisServer.close();
- }
- }
-
@TestTemplate
- public void testDorisSink(TestContainer container) throws Exception {
+ public void testDorisCDCSink(TestContainer container) throws Exception {
Container.ExecResult execResult =
container.executeJob("/write-cdc-changelog-to-doris.conf");
Assertions.assertEquals(0, execResult.getExitCode());
@@ -148,23 +90,6 @@ public class DorisCDCSinkIT extends TestSuiteBase
implements TestResource {
Assertions.assertIterableEquals(expected, actual);
}
- private void initializeJdbcConnection()
- throws SQLException, ClassNotFoundException,
InstantiationException,
- IllegalAccessException, MalformedURLException {
- URLClassLoader urlClassLoader =
- new URLClassLoader(
- new URL[] {new URL(DRIVER_JAR)},
DorisCDCSinkIT.class.getClassLoader());
- Thread.currentThread().setContextClassLoader(urlClassLoader);
- Driver driver = (Driver)
urlClassLoader.loadClass(DRIVER_CLASS).newInstance();
- Properties props = new Properties();
- props.put("user", USERNAME);
- props.put("password", PASSWORD);
- jdbcConnection = driver.connect(String.format(URL,
dorisServer.getHost()), props);
- try (Statement statement = jdbcConnection.createStatement()) {
- statement.execute(SET_SQL);
- }
- }
-
private void initializeJdbcTable() {
try (Statement statement = jdbcConnection.createStatement()) {
// create databases
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
index 035ea47544..4722b84025 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
@@ -29,92 +29,30 @@ import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.connectors.doris.catalog.DorisCatalog;
import org.apache.seatunnel.connectors.doris.catalog.DorisCatalogFactory;
import org.apache.seatunnel.connectors.doris.config.DorisOptions;
-import org.apache.seatunnel.e2e.common.TestResource;
-import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerLoggerFactory;
-import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.LockSupport;
-import java.util.stream.Stream;
-
-import static org.awaitility.Awaitility.given;
@Slf4j
-public class DorisCatalogIT extends TestSuiteBase implements TestResource {
-
- // use image adamlee489/doris:1.2.7.1_arm when running this test on mac
- private static final String DOCKER_IMAGE = "adamlee489/doris:1.2.7.1_x86";
- private static final String HOST = "doris_catalog_e2e";
- private static final int DOCKER_QUERY_PORT = 9030;
- private static final int DOCKER_HTTP_PORT = 8030;
- private static final int QUERY_PORT = 19030;
- private static final int HTTP_PORT = 18030;
- private static final String URL = "jdbc:mysql://%s:" + QUERY_PORT;
- private static final String USERNAME = "root";
- private static final String PASSWORD = "";
+public class DorisCatalogIT extends AbstractDorisIT {
+
private static final String DATABASE = "test";
private static final String SINK_TABLE = "doris_catalog_e2e";
- private static final String SET_SQL =
- "ADMIN SET FRONTEND CONFIG (\"enable_batch_delete_by_default\" =
\"true\")";
- private static final String SHOW_BE = "SHOW BACKENDS";
-
- private GenericContainer<?> container;
- private Connection jdbcConnection;
private DorisCatalogFactory factory;
private DorisCatalog catalog;
@BeforeAll
- @Override
- public void startUp() throws Exception {
-
- container =
- new GenericContainer<>(DOCKER_IMAGE)
- .withNetwork(NETWORK)
- .withNetworkAliases(HOST)
- .withEnv("FE_SERVERS", "fe1:127.0.0.1:9010")
- .withEnv("FE_ID", "1")
- .withEnv("CURRENT_BE_IP", "127.0.0.1")
- .withEnv("CURRENT_BE_PORT", "9050")
- .withCommand("ulimit -n 65536")
- .withCreateContainerCmdModifier(
- cmd -> cmd.getHostConfig().withMemorySwap(0L))
- .withPrivilegedMode(true)
- .withLogConsumer(
- new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
- container.setPortBindings(
- Lists.newArrayList(
- String.format("%s:%s", QUERY_PORT, DOCKER_QUERY_PORT),
- String.format("%s:%s", HTTP_PORT, DOCKER_HTTP_PORT)));
-
- Startables.deepStart(Stream.of(container)).join();
- log.info("doris container started");
- given().ignoreExceptions()
- .await()
- .atMost(10000, TimeUnit.SECONDS)
- .untilAsserted(this::initializeJdbcConnection);
+ public void init() {
initCatalogFactory();
initCatalog();
}
@@ -203,42 +141,9 @@ public class DorisCatalogIT extends TestSuiteBase
implements TestResource {
}
@AfterAll
- @Override
- public void tearDown() throws Exception {
- if (container != null) {
- container.close();
- }
- if (jdbcConnection != null) {
- jdbcConnection.close();
- }
+ public void close() {
if (catalog != null) {
catalog.close();
}
}
-
- private void initializeJdbcConnection() throws SQLException {
- Properties props = new Properties();
- props.put("user", USERNAME);
- props.put("password", PASSWORD);
-
- jdbcConnection =
- DriverManager.getConnection(String.format(URL,
container.getHost()), props);
- try (Statement statement = jdbcConnection.createStatement()) {
- statement.execute(SET_SQL);
- ResultSet resultSet;
- do {
- resultSet = statement.executeQuery(SHOW_BE);
- } while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
- }
- }
-
- private boolean isBeReady(ResultSet rs, Duration duration) throws
SQLException {
- if (rs.next()) {
- String isAlive = rs.getString(10).trim();
- String totalCap = rs.getString(16).trim();
- LockSupport.parkNanos(duration.toNanos());
- return "true".equalsIgnoreCase(isAlive) &&
!"0.000".equalsIgnoreCase(totalCap);
- }
- return false;
- }
}