This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a73bb3e71 [Feature][Connector-V2][JDBC] support sqlite Source & Sink
(#3089)
a73bb3e71 is described below
commit a73bb3e714025be9f287ef2d8816b63914d33554
Author: nuts.jian <[email protected]>
AuthorDate: Thu Nov 24 16:31:50 2022 +0800
[Feature][Connector-V2][JDBC] support sqlite Source & Sink (#3089)
---
docs/en/connector-v2/sink/Jdbc.md | 2 +
docs/en/connector-v2/source/Jdbc.md | 2 +
pom.xml | 6 -
seatunnel-connectors-v2/connector-jdbc/pom.xml | 14 +-
.../seatunnel/jdbc/config/JdbcConfig.java | 1 +
.../internal/dialect/sqlite/SqliteDialect.java | 62 ++++++
.../dialect/sqlite/SqliteDialectFactory.java | 40 ++++
.../dialect/sqlite/SqliteJdbcRowConverter.java | 29 +++
.../internal/dialect/sqlite/SqliteTypeMapper.java | 172 +++++++++++++++++
.../internal/options/JdbcConnectionOptions.java | 1 +
.../connector-jdbc-flink-e2e/pom.xml | 5 +
.../seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java | 180 +++++++++++++++++
.../test/resources/jdbc/init_sql/sqlite_init.conf | 212 +++++++++++++++++++++
.../jdbc/jdbc_sqlite_source_and_sink.conf | 51 +++++
.../jdbc/jdbc_sqlite_source_and_sink_datatype.conf | 147 ++++++++++++++
.../connector-jdbc-spark-e2e/pom.xml | 5 +
.../seatunnel/e2e/spark/v2/jdbc/JdbcSqliteIT.java | 179 +++++++++++++++++
.../test/resources/jdbc/init_sql/sqlite_init.conf | 212 +++++++++++++++++++++
.../jdbc/jdbc_sqlite_source_and_sink.conf | 57 ++++++
.../jdbc/jdbc_sqlite_source_and_sink_datatype.conf | 147 ++++++++++++++
20 files changed, 1516 insertions(+), 8 deletions(-)
diff --git a/docs/en/connector-v2/sink/Jdbc.md
b/docs/en/connector-v2/sink/Jdbc.md
index f145cc220..af586d5ec 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -128,6 +128,7 @@ there are some reference value for params above.
| Phoenix | org.apache.phoenix.queryserver.client.Driver |
jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /
|
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
|
| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver |
jdbc:sqlserver://localhost:1433 |
com.microsoft.sqlserver.jdbc.SQLServerXADataSource |
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc
|
| Oracle | oracle.jdbc.OracleDriver |
jdbc:oracle:thin:@localhost:1521/xepdb1 |
oracle.jdbc.xa.OracleXADataSource |
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8
|
+| sqlite | org.sqlite.JDBC |
jdbc:sqlite:test.db | /
|
https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc
|
| GBase8a | com.gbase.jdbc.Driver |
jdbc:gbase://e2e_gbase8aDb:5258/test | /
|
https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar
|
| StarRocks | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
| db2 | com.ibm.db2.jcc.DB2Driver |
jdbc:db2://localhost:50000/testdb |
com.ibm.db2.jcc.DB2XADataSource |
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4
|
@@ -199,4 +200,5 @@ sink {
### next version
+- [Feature] Support Sqlite JDBC Sink
([3089](https://github.com/apache/incubator-seatunnel/pull/3089))
- [Feature] Support CDC write DELETE/UPDATE/INSERT events
([3378](https://github.com/apache/incubator-seatunnel/issues/3378))
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/Jdbc.md
b/docs/en/connector-v2/source/Jdbc.md
index 20bb0bb1e..e6c1c08a9 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -104,6 +104,7 @@ there are some reference value for params above.
| phoenix | org.apache.phoenix.queryserver.client.Driver |
jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF |
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
|
| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver |
jdbc:sqlserver://localhost:1433 |
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc
|
| oracle | oracle.jdbc.OracleDriver |
jdbc:oracle:thin:@localhost:1521/xepdb1 |
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8
|
+| sqlite | org.sqlite.JDBC |
jdbc:sqlite:test.db |
https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc
|
| gbase8a | com.gbase.jdbc.Driver |
jdbc:gbase://e2e_gbase8aDb:5258/test |
https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar
|
| starrocks | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test |
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
| db2 | com.ibm.db2.jcc.DB2Driver |
jdbc:db2://localhost:50000/testdb |
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4
|
@@ -156,5 +157,6 @@ parallel:
### next version
- [BugFix] Fix jdbc split bug
([3220](https://github.com/apache/incubator-seatunnel/pull/3220))
+- [Feature] Support Sqlite JDBC Source
([3089](https://github.com/apache/incubator-seatunnel/pull/3089))
- [Feature] Support Tablestore Source
([3309](https://github.com/apache/incubator-seatunnel/pull/3309))
- [Feature] Support JDBC Fetch Size Config
([3478](https://github.com/apache/incubator-seatunnel/pull/3478))
diff --git a/pom.xml b/pom.xml
index 832e199ec..0517ad23e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -841,12 +841,6 @@
</dependency>
</dependencies>
</plugin>
-
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- </plugin>
-
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 2ef11b89a..b7f40e417 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -36,7 +36,9 @@
<sqlserver.version>9.2.1.jre8</sqlserver.version>
<phoenix.version>5.2.5-HBase-2.x</phoenix.version>
<oracle.version>12.2.0.1</oracle.version>
+ <sqlite.version>3.39.3.0</sqlite.version>
<db2.version>db2jcc4</db2.version>
+ <sqlite.version>3.39.3.0</sqlite.version>
<tablestore.version>5.13.9</tablestore.version>
</properties>
@@ -78,20 +80,24 @@
<version>${oracle.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.xerial</groupId>
+ <artifactId>sqlite-jdbc</artifactId>
+ <version>${sqlite.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>com.ibm.db2.jcc</groupId>
<artifactId>db2jcc</artifactId>
<version>${db2.version}</version>
<scope>provided</scope>
</dependency>
-
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore-jdbc</artifactId>
<version>${tablestore.version}</version>
<scope>provided</scope>
</dependency>
-
</dependencies>
</dependencyManagement>
@@ -129,6 +135,10 @@
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.xerial</groupId>
+ <artifactId>sqlite-jdbc</artifactId>
+ </dependency>
<dependency>
<groupId>com.ibm.db2.jcc</groupId>
<artifactId>db2jcc</artifactId>
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
index 12eeac7aa..3db3bdc5e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
@@ -111,6 +111,7 @@ public class JdbcConfig implements Serializable {
jdbcOptions.transactionTimeoutSec =
config.getInt(JdbcConfig.TRANSACTION_TIMEOUT_SEC.key());
}
}
+
return jdbcOptions;
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialect.java
new file mode 100644
index 000000000..bf361dc8c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialect.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class SqliteDialect implements JdbcDialect {
+ @Override
+ public String dialectName() {
+ return "Sqlite";
+ }
+
+ @Override
+ public JdbcRowConverter getRowConverter() {
+ return new SqliteJdbcRowConverter();
+ }
+
+ @Override
+ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+ return new SqliteTypeMapper();
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ return "`" + identifier + "`";
+ }
+
+ @Override
+ public Optional<String> getUpsertStatement(String tableName, String[]
fieldNames, String[] uniqueKeyFields) {
+ String updateClause = Arrays.stream(fieldNames)
+ .map(fieldName -> quoteIdentifier(fieldName) + "=VALUES(" +
quoteIdentifier(fieldName) + ")")
+ .collect(Collectors.joining(", "));
+
+ String conflictFields = Arrays.stream(uniqueKeyFields)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(","));
+
+ String upsertSQL = getInsertIntoStatement(tableName, fieldNames) + "
ON CONFLICT(" + conflictFields + ") DO UPDATE SET " + updateClause;
+ return Optional.of(upsertSQL);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialectFactory.java
new file mode 100644
index 000000000..e9639ec0c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialectFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory for {@link SqliteDialect}.
+ */
+
+@AutoService(JdbcDialectFactory.class)
+public class SqliteDialectFactory implements JdbcDialectFactory {
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:sqlite:");
+ }
+
+ @Override
+ public JdbcDialect create() {
+ return new SqliteDialect();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteJdbcRowConverter.java
new file mode 100644
index 000000000..1e56c5d43
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteJdbcRowConverter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+
+public class SqliteJdbcRowConverter extends AbstractJdbcRowConverter {
+
+ @Override
+ public String converterName() {
+ return "Sqlite";
+ }
+
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteTypeMapper.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteTypeMapper.java
new file mode 100644
index 000000000..87681aa7e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteTypeMapper.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+@Slf4j
+public class SqliteTypeMapper implements JdbcDialectTypeMapper {
+
+ // ============================data types=====================
+
+ private static final String SQLITE_UNKNOWN = "UNKNOWN";
+ private static final String SQLITE_BIT = "BIT";
+ private static final String SQLITE_BOOLEAN = "BOOLEAN";
+
+ // -------------------------integer----------------------------
+ private static final String SQLITE_TINYINT = "TINYINT";
+ private static final String SQLITE_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+ private static final String SQLITE_SMALLINT = "SMALLINT";
+ private static final String SQLITE_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+ private static final String SQLITE_MEDIUMINT = "MEDIUMINT";
+ private static final String SQLITE_MEDIUMINT_UNSIGNED = "MEDIUMINT
UNSIGNED";
+ private static final String SQLITE_INT = "INT";
+ private static final String SQLITE_INT_UNSIGNED = "INT UNSIGNED";
+ private static final String SQLITE_INTEGER = "INTEGER";
+ private static final String SQLITE_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+ private static final String SQLITE_BIGINT = "BIGINT";
+ private static final String SQLITE_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+ private static final String SQLITE_DECIMAL = "DECIMAL";
+ private static final String SQLITE_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+ private static final String SQLITE_FLOAT = "FLOAT";
+ private static final String SQLITE_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+ private static final String SQLITE_DOUBLE = "DOUBLE";
+ private static final String SQLITE_DOUBLE_PRECISION = "DOUBLE PRECISION";
+ private static final String SQLITE_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+ private static final String SQLITE_NUMERIC = "NUMERIC";
+ private static final String SQLITE_REAL = "REAL";
+
+ // -------------------------text----------------------------
+ private static final String SQLITE_CHAR = "CHAR";
+ private static final String SQLITE_CHARACTER = "CHARACTER";
+ private static final String SQLITE_VARYING_CHARACTER = "VARYING_CHARACTER";
+ private static final String SQLITE_NATIVE_CHARACTER = "NATIVE_CHARACTER";
+ private static final String SQLITE_NCHAR = "NCHAR";
+ private static final String SQLITE_VARCHAR = "VARCHAR";
+ private static final String SQLITE_LONGVARCHAR = "LONGVARCHAR";
+ private static final String SQLITE_LONGNVARCHAR = "LONGNVARCHAR";
+ private static final String SQLITE_NVARCHAR = "NVARCHAR";
+ private static final String SQLITE_TINYTEXT = "TINYTEXT";
+ private static final String SQLITE_MEDIUMTEXT = "MEDIUMTEXT";
+ private static final String SQLITE_TEXT = "TEXT";
+ private static final String SQLITE_LONGTEXT = "LONGTEXT";
+ private static final String SQLITE_JSON = "JSON";
+ private static final String SQLITE_CLOB = "CLOB";
+
+ // ------------------------------time(text)-------------------------
+ private static final String SQLITE_DATE = "DATE";
+ private static final String SQLITE_DATETIME = "DATETIME";
+ private static final String SQLITE_TIME = "TIME";
+ private static final String SQLITE_TIMESTAMP = "TIMESTAMP";
+
+ // ------------------------------blob-------------------------
+ private static final String SQLITE_TINYBLOB = "TINYBLOB";
+ private static final String SQLITE_MEDIUMBLOB = "MEDIUMBLOB";
+ private static final String SQLITE_BLOB = "BLOB";
+ private static final String SQLITE_LONGBLOB = "LONGBLOB";
+ private static final String SQLITE_BINARY = "BINARY";
+ private static final String SQLITE_VARBINARY = "VARBINARY";
+ private static final String SQLITE_LONGVARBINARY = "LONGVARBINARY";
+
+ @Override
+ public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int
colIndex) throws SQLException {
+ String columnTypeName =
metadata.getColumnTypeName(colIndex).toUpperCase().trim();
+ switch (columnTypeName) {
+ case SQLITE_BIT:
+ case SQLITE_BOOLEAN:
+ return BasicType.BOOLEAN_TYPE;
+ case SQLITE_TINYINT:
+ case SQLITE_TINYINT_UNSIGNED:
+ case SQLITE_SMALLINT:
+ case SQLITE_SMALLINT_UNSIGNED:
+ return BasicType.SHORT_TYPE;
+ case SQLITE_MEDIUMINT:
+ case SQLITE_MEDIUMINT_UNSIGNED:
+ case SQLITE_INT:
+ case SQLITE_INTEGER:
+ return BasicType.INT_TYPE;
+ case SQLITE_INT_UNSIGNED:
+ case SQLITE_INTEGER_UNSIGNED:
+ case SQLITE_BIGINT:
+ case SQLITE_BIGINT_UNSIGNED:
+ case SQLITE_NUMERIC:
+ return BasicType.LONG_TYPE;
+ case SQLITE_DECIMAL:
+ case SQLITE_DECIMAL_UNSIGNED:
+ case SQLITE_DOUBLE:
+ case SQLITE_DOUBLE_PRECISION:
+ case SQLITE_REAL:
+ return BasicType.DOUBLE_TYPE;
+ case SQLITE_FLOAT:
+ return BasicType.FLOAT_TYPE;
+ case SQLITE_FLOAT_UNSIGNED:
+ log.warn("{} will probably cause value overflow.",
SQLITE_FLOAT_UNSIGNED);
+ return BasicType.FLOAT_TYPE;
+ case SQLITE_DOUBLE_UNSIGNED:
+ log.warn("{} will probably cause value overflow.",
SQLITE_DOUBLE_UNSIGNED);
+ return BasicType.DOUBLE_TYPE;
+ case SQLITE_CHARACTER:
+ case SQLITE_VARYING_CHARACTER:
+ case SQLITE_NATIVE_CHARACTER:
+ case SQLITE_NVARCHAR:
+ case SQLITE_NCHAR:
+ case SQLITE_LONGNVARCHAR:
+ case SQLITE_LONGVARCHAR:
+ case SQLITE_CLOB:
+ case SQLITE_CHAR:
+ case SQLITE_TINYTEXT:
+ case SQLITE_MEDIUMTEXT:
+ case SQLITE_TEXT:
+ case SQLITE_VARCHAR:
+ case SQLITE_JSON:
+ case SQLITE_LONGTEXT:
+
+ case SQLITE_DATE:
+ case SQLITE_TIME:
+ case SQLITE_DATETIME:
+ case SQLITE_TIMESTAMP:
+ return BasicType.STRING_TYPE;
+
+ case SQLITE_TINYBLOB:
+ case SQLITE_MEDIUMBLOB:
+ case SQLITE_BLOB:
+ case SQLITE_LONGBLOB:
+ case SQLITE_VARBINARY:
+ case SQLITE_BINARY:
+ case SQLITE_LONGVARBINARY:
+ return PrimitiveByteArrayType.INSTANCE;
+
+ //Doesn't support yet
+ case SQLITE_UNKNOWN:
+ default:
+ final String jdbcColumnName = metadata.getColumnName(colIndex);
+ throw new UnsupportedOperationException(
+ String.format(
+ "Doesn't support SQLite type '%s' on column
'%s' yet.",
+ columnTypeName, jdbcColumnName));
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectionOptions.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectionOptions.java
index 6487c7140..c10cbf96e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectionOptions.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectionOptions.java
@@ -39,6 +39,7 @@ public class JdbcConnectionOptions
public int maxRetries = DEFAULT_MAX_RETRIES;
public String username;
public String password;
+ public String query;
public boolean autoCommit = JdbcConfig.AUTO_COMMIT.defaultValue();
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
index f6220d9b0..c2e7bad81 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
@@ -108,6 +108,11 @@
<artifactId>mssql-jdbc</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.xerial</groupId>
+ <artifactId>sqlite-jdbc</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java
new file mode 100644
index 000000000..4206134c2
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+public class JdbcSqliteIT extends FlinkContainer {
+ private String tmpdir;
+ private Config config;
+ private static final List<List<Object>> TEST_DATASET =
generateTestDataset();
+ private static final String THIRD_PARTY_PLUGINS_URL =
"https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";
+
+ private void initTestDb() throws Exception {
+ URI resource =
Objects.requireNonNull(JdbcSqliteIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+ config = new ConfigBuilder(Paths.get(resource)).getConfig();
+ CheckConfigUtil.checkAllExists(this.config, "source_table",
"sink_table", "type_source_table",
+ "type_sink_table", "insert_type_source_table_sql",
"check_type_sink_table_sql");
+ tmpdir = Paths.get(System.getProperty("java.io.tmpdir")).toString();
+ Connection connection = null;
+ try {
+ Class.forName("org.sqlite.JDBC");
+ connection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir +
"/test.db", "", "");
+ Statement statement = connection.createStatement();
+ statement.execute("drop table if exists source");
+ statement.execute("drop table if exists sink");
+ statement.execute("drop table if exists type_source_table");
+ statement.execute("drop table if exists type_sink_table");
+ statement.execute(config.getString("source_table"));
+ statement.execute(config.getString("sink_table"));
+ statement.execute(config.getString("type_source_table"));
+ statement.execute(config.getString("type_sink_table"));
+
statement.execute(config.getString("insert_type_source_table_sql"));
+
+ String sql = "insert into source(age, name) values(?, ?)";
+ connection.setAutoCommit(false);
+ try (PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
+ for (List<Object> row : TEST_DATASET) {
+ preparedStatement.setInt(1, (Integer) row.get(0));
+ preparedStatement.setString(2, (String) row.get(1));
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ }
+ connection.commit();
+ } catch (Exception e) {
+ if (null != connection) {
+ try {
+ connection.rollback();
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ }
+ }
+ throw e;
+ }
+ }
+
+ private static List<List<Object>> generateTestDataset() {
+ List<List<Object>> rows = new ArrayList<>();
+ for (int i = 1; i <= 100; i++) {
+ rows.add(Arrays.asList(i, String.format("test_%s", i)));
+ }
+ return rows;
+ }
+
+ @Test
+ public void testJdbcSqliteSourceAndSinkDataType() throws Exception {
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/jdbc/jdbc_sqlite_source_and_sink_datatype.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
taskManager.copyFileFromContainer(Paths.get("/sqlite/test.db").toString(), new
File(tmpdir + "/test.db").toPath().toString());
+ checkSinkDataTypeTable();
+ }
+
+ private void checkSinkDataTypeTable() throws Exception {
+ URI resource =
Objects.requireNonNull(JdbcSqliteIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+ config = new ConfigBuilder(Paths.get(resource)).getConfig();
+ CheckConfigUtil.checkAllExists(this.config, "source_table",
"sink_table", "type_source_table",
+ "type_sink_table", "insert_type_source_table_sql",
"check_type_sink_table_sql");
+
+ try (Connection connection =
DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "/test.db", "", "")) {
+ Statement statement = connection.createStatement();
+ ResultSet resultSet =
statement.executeQuery(config.getString("check_type_sink_table_sql"));
+ resultSet.next();
+ Assertions.assertEquals(resultSet.getInt(1), 2);
+ }
+ }
+
+ @Test
+ public void testJdbcSqliteSourceAndSink() throws IOException,
InterruptedException, SQLException {
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/jdbc/jdbc_sqlite_source_and_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+
taskManager.copyFileFromContainer(Paths.get("/sqlite/test.db").toString(), new
File(tmpdir + "/test.db").toPath().toString());
+ // query result
+ String sql = "select age, name from sink order by age asc";
+ List<List<Object>> result = new ArrayList<>();
+ try (Connection connection =
DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "/test.db", "", "")) {
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ while (resultSet.next()) {
+ result.add(Arrays.asList(
+ resultSet.getInt(1),
+ resultSet.getString(2)));
+ }
+ Assertions.assertIterableEquals(TEST_DATASET, result);
+ }
+ }
+
+ @AfterEach
+ public void closeResource() throws SQLException, IOException {
+ // remove the temp test.db file
+ Files.deleteIfExists(new File(tmpdir + "/test.db").toPath());
+ }
+
+ @Override
+ protected void executeExtraCommands(GenericContainer<?> container) throws
IOException, InterruptedException {
+ Container.ExecResult extraCommands = container.execInContainer("bash",
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd
/tmp/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
+ Assertions.assertEquals(0, extraCommands.getExitCode());
+
+ Container.ExecResult mkdirCommands1 =
jobManager.execInContainer("bash", "-c", "mkdir -p " + "/sqlite");
+ Assertions.assertEquals(0, mkdirCommands1.getExitCode());
+ Container.ExecResult mkdirCommands2 =
taskManager.execInContainer("bash", "-c", "mkdir -p " + "/sqlite");
+ Assertions.assertEquals(0, mkdirCommands2.getExitCode());
+ jobManager.execInContainer("bash", "-c", "chmod 777 -R /sqlite");
+ taskManager.execInContainer("bash", "-c", "chmod 777 -R /sqlite");
+ try {
+ initTestDb();
+ // copy db file to container, dist file path in container is
/tmp/seatunnel/data/test.db
+ jobManager.copyFileToContainer(MountableFile.forHostPath(tmpdir +
"/test.db"), "/sqlite/test.db");
+ taskManager.copyFileToContainer(MountableFile.forHostPath(tmpdir +
"/test.db"), "/sqlite/test.db");
+ jobManager.execInContainer("bash", "-c", "chmod 777
/sqlite/test.db");
+ taskManager.execInContainer("bash", "-c", "chmod 777
/sqlite/test.db");
+ } catch (Exception e) {
+ log.error("init test.db and copy test.db to container error", e);
+ Files.deleteIfExists(new File(tmpdir + "/test.db").toPath());
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/sqlite_init.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/sqlite_init.conf
new file mode 100644
index 000000000..aeef3d5fa
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/sqlite_init.conf
@@ -0,0 +1,212 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source_table = """ CREATE TABLE source ( `id` INTEGER PRIMARY KEY
AUTOINCREMENT, `name` CHAR ( 10 ), `age` INT ) """
+
+sink_table = """ CREATE TABLE sink ( `id` INTEGER PRIMARY KEY AUTOINCREMENT,
`name` CHAR ( 10 ), `age` INT ) """
+
+type_source_table = """
+CREATE TABLE `type_source_table` (
+ `binary` BINARY ( 64 ) DEFAULT NULL,
+ `blob` BLOB,
+ `long_varbinary` MEDIUMBLOB,
+ `longblob` LONGBLOB,
+ `tinyblob` TINYBLOB,
+ `varbinary` VARBINARY ( 100 ) DEFAULT NULL,
+
+ `tinyint` TINYINT DEFAULT NULL,
+ `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL,
+ `smallint` SMALLINT DEFAULT NULL,
+ `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL,
+ `mediumint` MEDIUMINT DEFAULT NULL,
+ `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL,
+ `int` INT DEFAULT NULL,
+ `int_unsigned` INT UNSIGNED DEFAULT NULL,
+ `integer` INT DEFAULT NULL,
+ `integer_unsigned` INT UNSIGNED DEFAULT NULL,
+ `bigint` BIGINT DEFAULT NULL,
+ `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL,
+ `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL,
+ `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL,
+ `float` FLOAT DEFAULT NULL,
+ `double` DOUBLE DEFAULT NULL,
+ `double_precision` DOUBLE DEFAULT NULL,
+
+ `longtext` LONGTEXT,
+ `mediumtext` MEDIUMTEXT,
+ `text` text,
+ `tinytext` TINYTEXT,
+ `varchar` VARCHAR ( 100 ) DEFAULT NULL,
+ `json` json DEFAULT NULL,
+
+ `date` date DEFAULT NULL,
+ `datetime` datetime DEFAULT NULL,
+ `timestamp` TIMESTAMP NULL DEFAULT NULL
+)
+"""
+
+type_sink_table = """
+CREATE TABLE `type_sink_table` (
+ `binary` BINARY ( 64 ) DEFAULT NULL,
+ `blob` BLOB,
+ `long_varbinary` MEDIUMBLOB,
+ `longblob` LONGBLOB,
+ `tinyblob` TINYBLOB,
+ `varbinary` VARBINARY ( 100 ) DEFAULT NULL,
+
+ `tinyint` TINYINT DEFAULT NULL,
+ `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL,
+ `smallint` SMALLINT DEFAULT NULL,
+ `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL,
+ `mediumint` MEDIUMINT DEFAULT NULL,
+ `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL,
+ `int` INT DEFAULT NULL,
+ `int_unsigned` INT UNSIGNED DEFAULT NULL,
+ `integer` INT DEFAULT NULL,
+ `integer_unsigned` INT UNSIGNED DEFAULT NULL,
+ `bigint` BIGINT DEFAULT NULL,
+ `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL,
+ `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL,
+ `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL,
+ `float` FLOAT DEFAULT NULL,
+ `double` DOUBLE DEFAULT NULL,
+ `double_precision` DOUBLE DEFAULT NULL,
+
+ `longtext` LONGTEXT,
+ `mediumtext` MEDIUMTEXT,
+ `text` text,
+ `tinytext` TINYTEXT,
+ `varchar` VARCHAR ( 100 ) DEFAULT NULL,
+ `json` json DEFAULT NULL,
+
+ `date` date DEFAULT NULL,
+ `datetime` datetime DEFAULT NULL,
+ `timestamp` TIMESTAMP NULL DEFAULT NULL
+)
+"""
+
+insert_type_source_table_sql = """
+INSERT INTO `type_source_table` (
+ `binary`,
+ `blob`,
+ `long_varbinary`,
+ `longblob`,
+ `tinyblob`,
+ `varbinary`,
+ `tinyint`,
+ `tinyint_unsigned`,
+ `smallint`,
+ `smallint_unsigned`,
+ `mediumint`,
+ `mediumint_unsigned`,
+ `int`,
+ `int_unsigned`,
+ `integer`,
+ `integer_unsigned`,
+ `bigint`,
+ `bigint_unsigned`,
+ `numeric`,
+ `decimal`,
+ `float`,
+ `double`,
+ `double_precision`,
+ `longtext`,
+ `mediumtext`,
+ `text`,
+ `tinytext`,
+ `varchar`,
+ `json`,
+ `date`,
+ `datetime`,
+ `timestamp`
+)
+VALUES
+ (
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 'a',
+ 'a',
+ 'a',
+ 'a',
+ 'a',
+ '{}',
+ '2022-09-22',
+ '2022-09-22 15:07:44',
+ '2022-09-22 15:07:55'
+ )
+"""
+
+check_type_sink_table_sql = """
+SELECT
+ count( 1 )
+FROM
+ ( SELECT * FROM type_source_table UNION ALL SELECT * FROM
type_sink_table ) a
+GROUP BY
+ `binary`,
+ `blob`,
+ `long_varbinary`,
+ `longblob`,
+ `tinyblob`,
+ `varbinary`,
+ `tinyint`,
+ `tinyint_unsigned`,
+ `smallint`,
+ `smallint_unsigned`,
+ `mediumint`,
+ `mediumint_unsigned`,
+ `int`,
+ `int_unsigned`,
+ `integer`,
+ `integer_unsigned`,
+ `bigint`,
+ `bigint_unsigned`,
+ `numeric`,
+ `decimal`,
+ `float`,
+ `double`,
+ `double_precision`,
+ `longtext`,
+ `mediumtext`,
+ `text`,
+ `tinytext`,
+ `varchar`,
+ `json`,
+ `date`,
+ `datetime`,
+ `timestamp`
+"""
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf
new file mode 100644
index 000000000..a3273d98d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ driver = org.sqlite.JDBC
+ url = "jdbc:sqlite:/sqlite/test.db"
+ user = ""
+ password = ""
+ query = "select age, name from source"
+ }
+}
+
+transform {
+
+ # If you would like to get more information about how to configure
seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ Jdbc {
+ driver = org.sqlite.JDBC
+ url = "jdbc:sqlite:/sqlite/test.db"
+ user = ""
+ password = ""
+ query = "insert into sink(age, name) values(?, ?)"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink_datatype.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink_datatype.conf
new file mode 100644
index 000000000..68525fc8c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink_datatype.conf
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ source.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source{
+ jdbc{
+ url = "jdbc:sqlite:/sqlite/test.db"
+ driver = org.sqlite.JDBC
+ user = ""
+ password = ""
+ query = """ SELECT
+ `binary`,
+ `blob`,
+ `long_varbinary`,
+ `longblob`,
+ `tinyblob`,
+ `varbinary`,
+ `tinyint`,
+ `tinyint_unsigned`,
+ `smallint`,
+ `smallint_unsigned`,
+ `mediumint`,
+ `mediumint_unsigned`,
+ `int`,
+ `int_unsigned`,
+ `integer`,
+ `integer_unsigned`,
+ `bigint`,
+ `bigint_unsigned`,
+ `numeric`,
+ `decimal`,
+ `float`,
+ `double`,
+ `double_precision`,
+ `longtext`,
+ `mediumtext`,
+ `text`,
+ `tinytext`,
+ `varchar`,
+ `json`,
+ `date`,
+ `datetime`,
+ `timestamp`
+
+ FROM `type_source_table`
+ """
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc {
+ url = "jdbc:sqlite:/sqlite/test.db"
+ driver = org.sqlite.JDBC
+ user = ""
+ password = ""
+ query = """ INSERT INTO `type_sink_table` (
+ `binary`,
+ `blob`,
+ `long_varbinary`,
+ `longblob`,
+ `tinyblob`,
+ `varbinary`,
+ `tinyint`,
+ `tinyint_unsigned`,
+ `smallint`,
+ `smallint_unsigned`,
+ `mediumint`,
+ `mediumint_unsigned`,
+ `int`,
+ `int_unsigned`,
+ `integer`,
+ `integer_unsigned`,
+ `bigint`,
+ `bigint_unsigned`,
+ `numeric`,
+ `decimal`,
+ `float`,
+ `double`,
+ `double_precision`,
+ `longtext`,
+ `mediumtext`,
+ `text`,
+ `tinytext`,
+ `varchar`,
+ `json`,
+ `date`,
+ `datetime`,
+ `timestamp`
+ )
+ VALUES
+ (
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?
+ )"""
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
index ee7629f14..8cd754f05 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
@@ -96,6 +96,11 @@
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.xerial</groupId>
+ <artifactId>sqlite-jdbc</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqliteIT.java
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqliteIT.java
new file mode 100644
index 000000000..c9d2815dc
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqliteIT.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+public class JdbcSqliteIT extends SparkContainer {
+ private String tmpdir;
+ private Config config;
+ private static final List<List<Object>> TEST_DATASET =
generateTestDataset();
+ private static final String THIRD_PARTY_PLUGINS_URL =
"https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";
+
+ private void initTestDb() throws Exception {
+ URI resource =
Objects.requireNonNull(JdbcSqliteIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+ config = new ConfigBuilder(Paths.get(resource)).getConfig();
+ CheckConfigUtil.checkAllExists(this.config, "source_table",
"sink_table", "type_source_table",
+ "type_sink_table", "insert_type_source_table_sql",
"check_type_sink_table_sql");
+ tmpdir = Paths.get(System.getProperty("java.io.tmpdir")).toString();
+ Connection connection = null;
+ try {
+ Class.forName("org.sqlite.JDBC");
+ connection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir +
"/test.db", "", "");
+ Statement statement = connection.createStatement();
+ statement.execute("drop table if exists source");
+ statement.execute("drop table if exists sink");
+ statement.execute("drop table if exists type_source_table");
+ statement.execute("drop table if exists type_sink_table");
+ statement.execute(config.getString("source_table"));
+ statement.execute(config.getString("sink_table"));
+ statement.execute(config.getString("type_source_table"));
+ statement.execute(config.getString("type_sink_table"));
+
statement.execute(config.getString("insert_type_source_table_sql"));
+
+ String sql = "insert into source(age, name) values(?, ?)";
+ connection.setAutoCommit(false);
+ try (PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
+ for (List<Object> row : TEST_DATASET) {
+ preparedStatement.setInt(1, (Integer) row.get(0));
+ preparedStatement.setString(2, (String) row.get(1));
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ }
+ connection.commit();
+ } catch (Exception e) {
+ if (null != connection) {
+ try {
+ connection.rollback();
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ }
+ }
+ throw e;
+ }
+ }
+
+ private static List<List<Object>> generateTestDataset() {
+ List<List<Object>> rows = new ArrayList<>();
+ for (int i = 1; i <= 100; i++) {
+ rows.add(Arrays.asList(i, String.format("test_%s", i)));
+ }
+ return rows;
+ }
+
+ @Test
+ public void testJdbcSqliteSourceAndSinkDataType() throws Exception {
+ Container.ExecResult execResult =
executeSeaTunnelSparkJob("/jdbc/jdbc_sqlite_source_and_sink_datatype.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ master.copyFileFromContainer(Paths.get("/sqlite/test.db").toString(),
new File(tmpdir + "/test.db").toPath().toString());
+ checkSinkDataTypeTable();
+ }
+
+ private void checkSinkDataTypeTable() throws Exception {
+ URI resource =
Objects.requireNonNull(JdbcSqliteIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+ config = new ConfigBuilder(Paths.get(resource)).getConfig();
+ CheckConfigUtil.checkAllExists(this.config, "source_table",
"sink_table", "type_source_table",
+ "type_sink_table", "insert_type_source_table_sql",
"check_type_sink_table_sql");
+
+ try (Connection connection =
DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "/test.db", "", "")) {
+ Statement statement = connection.createStatement();
+ ResultSet resultSet =
statement.executeQuery(config.getString("check_type_sink_table_sql"));
+ resultSet.next();
+ Assertions.assertEquals(resultSet.getInt(1), 2);
+ }
+ }
+
+ @Test
+ public void testJdbcSqliteSourceAndSink() throws IOException,
InterruptedException, SQLException {
+ Container.ExecResult execResult =
executeSeaTunnelSparkJob("/jdbc/jdbc_sqlite_source_and_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ master.copyFileFromContainer(Paths.get("/sqlite/test.db").toString(),
new File(tmpdir + "/test.db").toPath().toString());
+ // query result
+ String sql = "select age, name from sink order by age asc";
+ List<List<Object>> result = new ArrayList<>();
+ try (Connection connection =
DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "/test.db", "", "")) {
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ while (resultSet.next()) {
+ result.add(Arrays.asList(
+ resultSet.getInt(1),
+ resultSet.getString(2)));
+ }
+ Assertions.assertIterableEquals(TEST_DATASET, result);
+ }
+ }
+
+ @AfterEach
+ public void closeResource() throws SQLException, IOException {
+ // remove the temp test.db
+ Files.deleteIfExists(new File(tmpdir + "/test.db").toPath());
+ }
+
+ @Override
+ protected void executeExtraCommands(GenericContainer<?> container) throws
IOException, InterruptedException {
+ Container.ExecResult extraCommands = container.execInContainer("bash",
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd
/tmp/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
+ Assertions.assertEquals(0, extraCommands.getExitCode());
+
+ Container.ExecResult mkdirCommands = container.execInContainer("bash",
"-c", "mkdir -p " + "/sqlite");
+ Assertions.assertEquals(0, mkdirCommands.getExitCode());
+
+ Container.ExecResult chmodCommands = container.execInContainer("bash",
"-c", "chmod 777 -R /sqlite");
+ Assertions.assertEquals(0, chmodCommands.getExitCode());
+
+ try {
+ initTestDb();
+ // copy db file to container, dist file path in container is
/tmp/seatunnel/data/test.db
+ container.copyFileToContainer(MountableFile.forHostPath(tmpdir +
"/test.db"), "/sqlite/test.db");
+ container.execInContainer("bash", "-c", "chmod 777
/sqlite/test.db");
+ } catch (Exception e) {
+ log.error("init test.db and copy test.db to container error", e);
+ Files.deleteIfExists(new File(tmpdir + "/test.db").toPath());
+ }
+ }
+
+}
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/sqlite_init.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/sqlite_init.conf
new file mode 100644
index 000000000..aeef3d5fa
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/sqlite_init.conf
@@ -0,0 +1,212 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source_table = """ CREATE TABLE source ( `id` INTEGER PRIMARY KEY
AUTOINCREMENT, `name` CHAR ( 10 ), `age` INT ) """
+
+sink_table = """ CREATE TABLE sink ( `id` INTEGER PRIMARY KEY AUTOINCREMENT,
`name` CHAR ( 10 ), `age` INT ) """
+
+type_source_table = """
+CREATE TABLE `type_source_table` (
+ `binary` BINARY ( 64 ) DEFAULT NULL,
+ `blob` BLOB,
+ `long_varbinary` MEDIUMBLOB,
+ `longblob` LONGBLOB,
+ `tinyblob` TINYBLOB,
+ `varbinary` VARBINARY ( 100 ) DEFAULT NULL,
+
+ `tinyint` TINYINT DEFAULT NULL,
+ `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL,
+ `smallint` SMALLINT DEFAULT NULL,
+ `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL,
+ `mediumint` MEDIUMINT DEFAULT NULL,
+ `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL,
+ `int` INT DEFAULT NULL,
+ `int_unsigned` INT UNSIGNED DEFAULT NULL,
+ `integer` INT DEFAULT NULL,
+ `integer_unsigned` INT UNSIGNED DEFAULT NULL,
+ `bigint` BIGINT DEFAULT NULL,
+ `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL,
+ `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL,
+ `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL,
+ `float` FLOAT DEFAULT NULL,
+ `double` DOUBLE DEFAULT NULL,
+ `double_precision` DOUBLE DEFAULT NULL,
+
+ `longtext` LONGTEXT,
+ `mediumtext` MEDIUMTEXT,
+ `text` text,
+ `tinytext` TINYTEXT,
+ `varchar` VARCHAR ( 100 ) DEFAULT NULL,
+ `json` json DEFAULT NULL,
+
+ `date` date DEFAULT NULL,
+ `datetime` datetime DEFAULT NULL,
+ `timestamp` TIMESTAMP NULL DEFAULT NULL
+)
+"""
+
+type_sink_table = """
+CREATE TABLE `type_sink_table` (
+ `binary` BINARY ( 64 ) DEFAULT NULL,
+ `blob` BLOB,
+ `long_varbinary` MEDIUMBLOB,
+ `longblob` LONGBLOB,
+ `tinyblob` TINYBLOB,
+ `varbinary` VARBINARY ( 100 ) DEFAULT NULL,
+
+ `tinyint` TINYINT DEFAULT NULL,
+ `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL,
+ `smallint` SMALLINT DEFAULT NULL,
+ `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL,
+ `mediumint` MEDIUMINT DEFAULT NULL,
+ `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL,
+ `int` INT DEFAULT NULL,
+ `int_unsigned` INT UNSIGNED DEFAULT NULL,
+ `integer` INT DEFAULT NULL,
+ `integer_unsigned` INT UNSIGNED DEFAULT NULL,
+ `bigint` BIGINT DEFAULT NULL,
+ `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL,
+ `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL,
+ `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL,
+ `float` FLOAT DEFAULT NULL,
+ `double` DOUBLE DEFAULT NULL,
+ `double_precision` DOUBLE DEFAULT NULL,
+
+ `longtext` LONGTEXT,
+ `mediumtext` MEDIUMTEXT,
+ `text` text,
+ `tinytext` TINYTEXT,
+ `varchar` VARCHAR ( 100 ) DEFAULT NULL,
+ `json` json DEFAULT NULL,
+
+ `date` date DEFAULT NULL,
+ `datetime` datetime DEFAULT NULL,
+ `timestamp` TIMESTAMP NULL DEFAULT NULL
+)
+"""
+
+insert_type_source_table_sql = """
+INSERT INTO `type_source_table` (
+ `binary`,
+ `blob`,
+ `long_varbinary`,
+ `longblob`,
+ `tinyblob`,
+ `varbinary`,
+ `tinyint`,
+ `tinyint_unsigned`,
+ `smallint`,
+ `smallint_unsigned`,
+ `mediumint`,
+ `mediumint_unsigned`,
+ `int`,
+ `int_unsigned`,
+ `integer`,
+ `integer_unsigned`,
+ `bigint`,
+ `bigint_unsigned`,
+ `numeric`,
+ `decimal`,
+ `float`,
+ `double`,
+ `double_precision`,
+ `longtext`,
+ `mediumtext`,
+ `text`,
+ `tinytext`,
+ `varchar`,
+ `json`,
+ `date`,
+ `datetime`,
+ `timestamp`
+)
+VALUES
+ (
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 'a',
+ 'a',
+ 'a',
+ 'a',
+ 'a',
+ '{}',
+ '2022-09-22',
+ '2022-09-22 15:07:44',
+ '2022-09-22 15:07:55'
+ )
+"""
+
+check_type_sink_table_sql = """
+SELECT
+ count( 1 )
+FROM
+ ( SELECT * FROM type_source_table UNION ALL SELECT * FROM
type_sink_table ) a
+GROUP BY
+ `binary`,
+ `blob`,
+ `long_varbinary`,
+ `longblob`,
+ `tinyblob`,
+ `varbinary`,
+ `tinyint`,
+ `tinyint_unsigned`,
+ `smallint`,
+ `smallint_unsigned`,
+ `mediumint`,
+ `mediumint_unsigned`,
+ `int`,
+ `int_unsigned`,
+ `integer`,
+ `integer_unsigned`,
+ `bigint`,
+ `bigint_unsigned`,
+ `numeric`,
+ `decimal`,
+ `float`,
+ `double`,
+ `double_precision`,
+ `longtext`,
+ `mediumtext`,
+ `text`,
+ `tinytext`,
+ `varchar`,
+ `json`,
+ `date`,
+ `datetime`,
+ `timestamp`
+"""
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf
new file mode 100644
index 000000000..d67a5677a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ jdbc {
+ driver = org.sqlite.JDBC
+ url = "jdbc:sqlite:/sqlite/test.db"
+ user = ""
+ password = ""
+ query = "select age, name from source"
+ }
+
+ # If you would like to get more information about how to configure
seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
+}
+
+transform {
+
+ # If you would like to get more information about how to configure
seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ jdbc {
+ driver = org.sqlite.JDBC
+ url = "jdbc:sqlite:/sqlite/test.db"
+ user = ""
+ password = ""
+ query = "insert into sink(age, name) values(?, ?)"
+ }
+
+ # If you would like to get more information about how to configure
seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink_datatype.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink_datatype.conf
new file mode 100644
index 000000000..68525fc8c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink_datatype.conf
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ source.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source{
+ jdbc{
+ url = "jdbc:sqlite:/sqlite/test.db"
+ driver = org.sqlite.JDBC
+ user = ""
+ password = ""
+ query = """ SELECT
+ `binary`,
+ `blob`,
+ `long_varbinary`,
+ `longblob`,
+ `tinyblob`,
+ `varbinary`,
+ `tinyint`,
+ `tinyint_unsigned`,
+ `smallint`,
+ `smallint_unsigned`,
+ `mediumint`,
+ `mediumint_unsigned`,
+ `int`,
+ `int_unsigned`,
+ `integer`,
+ `integer_unsigned`,
+ `bigint`,
+ `bigint_unsigned`,
+ `numeric`,
+ `decimal`,
+ `float`,
+ `double`,
+ `double_precision`,
+ `longtext`,
+ `mediumtext`,
+ `text`,
+ `tinytext`,
+ `varchar`,
+ `json`,
+ `date`,
+ `datetime`,
+ `timestamp`
+
+ FROM `type_source_table`
+ """
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc {
+ url = "jdbc:sqlite:/sqlite/test.db"
+ driver = org.sqlite.JDBC
+ user = ""
+ password = ""
+ query = """ INSERT INTO `type_sink_table` (
+ `binary`,
+ `blob`,
+ `long_varbinary`,
+ `longblob`,
+ `tinyblob`,
+ `varbinary`,
+ `tinyint`,
+ `tinyint_unsigned`,
+ `smallint`,
+ `smallint_unsigned`,
+ `mediumint`,
+ `mediumint_unsigned`,
+ `int`,
+ `int_unsigned`,
+ `integer`,
+ `integer_unsigned`,
+ `bigint`,
+ `bigint_unsigned`,
+ `numeric`,
+ `decimal`,
+ `float`,
+ `double`,
+ `double_precision`,
+ `longtext`,
+ `mediumtext`,
+ `text`,
+ `tinytext`,
+ `varchar`,
+ `json`,
+ `date`,
+ `datetime`,
+ `timestamp`
+ )
+ VALUES
+ (
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?
+ )"""
+ }
+}
\ No newline at end of file