This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new bb2c912404 [Fix][Doris] Fix the abnormality of deleting data in CDC
scenario. (#7315)
bb2c912404 is described below
commit bb2c912404fac13e829044c60259f2ab27bff3a1
Author: Guangdong Liu <[email protected]>
AuthorDate: Tue Aug 6 21:46:12 2024 +0800
[Fix][Doris] Fix the abnormality of deleting data in CDC scenario. (#7315)
---
.../doris/serialize/SeaTunnelRowSerializer.java | 77 +++++-----
.../doris/sink/writer/DorisStreamLoad.java | 32 ++--
.../connector-doris-e2e/pom.xml | 14 ++
.../e2e/connector/doris/DorisCDCSinkIT.java | 171 ++++++++++++++++++---
.../src/test/resources/ddl/mysql_cdc.sql | 38 +++++
.../src/test/resources/docker/server-gtids/my.cnf | 65 ++++++++
.../src/test/resources/docker/setup.sql | 28 ++++
.../resources/write-cdc-changelog-to-doris.conf | 18 +--
8 files changed, 356 insertions(+), 87 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
index 0c5b9c0c42..0e67257a32 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.doris.serialize;
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -29,6 +30,7 @@ import
org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.seatunnel.format.text.TextSerializationSchema;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -42,6 +44,7 @@ public class SeaTunnelRowSerializer implements
DorisSerializer {
private final SeaTunnelRowType seaTunnelRowType;
private final String fieldDelimiter;
private final boolean enableDelete;
+ private final SerializationSchema serialize;
public SeaTunnelRowSerializer(
String type,
@@ -49,32 +52,46 @@ public class SeaTunnelRowSerializer implements
DorisSerializer {
String fieldDelimiter,
boolean enableDelete) {
this.type = type;
- this.seaTunnelRowType = seaTunnelRowType;
this.fieldDelimiter = fieldDelimiter;
this.enableDelete = enableDelete;
- }
+ List<Object> fieldNames = new
ArrayList<>(Arrays.asList(seaTunnelRowType.getFieldNames()));
+ List<SeaTunnelDataType<?>> fieldTypes =
+ new
ArrayList<>(Arrays.asList(seaTunnelRowType.getFieldTypes()));
+
+ if (enableDelete) {
+ fieldNames.add(LoadConstants.DORIS_DELETE_SIGN);
+ fieldTypes.add(STRING_TYPE);
+ }
- public byte[] buildJsonString(SeaTunnelRow row, SeaTunnelRowType
seaTunnelRowType)
- throws IOException {
+ this.seaTunnelRowType =
+ new SeaTunnelRowType(
+ fieldNames.toArray(new String[0]),
+ fieldTypes.toArray(new SeaTunnelDataType<?>[0]));
- JsonSerializationSchema jsonSerializationSchema =
- new JsonSerializationSchema(seaTunnelRowType, NULL_VALUE);
- ObjectMapper mapper = jsonSerializationSchema.getMapper();
- mapper.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
true);
- return jsonSerializationSchema.serialize(row);
+ if (JSON.equals(type)) {
+ JsonSerializationSchema jsonSerializationSchema =
+ new JsonSerializationSchema(this.seaTunnelRowType,
NULL_VALUE);
+ ObjectMapper mapper = jsonSerializationSchema.getMapper();
+ mapper.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
true);
+ this.serialize = jsonSerializationSchema;
+ } else {
+ this.serialize =
+ TextSerializationSchema.builder()
+ .seaTunnelRowType(this.seaTunnelRowType)
+ .delimiter(fieldDelimiter)
+ .nullValue(NULL_VALUE)
+ .build();
+ }
}
- public byte[] buildCSVString(SeaTunnelRow row, SeaTunnelRowType
seaTunnelRowType)
- throws IOException {
+ public byte[] buildJsonString(SeaTunnelRow row) {
+
+ return serialize.serialize(row);
+ }
- TextSerializationSchema build =
- TextSerializationSchema.builder()
- .seaTunnelRowType(seaTunnelRowType)
- .delimiter(fieldDelimiter)
- .nullValue(NULL_VALUE)
- .build();
+ public byte[] buildCSVString(SeaTunnelRow row) {
- return build.serialize(row);
+ return serialize.serialize(row);
}
public String parseDeleteSign(RowKind rowKind) {
@@ -93,29 +110,17 @@ public class SeaTunnelRowSerializer implements
DorisSerializer {
@Override
public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException {
- List<String> fieldNames =
Arrays.asList(seaTunnelRowType.getFieldNames());
- List<SeaTunnelDataType<?>> fieldTypes =
Arrays.asList(seaTunnelRowType.getFieldTypes());
-
if (enableDelete) {
- SeaTunnelRow seaTunnelRowEnableDelete = seaTunnelRow.copy();
- seaTunnelRowEnableDelete.setField(
- seaTunnelRow.getFields().length,
parseDeleteSign(seaTunnelRow.getRowKind()));
- fieldNames.add(LoadConstants.DORIS_DELETE_SIGN);
- fieldTypes.add(STRING_TYPE);
+
+ List<Object> newFields = new
ArrayList<>(Arrays.asList(seaTunnelRow.getFields()));
+ newFields.add(parseDeleteSign(seaTunnelRow.getRowKind()));
+ seaTunnelRow = new SeaTunnelRow(newFields.toArray());
}
if (JSON.equals(type)) {
- return buildJsonString(
- seaTunnelRow,
- new SeaTunnelRowType(
- fieldNames.toArray(new String[0]),
- fieldTypes.toArray(new SeaTunnelDataType<?>[0])));
+ return buildJsonString(seaTunnelRow);
} else if (CSV.equals(type)) {
- return buildCSVString(
- seaTunnelRow,
- new SeaTunnelRowType(
- fieldNames.toArray(new String[0]),
- fieldTypes.toArray(new SeaTunnelDataType<?>[0])));
+ return buildCSVString(seaTunnelRow);
} else {
throw new IllegalArgumentException("The type " + type + " is not
supported!");
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
index eadcf94cd5..40b75aedc6 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
@@ -17,7 +17,10 @@
package org.apache.seatunnel.connectors.doris.sink.writer;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
@@ -31,9 +34,9 @@ import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@@ -64,23 +67,23 @@ public class DorisStreamLoad implements Serializable {
private static final String ABORT_URL_PATTERN =
"http://%s/api/%s/_stream_load_2pc";
private static final String JOB_EXIST_FINISHED = "FINISHED";
private final String loadUrlStr;
- private final String hostPort;
+ @Getter private final String hostPort;
private final String abortUrlStr;
private final String user;
private final String passwd;
- private final String db;
+ @Getter private final String db;
private final String table;
private final boolean enable2PC;
private final boolean enableDelete;
private final Properties streamLoadProp;
private final RecordStream recordStream;
- private Future<CloseableHttpResponse> pendingLoadFuture;
+ @Getter private Future<CloseableHttpResponse> pendingLoadFuture;
private final CloseableHttpClient httpClient;
private final ExecutorService executorService;
private volatile boolean loadBatchFirstRecord;
private volatile boolean loading = false;
private String label;
- private long recordCount = 0;
+ @Getter private long recordCount = 0;
public DorisStreamLoad(
String hostPort,
@@ -115,18 +118,6 @@ public class DorisStreamLoad implements Serializable {
loadBatchFirstRecord = true;
}
- public String getDb() {
- return db;
- }
-
- public String getHostPort() {
- return hostPort;
- }
-
- public Future<CloseableHttpResponse> getPendingLoadFuture() {
- return pendingLoadFuture;
- }
-
public void abortPreCommit(String labelSuffix, long chkID) throws
Exception {
long startChkID = chkID;
log.info("abort for labelSuffix {}. start chkId {}.", labelSuffix,
chkID);
@@ -196,10 +187,6 @@ public class DorisStreamLoad implements Serializable {
recordCount++;
}
- public long getRecordCount() {
- return recordCount;
- }
-
public String getLoadFailedMsg() {
if (!loading) {
return null;
@@ -300,10 +287,9 @@ public class DorisStreamLoad implements Serializable {
"Fail to abort transaction " + txnID + " with url " +
abortUrlStr);
}
- ObjectMapper mapper = new ObjectMapper();
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, String> res =
- mapper.readValue(loadResult, new TypeReference<HashMap<String,
String>>() {});
+ JsonUtils.parseObject(loadResult, new
TypeReference<HashMap<String, String>>() {});
if (!LoadStatus.SUCCESS.equals(res.get("status"))) {
if (ResponseUtil.isCommitted(res.get("msg"))) {
throw new DorisConnectorException(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml
index af85d92ace..7a3008adb3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml
@@ -49,5 +49,19 @@
<version>${mysql.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-mysql</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <!-- test dependencies on TestContainers -->
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mysql</artifactId>
+ <version>${testcontainer.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
index 9afa91d4e8..33108b8b8e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
@@ -17,16 +17,27 @@
package org.apache.seatunnel.e2e.connector.doris;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
import lombok.extern.slf4j.Slf4j;
+import java.sql.Connection;
+import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -34,11 +45,18 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.awaitility.Awaitility.await;
+
@Slf4j
-@Disabled("we need resolve the issue of network between containers")
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK},
+ disabledReason = "Currently SPARK do not support cdc")
public class DorisCDCSinkIT extends AbstractDorisIT {
private static final String DATABASE = "test";
@@ -60,34 +78,121 @@ public class DorisCDCSinkIT extends AbstractDorisIT {
+ "\"replication_allocation\" = \"tag.location.default:
1\""
+ ")";
+ // MySQL CDC source: container settings and credentials
+ private static final String MYSQL_HOST = "mysql_cdc_e2e";
+ private static final String MYSQL_USER_NAME = "mysqluser";
+ private static final String MYSQL_USER_PASSWORD = "mysqlpw";
+ private static final String MYSQL_DATABASE = "mysql_cdc";
+ private static final MySqlContainer MYSQL_CONTAINER =
createMySqlContainer(MySqlVersion.V8_0);
+ private static final String SOURCE_TABLE = "mysql_cdc_e2e_source_table";
+
+ @TestContainerExtension
+ protected final ContainerExtendedFactory extendedFactory =
+ container -> {
+ Container.ExecResult extraCommands =
+ container.execInContainer(
+ "bash",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/Doris-CDC/lib
&& cd /tmp/seatunnel/plugins/Doris-CDC/lib && wget "
+ + driverUrl());
+ Assertions.assertEquals(0, extraCommands.getExitCode(),
extraCommands.getStderr());
+ };
+
+ private final UniqueDatabase inventoryDatabase =
+ new UniqueDatabase(
+ MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw",
MYSQL_DATABASE);
+
+ private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+ return new MySqlContainer(version)
+ .withConfigurationOverride("docker/server-gtids/my.cnf")
+ .withSetupSQL("docker/setup.sql")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MYSQL_HOST)
+ .withDatabaseName(MYSQL_DATABASE)
+ .withUsername(MYSQL_USER_NAME)
+ .withPassword(MYSQL_USER_PASSWORD)
+ .withLogConsumer(
+ new
Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image")));
+ }
+
+ private String driverUrl() {
+ return
"https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
+ }
+
@BeforeAll
public void init() {
+ log.info("The second stage: Starting Mysql containers...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ log.info("Mysql Containers are started");
+ inventoryDatabase.createAndInitialize();
+ log.info("Mysql ddl execution is complete");
initializeJdbcTable();
}
@TestTemplate
public void testDorisCDCSink(TestContainer container) throws Exception {
- Container.ExecResult execResult =
- container.executeJob("/write-cdc-changelog-to-doris.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
+
+ clearTable(DATABASE, SINK_TABLE);
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+
container.executeJob("/write-cdc-changelog-to-doris.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
String sinkSql = String.format("select * from %s.%s", DATABASE,
SINK_TABLE);
- Set<List<Object>> actual = new HashSet<>();
- try (Statement sinkStatement = jdbcConnection.createStatement()) {
- ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
- while (sinkResultSet.next()) {
- List<Object> row =
- Arrays.asList(
- sinkResultSet.getLong("uuid"),
- sinkResultSet.getString("name"),
- sinkResultSet.getInt("score"));
- actual.add(row);
- }
- }
+
Set<List<Object>> expected =
- Stream.<List<Object>>of(Arrays.asList(1L, "A_1", 100),
Arrays.asList(3L, "C", 100))
+ Stream.<List<Object>>of(
+ Arrays.asList(1L, "Alice", 95),
Arrays.asList(2L, "Bob", 88))
.collect(Collectors.toSet());
- Assertions.assertIterableEquals(expected, actual);
+
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Set<List<Object>> actual = new HashSet<>();
+ try (Statement sinkStatement =
jdbcConnection.createStatement()) {
+ ResultSet sinkResultSet =
sinkStatement.executeQuery(sinkSql);
+ while (sinkResultSet.next()) {
+ List<Object> row =
+ Arrays.asList(
+
sinkResultSet.getLong("uuid"),
+
sinkResultSet.getString("name"),
+
sinkResultSet.getInt("score"));
+ actual.add(row);
+ }
+ }
+ Assertions.assertIterableEquals(expected, actual);
+ });
+
+ executeSql("DELETE FROM " + MYSQL_DATABASE + "." + SOURCE_TABLE + "
WHERE uuid = 1");
+
+ Set<List<Object>> expectedAfterDelete =
+ Stream.<List<Object>>of(Arrays.asList(2L, "Bob",
88)).collect(Collectors.toSet());
+
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Set<List<Object>> actual = new HashSet<>();
+ try (Statement sinkStatement =
jdbcConnection.createStatement()) {
+ ResultSet sinkResultSet =
sinkStatement.executeQuery(sinkSql);
+ while (sinkResultSet.next()) {
+ List<Object> row =
+ Arrays.asList(
+
sinkResultSet.getLong("uuid"),
+
sinkResultSet.getString("name"),
+
sinkResultSet.getInt("score"));
+ actual.add(row);
+ }
+ }
+
Assertions.assertIterableEquals(expectedAfterDelete, actual);
+ });
+ executeSql(
+ "INSERT INTO " + MYSQL_DATABASE + "." + SOURCE_TABLE + "
VALUES (1, 'Alice', 95)");
}
private void initializeJdbcTable() {
@@ -100,4 +205,32 @@ public class DorisCDCSinkIT extends AbstractDorisIT {
throw new RuntimeException("Initializing table failed!", e);
}
}
+
+ private void executeDorisSql(String sql) {
+ try (Statement statement = jdbcConnection.createStatement()) {
+ statement.execute(sql);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Connection getJdbcConnection() throws SQLException {
+ return DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword());
+ }
+
+ // Execute a SQL statement against the MySQL CDC source container
+ private void executeSql(String sql) {
+ try (Connection connection = getJdbcConnection()) {
+ connection.createStatement().execute(sql);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void clearTable(String database, String tableName) {
+ executeDorisSql("truncate table " + database + "." + tableName);
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/mysql_cdc.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/mysql_cdc.sql
new file mode 100644
index 0000000000..638da2981b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/mysql_cdc.sql
@@ -0,0 +1,38 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--
----------------------------------------------------------------------------------------------------------------
+-- DATABASE: mysql_cdc
+--
----------------------------------------------------------------------------------------------------------------
+CREATE DATABASE IF NOT EXISTS `mysql_cdc`;
+
+use mysql_cdc;
+-- Create a mysql data source table
+CREATE TABLE IF NOT EXISTS `mysql_cdc`.`mysql_cdc_e2e_source_table` (
+ `uuid` BIGINT,
+ `name` VARCHAR(128),
+ `score` INT,
+ PRIMARY KEY (`uuid`)
+) ENGINE=InnoDB;
+
+
+
+truncate table `mysql_cdc`.`mysql_cdc_e2e_source_table`;
+
+INSERT INTO `mysql_cdc`.`mysql_cdc_e2e_source_table` (uuid, name, score) VALUES
+(1, 'Alice', 95),
+(2, 'Bob', 88);
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/server-gtids/my.cnf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/server-gtids/my.cnf
new file mode 100644
index 0000000000..a390897885
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/server-gtids/my.cnf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but
would
+# be longer on a production system. Row-level info is required for ingest to
work.
+# Server ID is required, but this will vary on production systems
+server-id = 223344
+log_bin = mysql-bin
+expire_logs_days = 1
+binlog_format = row
+
+# enable gtid mode
+gtid_mode = on
+enforce_gtid_consistency = on
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/setup.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/setup.sql
new file mode 100644
index 0000000000..429061558b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/setup.sql
@@ -0,0 +1,28 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- In production you would almost certainly restrict the replication user to the
on the follower (slave) machine,
+-- to prevent other clients from accessing the log from other machines. For
example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant 2 users different privileges:
+--
+-- 1) 'mysqluser' - all privileges
+-- 2) 'st_user_source' - all privileges required by the snapshot reader AND
binlog reader (used for testing)
+--
+GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
+
+CREATE USER 'st_user_source' IDENTIFIED BY 'mysqlpw';
+GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT,
DROP, LOCK TABLES ON *.* TO 'st_user_source'@'%';
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf
index d4d4e69f9d..7e811c709b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf
@@ -17,23 +17,24 @@
env {
parallelism = 1
- job.mode = "BATCH"
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
}
source {
MySQL-CDC {
parallelism = 1
- server-id = 5656
- username = "root"
- password = "Bigdata2023@"
- table-names = ["test.e2e_table_sink"]
- base-url = "jdbc:mysql://119.3.230.145:56725/test"
+ server-id = 5652
+ username = "st_user_source"
+ password = "mysqlpw"
+ table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
}
}
sink {
Doris {
- fenodes = "10.16.10.14:8234"
+ fenodes = "doris_e2e:8030"
username = root
password = ""
database = "test"
@@ -43,8 +44,7 @@ sink {
sink.enable-delete = "true"
doris.config {
format = "csv"
- "column_separator" = "\\x01"
- "line_delimiter" = "\\x01"
+ "column_separator" = ","
}
}
}
\ No newline at end of file