This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3d46b7961 [Feature][Connector-V2][Doris]Add Doris Source & Sink 
connector (#3586)
3d46b7961 is described below

commit 3d46b7961451e0ed6bcb6a4ba8589c90f7505f34
Author: TaoZex <[email protected]>
AuthorDate: Tue Nov 29 14:56:44 2022 +0800

    [Feature][Connector-V2][Doris]Add Doris Source & Sink connector (#3586)
    
    * [Feature][Connector-V2][Doris]Add Doris Source & Sink connector
---
 docs/en/connector-v2/sink/Jdbc.md                  |  10 +-
 docs/en/connector-v2/source/Jdbc.md                |   2 +
 .../internal/dialect/mysql/MySqlTypeMapper.java    |   4 +
 .../connectors/seatunnel/jdbc/JdbcDorisdbIT.java   | 315 +++++++++++++++++++++
 .../test/resources/jdbc_doris_source_and_sink.conf |  44 +++
 5 files changed, 371 insertions(+), 4 deletions(-)

diff --git a/docs/en/connector-v2/sink/Jdbc.md 
b/docs/en/connector-v2/sink/Jdbc.md
index 952521509..76705606b 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -132,6 +132,7 @@ there are some reference value for params above.
 | GBase8a    | com.gbase.jdbc.Driver                        | 
jdbc:gbase://e2e_gbase8aDb:5258/test                               | /          
                                        | 
https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar
 |
 | StarRocks  | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                   | /          
                                        | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                            |
 | db2        | com.ibm.db2.jcc.DB2Driver                    | 
jdbc:db2://localhost:50000/testdb                                  | 
com.ibm.db2.jcc.DB2XADataSource                    | 
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4               
                            |
+| Doris      | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                   | /          
                                        | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                            |
 | teradata   | com.teradata.jdbc.TeraDriver                 | 
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test              | /          
                                        | 
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc                   
                            |
 
 ## Example
@@ -193,10 +194,10 @@ sink {
 ### 2.3.0-beta 2022-10-20
 
 - [BugFix] Fix JDBC split exception 
([2904](https://github.com/apache/incubator-seatunnel/pull/2904))
-- [Feature] Support Phoenix JDBC Source 
([2499](https://github.com/apache/incubator-seatunnel/pull/2499))
-- [Feature] Support SQL Server JDBC Source 
([2646](https://github.com/apache/incubator-seatunnel/pull/2646))
-- [Feature] Support Oracle JDBC Source 
([2550](https://github.com/apache/incubator-seatunnel/pull/2550))
-- [Feature] Support StarRocks JDBC Source 
([3060](https://github.com/apache/incubator-seatunnel/pull/3060))
+- [Feature] Support Phoenix JDBC Sink 
([2499](https://github.com/apache/incubator-seatunnel/pull/2499))
+- [Feature] Support SQL Server JDBC Sink 
([2646](https://github.com/apache/incubator-seatunnel/pull/2646))
+- [Feature] Support Oracle JDBC Sink 
([2550](https://github.com/apache/incubator-seatunnel/pull/2550))
+- [Feature] Support StarRocks JDBC Sink 
([3060](https://github.com/apache/incubator-seatunnel/pull/3060))
 - [Feature] Support DB2 JDBC Sink 
([2410](https://github.com/apache/incubator-seatunnel/pull/2410))
 
 ### next version
@@ -205,3 +206,4 @@ sink {
 - [Feature] Support Teradata JDBC Sink 
([3362](https://github.com/apache/incubator-seatunnel/pull/3362))
 - [Feature] Support Sqlite JDBC Sink 
([3089](https://github.com/apache/incubator-seatunnel/pull/3089))
 - [Feature] Support CDC write DELETE/UPDATE/INSERT events 
([3378](https://github.com/apache/incubator-seatunnel/issues/3378))
+- [Feature] Support Doris JDBC Sink
diff --git a/docs/en/connector-v2/source/Jdbc.md 
b/docs/en/connector-v2/source/Jdbc.md
index 742ba48fd..9ad876167 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -109,6 +109,7 @@ there are some reference value for params above.
 | starrocks  | com.mysql.cj.jdbc.Driver                            | 
jdbc:mysql://localhost:3306/test                                       | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                            |
 | db2        | com.ibm.db2.jcc.DB2Driver                           | 
jdbc:db2://localhost:50000/testdb                                      | 
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4               
                            |
 | tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | 
"jdbc:ots:http s://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance" | 
https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc      
                            |
+| doris      | com.mysql.cj.jdbc.Driver                            | 
jdbc:mysql://localhost:3306/test                                       | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                            |
 | teradata   | com.teradata.jdbc.TeraDriver                        | 
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test                  | 
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc                   
                            |
 
 ## Example
@@ -162,3 +163,4 @@ parallel:
 - [Feature] Support Tablestore Source 
([3309](https://github.com/apache/incubator-seatunnel/pull/3309))
 - [Feature] Support Teradata JDBC Source 
([3362](https://github.com/apache/incubator-seatunnel/pull/3362))
 - [Feature] Support JDBC Fetch Size Config 
([3478](https://github.com/apache/incubator-seatunnel/pull/3478))
+- [Feature] Support Doris JDBC Source
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
index aa8206185..84a8e8bdc 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
@@ -112,6 +112,10 @@ public class MySqlTypeMapper implements 
JdbcDialectTypeMapper {
             case MYSQL_BIGINT_UNSIGNED:
                 return new DecimalType(20, 0);
             case MYSQL_DECIMAL:
+                if (precision > 38) {
+                    LOG.warn("{} will probably cause value overflow.", 
MYSQL_DECIMAL);
+                    return new DecimalType(38, 18);
+                }
                 return new DecimalType(precision, scale);
             case MYSQL_DECIMAL_UNSIGNED:
                 return new DecimalType(precision + 1, scale);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisdbIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisdbIT.java
new file mode 100644
index 000000000..5b325a069
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisdbIT.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Slf4j
+@Disabled("Doris docker container is unstable")
+public class JdbcDorisdbIT extends TestSuiteBase implements TestResource {
+    private static final String DOCKER_IMAGE = "taozex/doris:v1.1.1"; // image started as the Doris container in startUp()
+    private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; // driver class loaded from DRIVER_JAR
+    private static final String HOST = "doris_e2e"; // network alias of the Doris container; the job config URL uses this name
+    private static final int DOCKER_PORT = 9030; // second half of the "PORT:DOCKER_PORT" binding; same port as in the job config URL
+    private static final int PORT = 8960; // first half of the port binding; port the test's own JDBC URL connects to
+
+    private static final String URL = "jdbc:mysql://%s:" + PORT; // %s is filled with dorisServer.getHost()
+    private static final String USERNAME = "root"; // sent as the "user" connection property
+    private static final String PASSWORD = ""; // sent as the "password" connection property
+    private static final String DATABASE = "test"; // database that holds both test tables
+    private static final String SOURCE_TABLE = "e2e_table_source"; // seeded by batchInsertData(), read by the job
+    private static final String SINK_TABLE = "e2e_table_sink"; // written by the job, truncated by clearSinkTable()
+    private static final String DRIVER_JAR = // MySQL Connector/J jar: loaded by the test and downloaded into the job container
+            "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
+    private static final String COLUMN_STRING = "BIGINT_COL, LARGEINT_COL, " + // comma-separated column names compared in testDorisSink()
+            "SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, " +
+            "INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL";
+
+    private static final String DDL_SOURCE = "create table " + DATABASE + "." 
+ SOURCE_TABLE + " (\n" + // DDL for the table that batchInsertData() seeds; same column list as DDL_SINK
+            "  BIGINT_COL     BIGINT,\n" +
+            "  LARGEINT_COL   LARGEINT,\n" +
+            "  SMALLINT_COL   SMALLINT,\n" +
+            "  TINYINT_COL    TINYINT,\n" +
+            "  BOOLEAN_COL    BOOLEAN,\n" +
+            "  DECIMAL_COL    DECIMAL,\n" +
+            "  DOUBLE_COL     DOUBLE,\n" +
+            "  FLOAT_COL      FLOAT,\n" +
+            "  INT_COL        INT,\n" +
+            "  CHAR_COL       CHAR,\n" +
+            "  VARCHAR_11_COL VARCHAR(11),\n" +
+            "  STRING_COL     STRING,\n" +
+            "  DATETIME_COL   DATETIME,\n" +
+            "  DATE_COL       DATE\n" +
+            ")ENGINE=OLAP\n" +
+            "DUPLICATE KEY(`BIGINT_COL`)\n" +
+            "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" +
+            "PROPERTIES (\n" +
+            "\"replication_allocation\" = \"tag.location.default: 1\"" +
+            ")";
+
+    private static final String DDL_SINK = "create table " + DATABASE + "." + 
SINK_TABLE + " (\n" + // DDL for the table the job writes to; same column list as DDL_SOURCE
+            "  BIGINT_COL     BIGINT,\n" +
+            "  LARGEINT_COL   LARGEINT,\n" +
+            "  SMALLINT_COL   SMALLINT,\n" +
+            "  TINYINT_COL    TINYINT,\n" +
+            "  BOOLEAN_COL    BOOLEAN,\n" +
+            "  DECIMAL_COL    DECIMAL,\n" +
+            "  DOUBLE_COL     DOUBLE,\n" +
+            "  FLOAT_COL      FLOAT,\n" +
+            "  INT_COL        INT,\n" +
+            "  CHAR_COL       CHAR,\n" +
+            "  VARCHAR_11_COL VARCHAR(11),\n" +
+            "  STRING_COL     STRING,\n" +
+            "  DATETIME_COL   DATETIME,\n" +
+            "  DATE_COL       DATE\n" +
+            ")ENGINE=OLAP\n" +
+            "DUPLICATE KEY(`BIGINT_COL`)\n" +
+            "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" +
+            "PROPERTIES (\n" +
+            "\"replication_allocation\" = \"tag.location.default: 1\"" +
+            ")";
+
+    private static final String INIT_DATA_SQL = "insert into " + DATABASE + 
"." + SOURCE_TABLE + " (\n" + // parameterized insert executed as a batch by batchInsertData()
+            "  BIGINT_COL,\n" +
+            "  LARGEINT_COL,\n" +
+            "  SMALLINT_COL,\n" +
+            "  TINYINT_COL,\n" +
+            "  BOOLEAN_COL,\n" +
+            "  DECIMAL_COL,\n" +
+            "  DOUBLE_COL,\n" +
+            "  FLOAT_COL,\n" +
+            "  INT_COL,\n" +
+            "  CHAR_COL,\n" +
+            "  VARCHAR_11_COL,\n" +
+            "  STRING_COL,\n" +
+            "  DATETIME_COL,\n" +
+            "  DATE_COL\n" +
+            ")values(\n" +
+            "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" + // 14 placeholders, one per column listed above
+            ")";
+
+    private Connection jdbcConnection; // assigned by initializeJdbcConnection(), closed in tearDown()
+    private GenericContainer<?> dorisServer; // created and started in startUp(), closed in tearDown()
+    private static final List<SeaTunnelRow> TEST_DATASET = 
generateTestDataSet(); // 100 generated rows that batchInsertData() inserts into the source table
+
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory = container -> {
+        Container.ExecResult extraCommands = container.execInContainer("bash", 
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd 
/tmp/seatunnel/plugins/Jdbc/lib && curl -O " + DRIVER_JAR);
+        Assertions.assertEquals(0, extraCommands.getExitCode());
+    };
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception { // starts Doris, waits for it, then creates and seeds the test tables
+        dorisServer = new GenericContainer<>(DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(HOST) // the job config reaches the container under this alias
+                .withLogConsumer(new Slf4jLogConsumer(log));
+        dorisServer.setPortBindings(Lists.newArrayList(
+                String.format("%s:%s", PORT, DOCKER_PORT))); // fixed binding "8960:9030"
+        Startables.deepStart(Stream.of(dorisServer)).join();
+        log.info("Doris container started");
+        // wait to add BE
+        Thread.sleep(600000); // unconditional 600000 ms (10 minute) pause before the first connection attempt
+        // wait for doris fully start
+        given().ignoreExceptions()
+                .await()
+                .atMost(600, TimeUnit.SECONDS)
+                .untilAsserted(this::initializeJdbcConnection); // keeps retrying the connection, ignoring exceptions, for up to 600 s
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private static List<SeaTunnelRow> generateTestDataSet() {
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row = new SeaTunnelRow(
+                    new Object[]{
+                        Long.valueOf(i),
+                        Long.valueOf(1123456),
+                        Short.parseShort("1"),
+                        Byte.parseByte("1"),
+                        Boolean.FALSE,
+                        BigDecimal.valueOf(2222243, 1),
+                        Double.parseDouble("2222243.2222243"),
+                        Float.parseFloat("222224"),
+                        Integer.parseInt("1"),
+                        "a",
+                        "VARCHAR_COL",
+                        "STRING_COL",
+                        "2022-03-02 13:24:45",
+                        "2022-03-02"
+                    });
+            rows.add(row);
+        }
+        return rows;
+    }
+
+    /**
+     * Releases the JDBC connection and the Doris container.
+     *
+     * <p>The container is closed in a {@code finally} block so that a failure while closing
+     * the connection cannot leave the container running.
+     *
+     * @throws Exception if closing the JDBC connection fails
+     */
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            if (jdbcConnection != null) {
+                jdbcConnection.close();
+            }
+        } finally {
+            if (dorisServer != null) {
+                dorisServer.close();
+            }
+        }
+    }
+
+    @TestTemplate
+    public void testDorisSink(TestContainer container) throws IOException, 
InterruptedException {
+        Container.ExecResult execResult = 
container.executeJob("/jdbc_doris_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        try  {
+            assertHasData(SINK_TABLE);
+
+            String sourceSql = String.format("select * from %s.%s", DATABASE, 
SOURCE_TABLE);
+            String sinkSql = String.format("select * from %s.%s", DATABASE, 
SINK_TABLE);
+            List<String> columnList = 
Arrays.stream(COLUMN_STRING.split(",")).map(x -> 
x.trim()).collect(Collectors.toList());
+            Statement sourceStatement = jdbcConnection.createStatement();
+            Statement sinkStatement = jdbcConnection.createStatement();
+            ResultSet sourceResultSet = 
sourceStatement.executeQuery(sourceSql);
+            ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+            
Assertions.assertEquals(sourceResultSet.getMetaData().getColumnCount(), 
sinkResultSet.getMetaData().getColumnCount());
+            while (sourceResultSet.next()) {
+                if (sinkResultSet.next()) {
+                    for (String column : columnList) {
+                        Object source = sourceResultSet.getObject(column);
+                        Object sink = sinkResultSet.getObject(column);
+                        if (!Objects.deepEquals(source, sink)) {
+                            InputStream sourceAsciiStream = 
sourceResultSet.getBinaryStream(column);
+                            InputStream sinkAsciiStream = 
sinkResultSet.getBinaryStream(column);
+                            String sourceValue = 
IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8);
+                            String sinkValue = 
IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8);
+                            Assertions.assertEquals(sourceValue, sinkValue);
+                        }
+                    }
+                }
+            }
+            //Check the row numbers is equal
+            sourceResultSet.last();
+            sinkResultSet.last();
+            Assertions.assertEquals(sourceResultSet.getRow(), 
sinkResultSet.getRow());
+            clearSinkTable();
+        } catch (Exception e) {
+            throw new RuntimeException("Get doris connection error", e);
+        }
+    }
+
+    /**
+     * Loads the JDBC driver from DRIVER_JAR through a dedicated class loader, installs that
+     * loader as the current thread's context class loader, and stores a new connection to the
+     * Doris container in {@code jdbcConnection}.
+     */
+    private void initializeJdbcConnection() throws SQLException, ClassNotFoundException,
+            MalformedURLException, InstantiationException, IllegalAccessException {
+        URL[] driverJarUrls = {new URL(DRIVER_JAR)};
+        URLClassLoader driverClassLoader =
+                new URLClassLoader(driverJarUrls, JdbcDorisdbIT.class.getClassLoader());
+        Thread.currentThread().setContextClassLoader(driverClassLoader);
+
+        Class<?> driverClass = driverClassLoader.loadClass(DRIVER_CLASS);
+        Driver mysqlDriver = (Driver) driverClass.newInstance();
+
+        Properties credentials = new Properties();
+        credentials.put("user", USERNAME);
+        credentials.put("password", PASSWORD);
+
+        String jdbcUrl = String.format(URL, dorisServer.getHost());
+        jdbcConnection = mysqlDriver.connect(jdbcUrl, credentials);
+    }
+
+    /**
+     * Creates the test database and then the source and sink tables.
+     *
+     * @throws RuntimeException if any of the statements fails
+     */
+    private void initializeJdbcTable() {
+        try (Statement statement = jdbcConnection.createStatement()) {
+            // Use the DATABASE constant so the created database always matches the one
+            // that DDL_SOURCE, DDL_SINK and the queries are qualified with.
+            statement.execute("create database " + DATABASE);
+            // create source table
+            statement.execute(DDL_SOURCE);
+            // create sink table
+            statement.execute(DDL_SINK);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing table failed!", e);
+        }
+    }
+
+    /**
+     * Inserts every row of TEST_DATASET into the source table as one JDBC batch and commits.
+     * Auto-commit is switched off on the shared connection before the batch is built.
+     *
+     * @throws RuntimeException if anything fails; the failure is logged first
+     */
+    private void batchInsertData() {
+        try {
+            jdbcConnection.setAutoCommit(false);
+            try (PreparedStatement insertStatement =
+                         jdbcConnection.prepareStatement(INIT_DATA_SQL)) {
+                for (SeaTunnelRow row : TEST_DATASET) {
+                    // JDBC parameter indexes are 1-based.
+                    int parameterIndex = 1;
+                    for (Object field : row.getFields()) {
+                        insertStatement.setObject(parameterIndex++, field);
+                    }
+                    insertStatement.addBatch();
+                }
+                insertStatement.executeBatch();
+            }
+            jdbcConnection.commit();
+        } catch (Exception exception) {
+            log.error(ExceptionUtils.getMessage(exception));
+            throw new RuntimeException("Get connection error", exception);
+        }
+    }
+
+    /**
+     * Asserts that the given table in DATABASE contains at least one row.
+     *
+     * @param table unqualified table name
+     * @throws RuntimeException if the query fails
+     */
+    private void assertHasData(String table) {
+        String probeSql = String.format("select * from %s.%s limit 1", DATABASE, table);
+        try (Statement statement = jdbcConnection.createStatement();
+             ResultSet firstRow = statement.executeQuery(probeSql)) {
+            Assertions.assertTrue(firstRow.next());
+        } catch (Exception e) {
+            throw new RuntimeException("Test doris server image error", e);
+        }
+    }
+
+    /**
+     * Truncates the sink table.
+     *
+     * @throws RuntimeException if the statement fails
+     */
+    private void clearSinkTable() {
+        String truncateSql = String.format("TRUNCATE TABLE %s.%s", DATABASE, SINK_TABLE);
+        try (Statement statement = jdbcConnection.createStatement()) {
+            statement.execute(truncateSql);
+        } catch (SQLException e) {
+            throw new RuntimeException("Test doris server image error", e);
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_doris_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_doris_source_and_sink.conf
new file mode 100644
index 000000000..7a2da5fba
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_doris_source_and_sink.conf
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+  Jdbc { # reads the 14 test columns from `test`.`e2e_table_source`
+    driver = com.mysql.cj.jdbc.Driver
+    url = "jdbc:mysql://doris_e2e:9030" # same host alias and port as HOST / DOCKER_PORT in JdbcDorisdbIT
+    user = root
+    password = ""
+    query = "select BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL, 
BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL, 
VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL from 
`test`.`e2e_table_source`"
+  }
+}
+
+transform {
+}
+
+sink {
+  Jdbc { # inserts each row into `test`.`e2e_table_sink`, one placeholder per source column
+    driver = com.mysql.cj.jdbc.Driver
+    url = "jdbc:mysql://doris_e2e:9030" # same Doris endpoint as the source block
+    user = root
+    password = ""
+    query = "INSERT INTO `test`.`e2e_table_sink` (BIGINT_COL, LARGEINT_COL, 
SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, 
INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL) 
VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+  }
+}
\ No newline at end of file

Reply via email to