This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3d46b7961 [Feature][Connector-V2][Doris]Add Doris Source & Sink
connector (#3586)
3d46b7961 is described below
commit 3d46b7961451e0ed6bcb6a4ba8589c90f7505f34
Author: TaoZex <[email protected]>
AuthorDate: Tue Nov 29 14:56:44 2022 +0800
[Feature][Connector-V2][Doris]Add Doris Source & Sink connector (#3586)
* [Feature][Connector-V2][Doris]Add Doris Source & Sink connector
---
docs/en/connector-v2/sink/Jdbc.md | 10 +-
docs/en/connector-v2/source/Jdbc.md | 2 +
.../internal/dialect/mysql/MySqlTypeMapper.java | 4 +
.../connectors/seatunnel/jdbc/JdbcDorisdbIT.java | 315 +++++++++++++++++++++
.../test/resources/jdbc_doris_source_and_sink.conf | 44 +++
5 files changed, 371 insertions(+), 4 deletions(-)
diff --git a/docs/en/connector-v2/sink/Jdbc.md
b/docs/en/connector-v2/sink/Jdbc.md
index 952521509..76705606b 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -132,6 +132,7 @@ there are some reference value for params above.
| GBase8a | com.gbase.jdbc.Driver |
jdbc:gbase://e2e_gbase8aDb:5258/test | /
|
https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar
|
| StarRocks | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
| db2 | com.ibm.db2.jcc.DB2Driver |
jdbc:db2://localhost:50000/testdb |
com.ibm.db2.jcc.DB2XADataSource |
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4
|
+| Doris | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
| teradata | com.teradata.jdbc.TeraDriver |
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | /
|
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc
|
## Example
@@ -193,10 +194,10 @@ sink {
### 2.3.0-beta 2022-10-20
- [BugFix] Fix JDBC split exception
([2904](https://github.com/apache/incubator-seatunnel/pull/2904))
-- [Feature] Support Phoenix JDBC Source
([2499](https://github.com/apache/incubator-seatunnel/pull/2499))
-- [Feature] Support SQL Server JDBC Source
([2646](https://github.com/apache/incubator-seatunnel/pull/2646))
-- [Feature] Support Oracle JDBC Source
([2550](https://github.com/apache/incubator-seatunnel/pull/2550))
-- [Feature] Support StarRocks JDBC Source
([3060](https://github.com/apache/incubator-seatunnel/pull/3060))
+- [Feature] Support Phoenix JDBC Sink
([2499](https://github.com/apache/incubator-seatunnel/pull/2499))
+- [Feature] Support SQL Server JDBC Sink
([2646](https://github.com/apache/incubator-seatunnel/pull/2646))
+- [Feature] Support Oracle JDBC Sink
([2550](https://github.com/apache/incubator-seatunnel/pull/2550))
+- [Feature] Support StarRocks JDBC Sink
([3060](https://github.com/apache/incubator-seatunnel/pull/3060))
- [Feature] Support DB2 JDBC Sink
([2410](https://github.com/apache/incubator-seatunnel/pull/2410))
### next version
@@ -205,3 +206,4 @@ sink {
- [Feature] Support Teradata JDBC Sink
([3362](https://github.com/apache/incubator-seatunnel/pull/3362))
- [Feature] Support Sqlite JDBC Sink
([3089](https://github.com/apache/incubator-seatunnel/pull/3089))
- [Feature] Support CDC write DELETE/UPDATE/INSERT events
([3378](https://github.com/apache/incubator-seatunnel/issues/3378))
+- [Feature] Support Doris JDBC Sink
([3586](https://github.com/apache/incubator-seatunnel/pull/3586))
diff --git a/docs/en/connector-v2/source/Jdbc.md
b/docs/en/connector-v2/source/Jdbc.md
index 742ba48fd..9ad876167 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -109,6 +109,7 @@ there are some reference value for params above.
| starrocks | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test |
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
| db2 | com.ibm.db2.jcc.DB2Driver |
jdbc:db2://localhost:50000/testdb |
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4
|
| tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver |
"jdbc:ots:http s://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance" |
https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc
|
+| doris | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test |
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
| teradata | com.teradata.jdbc.TeraDriver |
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test |
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc
|
## Example
@@ -162,3 +163,4 @@ parallel:
- [Feature] Support Tablestore Source
([3309](https://github.com/apache/incubator-seatunnel/pull/3309))
- [Feature] Support Teradata JDBC Source
([3362](https://github.com/apache/incubator-seatunnel/pull/3362))
- [Feature] Support JDBC Fetch Size Config
([3478](https://github.com/apache/incubator-seatunnel/pull/3478))
+- [Feature] Support Doris JDBC Source
([3586](https://github.com/apache/incubator-seatunnel/pull/3586))
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
index aa8206185..84a8e8bdc 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
@@ -112,6 +112,10 @@ public class MySqlTypeMapper implements
JdbcDialectTypeMapper {
case MYSQL_BIGINT_UNSIGNED:
return new DecimalType(20, 0);
case MYSQL_DECIMAL:
+ if (precision > 38) {
+ LOG.warn("{} will probably cause value overflow.",
MYSQL_DECIMAL);
+ return new DecimalType(38, 18);
+ }
return new DecimalType(precision, scale);
case MYSQL_DECIMAL_UNSIGNED:
return new DecimalType(precision + 1, scale);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisdbIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisdbIT.java
new file mode 100644
index 000000000..5b325a069
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisdbIT.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Slf4j
+@Disabled("Doris docker container is unstable")
+public class JdbcDorisdbIT extends TestSuiteBase implements TestResource {
+ private static final String DOCKER_IMAGE = "taozex/doris:v1.1.1";
+ private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+ private static final String HOST = "doris_e2e";
+ private static final int DOCKER_PORT = 9030;
+ private static final int PORT = 8960;
+
+ private static final String URL = "jdbc:mysql://%s:" + PORT;
+ private static final String USERNAME = "root";
+ private static final String PASSWORD = "";
+ private static final String DATABASE = "test";
+ private static final String SOURCE_TABLE = "e2e_table_source";
+ private static final String SINK_TABLE = "e2e_table_sink";
+ private static final String DRIVER_JAR =
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
+ private static final String COLUMN_STRING = "BIGINT_COL, LARGEINT_COL,
SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL,
INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL";
+
+ private static final String DDL_SOURCE = "create table " + DATABASE + "."
+ SOURCE_TABLE + " (\n" +
+ " BIGINT_COL BIGINT,\n" +
+ " LARGEINT_COL LARGEINT,\n" +
+ " SMALLINT_COL SMALLINT,\n" +
+ " TINYINT_COL TINYINT,\n" +
+ " BOOLEAN_COL BOOLEAN,\n" +
+ " DECIMAL_COL DECIMAL,\n" +
+ " DOUBLE_COL DOUBLE,\n" +
+ " FLOAT_COL FLOAT,\n" +
+ " INT_COL INT,\n" +
+ " CHAR_COL CHAR,\n" +
+ " VARCHAR_11_COL VARCHAR(11),\n" +
+ " STRING_COL STRING,\n" +
+ " DATETIME_COL DATETIME,\n" +
+ " DATE_COL DATE\n" +
+ ")ENGINE=OLAP\n" +
+ "DUPLICATE KEY(`BIGINT_COL`)\n" +
+ "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" +
+ "PROPERTIES (\n" +
+ "\"replication_allocation\" = \"tag.location.default: 1\"" +
+ ")";
+
+ private static final String DDL_SINK = "create table " + DATABASE + "." +
SINK_TABLE + " (\n" +
+ " BIGINT_COL BIGINT,\n" +
+ " LARGEINT_COL LARGEINT,\n" +
+ " SMALLINT_COL SMALLINT,\n" +
+ " TINYINT_COL TINYINT,\n" +
+ " BOOLEAN_COL BOOLEAN,\n" +
+ " DECIMAL_COL DECIMAL,\n" +
+ " DOUBLE_COL DOUBLE,\n" +
+ " FLOAT_COL FLOAT,\n" +
+ " INT_COL INT,\n" +
+ " CHAR_COL CHAR,\n" +
+ " VARCHAR_11_COL VARCHAR(11),\n" +
+ " STRING_COL STRING,\n" +
+ " DATETIME_COL DATETIME,\n" +
+ " DATE_COL DATE\n" +
+ ")ENGINE=OLAP\n" +
+ "DUPLICATE KEY(`BIGINT_COL`)\n" +
+ "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" +
+ "PROPERTIES (\n" +
+ "\"replication_allocation\" = \"tag.location.default: 1\"" +
+ ")";
+
+ private static final String INIT_DATA_SQL = "insert into " + DATABASE +
"." + SOURCE_TABLE + " (\n" +
+ " BIGINT_COL,\n" +
+ " LARGEINT_COL,\n" +
+ " SMALLINT_COL,\n" +
+ " TINYINT_COL,\n" +
+ " BOOLEAN_COL,\n" +
+ " DECIMAL_COL,\n" +
+ " DOUBLE_COL,\n" +
+ " FLOAT_COL,\n" +
+ " INT_COL,\n" +
+ " CHAR_COL,\n" +
+ " VARCHAR_11_COL,\n" +
+ " STRING_COL,\n" +
+ " DATETIME_COL,\n" +
+ " DATE_COL\n" +
+ ")values(\n" +
+ "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" +
+ ")";
+
+ private Connection jdbcConnection;
+ private GenericContainer<?> dorisServer;
+ private static final List<SeaTunnelRow> TEST_DATASET =
generateTestDataSet();
+
+ @TestContainerExtension
+ private final ContainerExtendedFactory extendedFactory = container -> {
+ Container.ExecResult extraCommands = container.execInContainer("bash",
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd
/tmp/seatunnel/plugins/Jdbc/lib && curl -O " + DRIVER_JAR);
+ Assertions.assertEquals(0, extraCommands.getExitCode());
+ };
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ dorisServer = new GenericContainer<>(DOCKER_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(HOST)
+ .withLogConsumer(new Slf4jLogConsumer(log));
+ dorisServer.setPortBindings(Lists.newArrayList(
+ String.format("%s:%s", PORT, DOCKER_PORT)));
+ Startables.deepStart(Stream.of(dorisServer)).join();
+ log.info("Doris container started");
+ // wait to add BE
+ Thread.sleep(600000);
+ // wait for doris fully start
+ given().ignoreExceptions()
+ .await()
+ .atMost(600, TimeUnit.SECONDS)
+ .untilAsserted(this::initializeJdbcConnection);
+ initializeJdbcTable();
+ batchInsertData();
+ }
+
+ private static List<SeaTunnelRow> generateTestDataSet() {
+
+ List<SeaTunnelRow> rows = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ SeaTunnelRow row = new SeaTunnelRow(
+ new Object[]{
+ Long.valueOf(i),
+ Long.valueOf(1123456),
+ Short.parseShort("1"),
+ Byte.parseByte("1"),
+ Boolean.FALSE,
+ BigDecimal.valueOf(2222243, 1),
+ Double.parseDouble("2222243.2222243"),
+ Float.parseFloat("222224"),
+ Integer.parseInt("1"),
+ "a",
+ "VARCHAR_COL",
+ "STRING_COL",
+ "2022-03-02 13:24:45",
+ "2022-03-02"
+ });
+ rows.add(row);
+ }
+ return rows;
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+ if (jdbcConnection != null) {
+ jdbcConnection.close();
+ }
+ if (dorisServer != null) {
+ dorisServer.close();
+ }
+ }
+
+ @TestTemplate
+ public void testDorisSink(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/jdbc_doris_source_and_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ try {
+ assertHasData(SINK_TABLE);
+
+ String sourceSql = String.format("select * from %s.%s", DATABASE,
SOURCE_TABLE);
+ String sinkSql = String.format("select * from %s.%s", DATABASE,
SINK_TABLE);
+ List<String> columnList =
Arrays.stream(COLUMN_STRING.split(",")).map(x ->
x.trim()).collect(Collectors.toList());
+ Statement sourceStatement = jdbcConnection.createStatement();
+ Statement sinkStatement = jdbcConnection.createStatement();
+ ResultSet sourceResultSet =
sourceStatement.executeQuery(sourceSql);
+ ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+
Assertions.assertEquals(sourceResultSet.getMetaData().getColumnCount(),
sinkResultSet.getMetaData().getColumnCount());
+ while (sourceResultSet.next()) {
+ if (sinkResultSet.next()) {
+ for (String column : columnList) {
+ Object source = sourceResultSet.getObject(column);
+ Object sink = sinkResultSet.getObject(column);
+ if (!Objects.deepEquals(source, sink)) {
+ InputStream sourceAsciiStream =
sourceResultSet.getBinaryStream(column);
+ InputStream sinkAsciiStream =
sinkResultSet.getBinaryStream(column);
+ String sourceValue =
IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8);
+ String sinkValue =
IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8);
+ Assertions.assertEquals(sourceValue, sinkValue);
+ }
+ }
+ }
+ }
+ //Check the row numbers is equal
+ sourceResultSet.last();
+ sinkResultSet.last();
+ Assertions.assertEquals(sourceResultSet.getRow(),
sinkResultSet.getRow());
+ clearSinkTable();
+ } catch (Exception e) {
+ throw new RuntimeException("Get doris connection error", e);
+ }
+ }
+
+ private void initializeJdbcConnection() throws SQLException,
ClassNotFoundException, MalformedURLException, InstantiationException,
IllegalAccessException {
+ URLClassLoader urlClassLoader = new URLClassLoader(new URL[]{new
URL(DRIVER_JAR)}, JdbcDorisdbIT.class.getClassLoader());
+ Thread.currentThread().setContextClassLoader(urlClassLoader);
+ Driver driver = (Driver)
urlClassLoader.loadClass(DRIVER_CLASS).newInstance();
+ Properties props = new Properties();
+ props.put("user", USERNAME);
+ props.put("password", PASSWORD);
+ jdbcConnection = driver.connect(String.format(URL,
dorisServer.getHost()), props);
+ }
+
+ private void initializeJdbcTable() {
+ try (Statement statement = jdbcConnection.createStatement()) {
+ // create databases
+ statement.execute("create database test");
+ // create source table
+ statement.execute(DDL_SOURCE);
+ // create sink table
+ statement.execute(DDL_SINK);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing table failed!", e);
+ }
+ }
+
+ private void batchInsertData() {
+ List<SeaTunnelRow> rows = TEST_DATASET;
+ try {
+ jdbcConnection.setAutoCommit(false);
+ try (PreparedStatement preparedStatement =
jdbcConnection.prepareStatement(INIT_DATA_SQL)) {
+ for (int i = 0; i < rows.size(); i++) {
+ for (int index = 0; index <
rows.get(i).getFields().length; index++) {
+ preparedStatement.setObject(index + 1,
rows.get(i).getFields()[index]);
+ }
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ }
+ jdbcConnection.commit();
+ } catch (Exception exception) {
+ log.error(ExceptionUtils.getMessage(exception));
+ throw new RuntimeException("Get connection error", exception);
+ }
+ }
+
+ private void assertHasData(String table) {
+ try (Statement statement = jdbcConnection.createStatement()) {
+ String sql = String.format("select * from %s.%s limit 1",
DATABASE, table);
+ ResultSet source = statement.executeQuery(sql);
+ Assertions.assertTrue(source.next());
+ } catch (Exception e) {
+ throw new RuntimeException("Test doris server image error", e);
+ }
+ }
+
+ private void clearSinkTable() {
+ try (Statement statement = jdbcConnection.createStatement()) {
+ statement.execute(String.format("TRUNCATE TABLE %s.%s", DATABASE,
SINK_TABLE));
+ } catch (SQLException e) {
+ throw new RuntimeException("Test doris server image error", e);
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_doris_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_doris_source_and_sink.conf
new file mode 100644
index 000000000..7a2da5fba
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_doris_source_and_sink.conf
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Job definition used by JdbcDorisdbIT: copies every listed column of
+# test.e2e_table_source into test.e2e_table_sink through the Jdbc connector.
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ driver = com.mysql.cj.jdbc.Driver
+ # doris_e2e is the network alias the test gives the Doris container.
+ url = "jdbc:mysql://doris_e2e:9030"
+ user = root
+ password = ""
+ query = "select BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL,
BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL,
VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL from
`test`.`e2e_table_source`"
+ }
+}
+
+# No transforms: rows pass from source to sink unchanged.
+transform {
+}
+
+sink {
+ Jdbc {
+ driver = com.mysql.cj.jdbc.Driver
+ url = "jdbc:mysql://doris_e2e:9030"
+ user = root
+ password = ""
+ # One placeholder per column, in the same order as the source query's select list.
+ query = "INSERT INTO `test`.`e2e_table_sink` (BIGINT_COL, LARGEINT_COL,
SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL,
INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL)
VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+ }
+}
\ No newline at end of file