This is an automated email from the ASF dual-hosted git repository.
martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
The following commit(s) were added to refs/heads/main by this push:
new 3ecd0fd [FLINK-14102] Introduce DB2Dialect
3ecd0fd is described below
commit 3ecd0fdacdc9d0a66b6de04182132eef821be401
Author: gongzhongqiang <[email protected]>
AuthorDate: Fri Dec 16 17:16:54 2022 +0800
[FLINK-14102] Introduce DB2Dialect
---
docs/content.zh/docs/connectors/table/jdbc.md | 65 +++++++--
docs/content/docs/connectors/table/jdbc.md | 56 ++++++--
flink-connector-jdbc/pom.xml | 17 ++-
.../jdbc/databases/db2/dialect/Db2Dialect.java | 155 +++++++++++++++++++++
.../databases/db2/dialect/Db2DialectFactory.java | 37 +++++
.../databases/db2/dialect/Db2RowConverter.java | 46 ++++++
...flink.connector.jdbc.dialect.JdbcDialectFactory | 1 +
.../connector/jdbc/databases/db2/Db2TestBase.java | 35 +++++
.../databases/db2/dialect/Db2DialectTypeTest.java | 58 ++++++++
.../db2/dialect/Db2PreparedStatementTest.java | 87 ++++++++++++
.../db2/table/Db2DynamicTableSinkITCase.java | 50 +++++++
.../db2/table/Db2DynamicTableSourceITCase.java | 113 +++++++++++++++
.../db2/xa/Db2ExactlyOnceSinkE2eTest.java | 25 ++++
.../jdbc/internal/JdbcTableOutputFormatTest.java | 3 -
.../jdbc/testutils/databases/db2/Db2Database.java | 69 +++++++++
.../jdbc/testutils/databases/db2/Db2Metadata.java | 91 ++++++++++++
16 files changed, 878 insertions(+), 30 deletions(-)
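[Editorial illustration] Once this commit's dialect and the `db2jcc` driver are on the classpath, a Db2-backed table is declared through the ordinary JDBC connector options. A hedged sketch, emitted from a small Java program — the host, port, database, and table names below are invented, not taken from the commit:

```java
// Prints a hypothetical Flink SQL DDL for a Db2-backed JDBC table.
// All connection details here are made up for illustration.
public class Db2DdlSketch {
    static String ddl() {
        return "CREATE TABLE db2_sink (\n"
                + "  id INT,\n"
                + "  name STRING,\n"
                + "  PRIMARY KEY (id) NOT ENFORCED\n"
                + ") WITH (\n"
                + "  'connector' = 'jdbc',\n"
                + "  'url' = 'jdbc:db2://localhost:50000/testdb',\n"
                + "  'table-name' = 'DB2INST1.DB2_SINK'\n"
                + ")";
    }

    public static void main(String[] args) {
        System.out.println(ddl());
    }
}
```

The `jdbc:db2:` URL prefix is what routes such a table to the new dialect via the factory added in this commit.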
diff --git a/docs/content.zh/docs/connectors/table/jdbc.md
b/docs/content.zh/docs/connectors/table/jdbc.md
index e04c1f3..647342c 100644
--- a/docs/content.zh/docs/connectors/table/jdbc.md
+++ b/docs/content.zh/docs/connectors/table/jdbc.md
@@ -56,6 +56,7 @@ JDBC 连接器不是二进制发行版的一部分,请查阅[这里]({{< ref "
| Derby | `org.apache.derby` | `derby` |
[下载](http://db.apache.org/derby/derby_downloads.html) | |
| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` |
[下载](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16)
|
| CrateDB | `io.crate` | `crate-jdbc` |
[下载](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) |
+| Db2 | `com.ibm.db2.jcc` | `db2jcc` | [下载](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) |
当前,JDBC 连接器和驱动不在 Flink 二进制发布包中,请参阅[这里]({{< ref
"docs/dev/configuration/overview" >}})了解在集群上执行时如何连接它们。
@@ -406,6 +407,13 @@ lookup cache 的主要目的是用于提高时态表关联 JDBC 连接器的性
WHEN NOT MATCHED THEN INSERT (..) <br>
VALUES (..)</td>
</tr>
+ <tr>
+ <td>Db2</td>
+        <td>MERGE INTO .. AS TARGET USING TABLE (VALUES (..)) AS SOURCE (..) ON (..) <br>
+ WHEN MATCHED THEN UPDATE SET .. <br>
+ WHEN NOT MATCHED THEN INSERT (..) <br>
+ VALUES (..)</td>
+ </tr>
</tbody>
</table>
@@ -550,11 +558,11 @@ catalogs:
因此,Flink Catalog 和 Postgres 之间的元空间映射如下:
-| Flink Catalog Metaspace Structure | Postgres Metaspace Structure |
-| :------------------------------------| :-----------------------------------|
-| catalog name (defined in Flink only) | N/A |
-| database name | database name |
-| table name | [schema_name.]table_name |
+| Flink Catalog Metaspace Structure | Postgres Metaspace Structure |
+|:-------------------------------------|:-----------------------------|
+| catalog name (defined in Flink only) | N/A |
+| database name | database name |
+| table name | [schema_name.]table_name |
Flink 中的 Postgres 表的完整路径应该是 ``"<catalog>.<db>.`<schema.table>`"``。如果指定了
schema,请注意需要转义 `<schema.table>`。
@@ -585,11 +593,11 @@ MySQL 实例中的数据库与 MySQL Catalog 注册的 catalog 下的数据库
因此,Flink Catalog 和 MySQL catalog 之间的元空间映射如下:
-| Flink Catalog Metaspace Structure | MySQL Metaspace Structure |
-| :------------------------------------| :-----------------------------------|
-| catalog name (defined in Flink only) | N/A |
-| database name | database name |
-| table name | table_name |
+| Flink Catalog Metaspace Structure | MySQL Metaspace Structure |
+|:-------------------------------------|:--------------------------|
+| catalog name (defined in Flink only) | N/A |
+| database name | database name |
+| table name | table_name |
Flink 中的 MySQL 表的完整路径应该是 ``"`<catalog>`.`<db>`.`<table>`"``。
@@ -644,7 +652,7 @@ SELECT * FROM `custom_schema.test_table2`;
数据类型映射
----------------
-Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、Oracle、PostgreSQL、CrateDB, Derby 等。其中,Derby 通常是用于测试目的。下表列出了从关系数据库数据类型到 Flink SQL 数据类型的类型映射,映射表可以使得在 Flink 中定义 JDBC 表更加简单。
+Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、Oracle、PostgreSQL、CrateDB、Derby、Db2、SQL Server 等。其中,Derby 通常是用于测试目的。下表列出了从关系数据库数据类型到 Flink SQL 数据类型的类型映射,映射表可以使得在 Flink 中定义 JDBC 表更加简单。
<table class="table table-bordered">
<thead>
@@ -654,6 +662,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
<th class="text-left"><a
href="https://www.postgresql.org/docs/12/datatype.html">PostgreSQL type</a></th>
<th class="text-left"><a
href="https://crate.io/docs/crate/reference/en/master/general/ddl/data-types.html">CrateDB
type</a></th>
<th class="text-left"><a
href="https://docs.microsoft.com/en-us/sql/t-sql/data-types/data-types-transact-sql?view=sql-server-ver16">SQL
Server type</a></th>
+          <th class="text-left"><a href="https://www.ibm.com/docs/en/db2-for-zos/12?topic=columns-data-types">Db2</a></th>
<th class="text-left"><a href="{{< ref "docs/dev/table/types"
>}}">Flink SQL type</a></th>
</tr>
</thead>
@@ -664,6 +673,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
<td></td>
<td></td>
<td><code>TINYINT</code></td>
+ <td></td>
<td><code>TINYINT</code></td>
</tr>
<tr>
@@ -681,6 +691,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
<code>SHORT</code></td>
<td><code>SMALLINT</code></td>
<td><code>SMALLINT</code></td>
+ <td><code>SMALLINT</code></td>
</tr>
<tr>
<td>
@@ -695,6 +706,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
<code>INTEGER</code><br>
<code>INT</code></td>
<td><code>INT</code></td>
+ <td><code>INTEGER</code></td>
<td><code>INT</code></td>
</tr>
<tr>
@@ -709,6 +721,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
<code>BIGINT</code><br>
<code>LONG</code></td>
<td><code>BIGINT</code></td>
+ <td></td>
<td><code>BIGINT</code></td>
</tr>
<tr>
@@ -716,9 +729,18 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
<td></td>
<td></td>
<td></td>
- <td></td>
+ <td></td>
+ <td></td>
<td><code>DECIMAL(20, 0)</code></td>
</tr>
+ <tr>
+ <td><code>BIGINT</code></td>
+ <td></td>
+ <td><code>BIGINT</code></td>
+ <td></td>
+ <td><code>BIGINT</code></td>
+ <td><code>BIGINT</code></td>
+ </tr>
<tr>
<td><code>FLOAT</code></td>
<td>
@@ -730,6 +752,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
<code>REAL</code><br>
<code>FLOAT</code></td>
<td><code>REAL</code></td>
+ <td><code>REAL</code></td>
<td><code>FLOAT</code></td>
</tr>
<tr>
@@ -745,6 +768,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
<code>DOUBLE PRECISION</code></td>
<td><code>FLOAT</code></td>
<td><code>DOUBLE</code></td>
+ <td><code>DOUBLE</code></td>
</tr>
<tr>
<td>
@@ -761,6 +785,10 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
<code>DECIMAL(p, s)</code></td>
<td><code>NUMERIC(p, s)</code></td>
<td><code>DECIMAL(p, s)</code></td>
+ <td>
+ <code>NUMERIC(p, s)</code>
+ <code>DECIMAL(p, s)</code>
+ </td>
<td><code>DECIMAL(p, s)</code></td>
</tr>
<tr>
@@ -772,6 +800,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
<td><code>BOOLEAN</code></td>
<td><code>BIT</code></td>
<td><code>BOOLEAN</code></td>
+ <td><code>BOOLEAN</code></td>
</tr>
<tr>
<td><code>DATE</code></td>
@@ -780,6 +809,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
<td><code>DATE</code> (only in expressions - not stored type)</td>
<td><code>DATE</code></td>
<td><code>DATE</code></td>
+ <td><code>DATE</code></td>
</tr>
<tr>
<td><code>TIME [(p)]</code></td>
@@ -787,6 +817,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
<td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
<td><code>TIME</code> (only in expressions - not stored type)</td>
<td><code>TIME(0)</code></td>
+ <td><code>TIME</code></td>
<td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
</tr>
<tr>
@@ -798,6 +829,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
<code>DATETIME</code>
<code>DATETIME2</code>
</td>
+ <td><code>TIMESTAMP [(p)]</code></td>
<td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
</tr>
<tr>
@@ -829,6 +861,10 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
<code>NVARCHAR(n)</code><br>
<code>TEXT</code><br>
<code>NTEXT</code></td>
+ <td>
+ <code>VARCHAR(n)</code><br>
+ <code>CHAR [(p)]</code>
+ </td>
<td><code>STRING</code></td>
</tr>
<tr>
@@ -843,7 +879,9 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
<td></td>
<td>
<code>BINARY(n)</code><br>
- <code>VARBINARY(n)</code><br></td>
+ <code>VARBINARY(n)</code><br>
+ </td>
+ <td></td>
<td><code>BYTES</code></td>
</tr>
<tr>
@@ -852,6 +890,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
<td><code>ARRAY</code></td>
<td><code>ARRAY</code></td>
<td></td>
+ <td></td>
<td><code>ARRAY</code></td>
</tr>
</tbody>
diff --git a/docs/content/docs/connectors/table/jdbc.md
b/docs/content/docs/connectors/table/jdbc.md
index 0c5f6e9..7c94f0c 100644
--- a/docs/content/docs/connectors/table/jdbc.md
+++ b/docs/content/docs/connectors/table/jdbc.md
@@ -53,7 +53,7 @@ A driver dependency is also required to connect to a
specified database. Here ar
| Derby | `org.apache.derby` | `derby` |
[Download](http://db.apache.org/derby/derby_downloads.html) |
| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` |
[Download](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16)
|
| CrateDB | `io.crate` | `crate-jdbc` |
[Download](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) |
-
+| Db2 | `com.ibm.db2.jcc` | `db2jcc` | [Download](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) |
JDBC connector and drivers are not part of Flink's binary distribution. See
how to link with them for cluster execution [here]({{< ref
"docs/dev/configuration/overview" >}}).
@@ -415,6 +415,13 @@ As there is no standard syntax for upsert, the following
table describes the dat
WHEN NOT MATCHED THEN INSERT (..) <br>
VALUES (..)</td>
</tr>
+ <tr>
+ <td>Db2</td>
+        <td>MERGE INTO .. AS TARGET USING TABLE (VALUES (..)) AS SOURCE (..) ON (..) <br>
+ WHEN MATCHED THEN UPDATE SET .. <br>
+ WHEN NOT MATCHED THEN INSERT (..) <br>
+ VALUES (..)</td>
+ </tr>
</tbody>
</table>
@@ -551,11 +558,11 @@ In Flink, when querying tables registered by Postgres
catalog, users can use eit
Therefore, the metaspace mapping between Flink Catalog and Postgres is as
following:
-| Flink Catalog Metaspace Structure | Postgres Metaspace Structure |
-| :------------------------------------| :-----------------------------------|
-| catalog name (defined in Flink only) | N/A |
-| database name | database name |
-| table name | [schema_name.]table_name |
+| Flink Catalog Metaspace Structure | Postgres Metaspace Structure |
+|:-------------------------------------|:-----------------------------|
+| catalog name (defined in Flink only) | N/A |
+| database name | database name |
+| table name | [schema_name.]table_name |
The full path of Postgres table in Flink should be
``"<catalog>.<db>.`<schema.table>`"`` if schema is specified, note the
`<schema.table>` should be escaped.
@@ -583,11 +590,11 @@ In Flink, when querying tables registered by MySQL
catalog, users can use either
Therefore, the metaspace mapping between Flink Catalog and MySQL Catalog is as
following:
-| Flink Catalog Metaspace Structure | MySQL Metaspace Structure |
-| :------------------------------------| :-----------------------------------|
-| catalog name (defined in Flink only) | N/A |
-| database name | database name |
-| table name | table_name |
+| Flink Catalog Metaspace Structure | MySQL Metaspace Structure |
+|:-------------------------------------|:--------------------------|
+| catalog name (defined in Flink only) | N/A |
+| database name | database name |
+| table name | table_name |
The full path of MySQL table in Flink should be
``"`<catalog>`.`<db>`.`<table>`"``.
@@ -638,7 +645,7 @@ SELECT * FROM `custom_schema.test_table2`;
Data Type Mapping
----------------
-Flink supports connect to several databases which uses dialect like MySQL, Oracle, PostgreSQL, CrateDB, Derby. The Derby dialect usually used for testing purpose. The field data type mappings from relational databases data types to Flink SQL data types are listed in the following table, the mapping table can help define JDBC table in Flink easily.
+Flink supports connecting to several databases through dialects, such as MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, and Db2. The Derby dialect is usually used for testing purposes. The field data type mappings from relational database data types to Flink SQL data types are listed in the following table; the mapping table can help define JDBC tables in Flink easily.
<table class="table table-bordered">
<thead>
@@ -648,6 +655,7 @@ Flink supports connect to several databases which uses
dialect like MySQL, Oracl
<th class="text-left"><a
href="https://www.postgresql.org/docs/12/datatype.html">PostgreSQL type</a></th>
<th class="text-left"><a
href="https://crate.io/docs/crate/reference/en/master/general/ddl/data-types.html">CrateDB
type</a></th>
<th class="text-left"><a
href="https://docs.microsoft.com/en-us/sql/t-sql/data-types/data-types-transact-sql?view=sql-server-ver16">SQL
Server type</a></th>
+          <th class="text-left"><a href="https://www.ibm.com/docs/en/db2-for-zos/12?topic=columns-data-types">Db2</a></th>
<th class="text-left"><a href="{{< ref "docs/dev/table/types"
>}}">Flink SQL type</a></th>
</tr>
</thead>
@@ -658,6 +666,7 @@ Flink supports connect to several databases which uses
dialect like MySQL, Oracl
<td></td>
<td></td>
<td><code>TINYINT</code></td>
+ <td></td>
<td><code>TINYINT</code></td>
</tr>
<tr>
@@ -675,6 +684,7 @@ Flink supports connect to several databases which uses
dialect like MySQL, Oracl
<code>SHORT</code></td>
<td><code>SMALLINT</code></td>
<td><code>SMALLINT</code></td>
+ <td><code>SMALLINT</code></td>
</tr>
<tr>
<td>
@@ -689,6 +699,7 @@ Flink supports connect to several databases which uses
dialect like MySQL, Oracl
<code>INTEGER</code><br>
<code>INT</code></td>
<td><code>INT</code></td>
+ <td><code>INTEGER</code></td>
<td><code>INT</code></td>
</tr>
<tr>
@@ -703,6 +714,7 @@ Flink supports connect to several databases which uses
dialect like MySQL, Oracl
<code>BIGINT</code><br>
<code>LONG</code></td>
<td><code>BIGINT</code></td>
+ <td></td>
<td><code>BIGINT</code></td>
</tr>
<tr>
@@ -711,6 +723,7 @@ Flink supports connect to several databases which uses
dialect like MySQL, Oracl
<td></td>
<td></td>
<td></td>
+ <td></td>
<td><code>DECIMAL(20, 0)</code></td>
</tr>
<tr>
@@ -724,6 +737,7 @@ Flink supports connect to several databases which uses
dialect like MySQL, Oracl
<code>REAL</code><br>
<code>FLOAT</code></td>
<td><code>REAL</code></td>
+ <td><code>REAL</code></td>
<td><code>FLOAT</code></td>
</tr>
<tr>
@@ -739,6 +753,7 @@ Flink supports connect to several databases which uses
dialect like MySQL, Oracl
<code>DOUBLE PRECISION</code></td>
<td><code>FLOAT</code></td>
<td><code>DOUBLE</code></td>
+ <td><code>DOUBLE</code></td>
</tr>
<tr>
<td>
@@ -755,6 +770,10 @@ Flink supports connect to several databases which uses
dialect like MySQL, Oracl
<code>DECIMAL(p, s)</code></td>
<td><code>NUMERIC(p, s)</code></td>
<td><code>DECIMAL(p, s)</code></td>
+ <td>
+ <code>NUMERIC(p, s)</code>
+ <code>DECIMAL(p, s)</code>
+ </td>
<td><code>DECIMAL(p, s)</code></td>
</tr>
<tr>
@@ -766,6 +785,7 @@ Flink supports connect to several databases which uses
dialect like MySQL, Oracl
<td><code>BOOLEAN</code></td>
<td><code>BIT</code></td>
<td><code>BOOLEAN</code></td>
+ <td><code>BOOLEAN</code></td>
</tr>
<tr>
<td><code>DATE</code></td>
@@ -774,6 +794,7 @@ Flink supports connect to several databases which uses
dialect like MySQL, Oracl
<td><code>DATE</code> (only in expressions - not stored type)</td>
<td><code>DATE</code></td>
<td><code>DATE</code></td>
+ <td><code>DATE</code></td>
</tr>
<tr>
<td><code>TIME [(p)]</code></td>
@@ -781,6 +802,7 @@ Flink supports connect to several databases which uses
dialect like MySQL, Oracl
<td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
<td><code>TIME</code> (only in expressions - not stored type)</td>
<td><code>TIME(0)</code></td>
+ <td><code>TIME</code></td>
<td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
</tr>
<tr>
@@ -792,6 +814,7 @@ Flink supports connect to several databases which uses
dialect like MySQL, Oracl
<code>DATETIME</code>
<code>DATETIME2</code>
</td>
+ <td><code>TIMESTAMP [(p)]</code></td>
<td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
</tr>
<tr>
@@ -823,6 +846,10 @@ Flink supports connect to several databases which uses
dialect like MySQL, Oracl
<code>NVARCHAR(n)</code><br>
<code>TEXT</code><br>
<code>NTEXT</code></td>
+ <td>
+ <code>VARCHAR(n)</code><br>
+ <code>CHAR [(p)]</code>
+ </td>
<td><code>STRING</code></td>
</tr>
<tr>
@@ -837,7 +864,9 @@ Flink supports connect to several databases which uses
dialect like MySQL, Oracl
<td></td>
<td>
<code>BINARY(n)</code><br>
- <code>VARBINARY(n)</code><br></td>
+ <code>VARBINARY(n)</code><br>
+ </td>
+ <td></td>
<td><code>BYTES</code></td>
</tr>
<tr>
@@ -846,6 +875,7 @@ Flink supports connect to several databases which uses
dialect like MySQL, Oracl
<td><code>ARRAY</code></td>
<td><code>ARRAY</code></td>
<td></td>
+ <td></td>
<td><code>ARRAY</code></td>
</tr>
</tbody>
diff --git a/flink-connector-jdbc/pom.xml b/flink-connector-jdbc/pom.xml
index 0a298e8..56c2df3 100644
--- a/flink-connector-jdbc/pom.xml
+++ b/flink-connector-jdbc/pom.xml
@@ -92,6 +92,14 @@ under the License.
</exclusions>
</dependency>
+ <!-- Db2 -->
+ <dependency>
+ <groupId>com.ibm.db2.jcc</groupId>
+ <artifactId>db2jcc</artifactId>
+ <version>db2jcc4</version>
+ <scope>provided</scope>
+ </dependency>
+
<!-- Tests -->
<dependency>
@@ -206,13 +214,20 @@ under the License.
<scope>test</scope>
</dependency>
- <!-- ArchUit test dependencies -->
+ <!-- Db2 test -->
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>db2</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <!-- ArchUnit test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-architecture-tests-test</artifactId>
<scope>test</scope>
</dependency>
+
</dependencies>
<dependencyManagement>
diff --git
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Dialect.java
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Dialect.java
new file mode 100644
index 0000000..e617a9f
--- /dev/null
+++
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Dialect.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.db2.dialect;
+
+import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** JDBC dialect for IBM Db2. */
+public class Db2Dialect extends AbstractDialect {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final int MAX_TIMESTAMP_PRECISION = 9;
+ private static final int MIN_TIMESTAMP_PRECISION = 1;
+
+ private static final int MAX_DECIMAL_PRECISION = 31;
+ private static final int MIN_DECIMAL_PRECISION = 1;
+
+ @Override
+ public JdbcRowConverter getRowConverter(RowType rowType) {
+ return new Db2RowConverter(rowType);
+ }
+
+ @Override
+ public Optional<String> defaultDriverName() {
+ return Optional.of("com.ibm.db2.jcc.DB2Driver");
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ return identifier;
+ }
+
+ @Override
+ public String dialectName() {
+ return "Db2";
+ }
+
+ @Override
+ public String getLimitClause(long limit) {
+ return String.format("FETCH FIRST %d ROWS ONLY", limit);
+ }
+
+ @Override
+ public Optional<String> getUpsertStatement(
+ String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+        List<String> nonUniqueKeyFields =
+                Arrays.stream(fieldNames)
+                        .filter(f -> !Arrays.asList(uniqueKeyFields).contains(f))
+                        .collect(Collectors.toList());
+ String fieldsProjection =
+ Arrays.stream(fieldNames)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+
+        String valuesBinding =
+                Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
+
+ String columnBinding =
+ Arrays.stream(fieldNames)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+
+        String onConditions =
+                Arrays.stream(uniqueKeyFields)
+                        .map(f -> "TARGET." + quoteIdentifier(f) + "= SOURCE." + quoteIdentifier(f))
+                        .collect(Collectors.joining(" AND "));
+        String updateSetClause =
+                nonUniqueKeyFields.stream()
+                        .map(f -> "TARGET." + quoteIdentifier(f) + "= SOURCE." + quoteIdentifier(f))
+                        .collect(Collectors.joining(", "));
+
+ String insertValues =
+ Arrays.stream(fieldNames)
+ .map(f -> "SOURCE." + quoteIdentifier(f))
+ .collect(Collectors.joining(", "));
+
+ Optional<String> format =
+ Optional.of(
+ String.format(
+ "MERGE INTO %s AS TARGET"
+                                + " USING TABLE (VALUES ( %s )) AS SOURCE ( %s )"
+ + " ON (%s)"
+ + " WHEN MATCHED THEN"
+ + " UPDATE SET %s"
+ + " WHEN NOT MATCHED THEN"
+ + " INSERT (%s) VALUES (%s);",
+ quoteIdentifier(tableName),
+ valuesBinding,
+ columnBinding,
+ onConditions,
+ updateSetClause,
+ fieldsProjection,
+ insertValues));
+
+ return format;
+ }
+
+ @Override
+ public Optional<Range> decimalPrecisionRange() {
+        return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
+ }
+
+ @Override
+ public Optional<Range> timestampPrecisionRange() {
+        return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
+ }
+
+ @Override
+ public Set<LogicalTypeRoot> supportedTypes() {
+        // The data types used in Db2 are listed at
+        // https://www.ibm.com/docs/en/db2-for-zos/12?topic=columns-data-types
+
+ return EnumSet.of(
+ LogicalTypeRoot.CHAR,
+ LogicalTypeRoot.VARCHAR,
+ LogicalTypeRoot.BOOLEAN,
+ LogicalTypeRoot.VARBINARY,
+ LogicalTypeRoot.DECIMAL,
+ LogicalTypeRoot.TINYINT,
+ LogicalTypeRoot.SMALLINT,
+ LogicalTypeRoot.INTEGER,
+ LogicalTypeRoot.BIGINT,
+ LogicalTypeRoot.FLOAT,
+ LogicalTypeRoot.DOUBLE,
+ LogicalTypeRoot.DATE,
+ LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
+ LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+ }
+}
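[Editorial illustration] The string assembly in `getUpsertStatement` above can be seen in isolation with a plain-Java sketch (no Flink dependencies; the class and method names are illustrative, not connector API):

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Re-creates the shape of the Db2 MERGE upsert template built by the dialect.
public class MergeSketch {
    static String upsert(String table, String[] fields, String[] keys) {
        // Column list, e.g. "id, name".
        String projection = String.join(", ", fields);
        // Named parameter placeholders, e.g. ":id, :name".
        String binding =
                Arrays.stream(fields).map(f -> ":" + f).collect(Collectors.joining(", "));
        // Join condition on the unique key columns.
        String on = Arrays.stream(keys)
                .map(f -> "TARGET." + f + " = SOURCE." + f)
                .collect(Collectors.joining(" AND "));
        // Update only the non-key columns on a match.
        String update = Arrays.stream(fields)
                .filter(f -> !Arrays.asList(keys).contains(f))
                .map(f -> "TARGET." + f + " = SOURCE." + f)
                .collect(Collectors.joining(", "));
        String insert = Arrays.stream(fields)
                .map(f -> "SOURCE." + f)
                .collect(Collectors.joining(", "));
        return String.format(
                "MERGE INTO %s AS TARGET USING TABLE (VALUES ( %s )) AS SOURCE ( %s ) ON (%s)"
                        + " WHEN MATCHED THEN UPDATE SET %s"
                        + " WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s)",
                table, binding, projection, on, update, projection, insert);
    }

    public static void main(String[] args) {
        System.out.println(upsert("t", new String[] {"id", "name"}, new String[] {"id"}));
    }
}
```

Unlike the dialect's code, this sketch puts a space around `=` and drops the trailing semicolon, purely for readability.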
diff --git
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java
new file mode 100644
index 0000000..c3a2182
--- /dev/null
+++
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.db2.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+/** Factory for {@link Db2Dialect}. */
+@Internal
+public class Db2DialectFactory implements JdbcDialectFactory {
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:db2:");
+ }
+
+ @Override
+ public JdbcDialect create() {
+ return new Db2Dialect();
+ }
+}
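[Editorial illustration] The factory's `acceptsURL` contract amounts to prefix dispatch on the JDBC URL. A minimal stand-alone sketch of that idea (class, map contents, and method names are illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Mirrors how a dialect is selected by matching the JDBC URL prefix.
public class DialectDispatchSketch {
    private static final Map<String, String> PREFIX_TO_DIALECT = new LinkedHashMap<>();

    static {
        PREFIX_TO_DIALECT.put("jdbc:db2:", "Db2");
        PREFIX_TO_DIALECT.put("jdbc:mysql:", "MySQL");
        PREFIX_TO_DIALECT.put("jdbc:postgresql:", "PostgreSQL");
    }

    static String dialectFor(String url) {
        return PREFIX_TO_DIALECT.entrySet().stream()
                .filter(e -> url.startsWith(e.getKey()))
                .map(Map.Entry::getValue)
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("No dialect for " + url));
    }

    public static void main(String[] args) {
        System.out.println(dialectFor("jdbc:db2://localhost:50000/testdb"));
    }
}
```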
diff --git
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2RowConverter.java
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2RowConverter.java
new file mode 100644
index 0000000..8557c82
--- /dev/null
+++
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2RowConverter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.db2.dialect;
+
+import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Runtime converter that is responsible for converting between JDBC objects and Flink internal
+ * objects for Db2.
+ */
+public class Db2RowConverter extends AbstractJdbcRowConverter {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String converterName() {
+ return "Db2";
+ }
+
+ public Db2RowConverter(RowType rowType) {
+ super(rowType);
+ }
+
+ @Override
+    protected JdbcDeserializationConverter createInternalConverter(LogicalType type) {
+ return super.createInternalConverter(type);
+ }
+}
diff --git
a/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
b/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
index e5a05b7..b9b9e11 100644
---
a/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
+++
b/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
@@ -19,3 +19,4 @@
org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialectFactor
org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialectFactory
org.apache.flink.connector.jdbc.databases.sqlserver.dialect.SqlServerDialectFactory
org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialectFactory
+org.apache.flink.connector.jdbc.databases.db2.dialect.Db2DialectFactory
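[Editorial illustration] The added line registers `Db2DialectFactory` for discovery via `java.util.ServiceLoader`: each line in a `META-INF/services/<interface>` file names one implementation class. The mechanism itself can be shown against a JDK SPI (a sketch, not connector code; `java.sql.Driver` is used only because it ships with the JDK):

```java
import java.sql.Driver;
import java.util.ServiceLoader;

// Demonstrates the ServiceLoader mechanism the services file relies on.
public class SpiSketch {
    public static void main(String[] args) {
        ServiceLoader<Driver> drivers = ServiceLoader.load(Driver.class);
        // The iterator is lazy; implementations are instantiated on demand,
        // and the list may be empty if no driver jars are on the classpath.
        for (Driver d : drivers) {
            System.out.println("Discovered driver: " + d.getClass().getName());
        }
    }
}
```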
diff --git
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/Db2TestBase.java
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/Db2TestBase.java
new file mode 100644
index 0000000..280828b
--- /dev/null
+++
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/Db2TestBase.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.db2;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
+import org.apache.flink.connector.jdbc.testutils.databases.db2.Db2Database;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Base class for Db2 testing. */
+@ExtendWith(Db2Database.class)
+public interface Db2TestBase extends DatabaseTest {
+
+ @Override
+ default DatabaseMetadata getMetadata() {
+ return Db2Database.getMetadata();
+ }
+}
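[Editorial illustration] `Db2TestBase` combines JUnit 5's `@ExtendWith` container extension with an interface default method. Stripped of JUnit, the default-method part of the pattern looks like this (all names here are hypothetical stand-ins):

```java
// Implementors inherit getMetadata() without per-class boilerplate,
// the same way Db2TestBase supplies metadata to its test classes.
public class TestBaseSketch {
    interface DatabaseTest {
        String getMetadata();
    }

    interface Db2Like extends DatabaseTest {
        @Override
        default String getMetadata() {
            return "db2-metadata"; // stands in for Db2Database.getMetadata()
        }
    }

    static class SomeDb2Test implements Db2Like {}

    public static void main(String[] args) {
        System.out.println(new SomeDb2Test().getMetadata());
    }
}
```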
diff --git
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectTypeTest.java
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectTypeTest.java
new file mode 100644
index 0000000..4f687a6
--- /dev/null
+++
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectTypeTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.db2.dialect;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeTest;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** The Db2 params for {@link JdbcDialectTypeTest}. */
+public class Db2DialectTypeTest extends JdbcDialectTypeTest {
+
+ @Override
+ protected String testDialect() {
+ return "db2";
+ }
+
+ @Override
+ protected List<TestItem> testData() {
+ return Arrays.asList(
+ createTestItem("CHAR"),
+ createTestItem("VARCHAR"),
+ createTestItem("BOOLEAN"),
+ createTestItem("TINYINT"),
+ createTestItem("SMALLINT"),
+ createTestItem("INTEGER"),
+ createTestItem("BIGINT"),
+ createTestItem("FLOAT"),
+ createTestItem("DOUBLE"),
+ createTestItem("DECIMAL(10, 4)"),
+ createTestItem("DECIMAL(31, 18)"),
+ createTestItem("DATE"),
+ createTestItem("TIME"),
+ createTestItem("TIMESTAMP(3)"),
+ createTestItem("TIMESTAMP WITHOUT TIME ZONE"),
+
+ // Not valid data
+                createTestItem("BINARY", "The Db2 dialect doesn't support type: BINARY(1)."),
+                createTestItem(
+                        "VARBINARY(10)", "The Db2 dialect doesn't support type: VARBINARY(10)."));
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2PreparedStatementTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2PreparedStatementTest.java
new file mode 100644
index 0000000..162c3bc
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2PreparedStatementTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.db2.dialect;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the prepared statements generated by {@link Db2Dialect}. */
+class Db2PreparedStatementTest {
+
+    private final JdbcDialect dialect =
+            JdbcDialectLoader.load("jdbc:db2://localhost:3306/test", getClass().getClassLoader());
+    private final String[] fieldNames =
+            new String[] {"id", "name", "email", "ts", "field1", "field_2", "__field_3__"};
+ private final String[] keyFields = new String[] {"id", "__field_3__"};
+ private final String tableName = "tbl";
+
+ @Test
+ void testInsertStatement() {
+        String insertStmt = dialect.getInsertIntoStatement(tableName, fieldNames);
+        assertThat(insertStmt)
+                .isEqualTo(
+                        "INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) "
+                                + "VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__)");
+ }
+
+ @Test
+ void testDeleteStatement() {
+ String deleteStmt = dialect.getDeleteStatement(tableName, keyFields);
+ assertThat(deleteStmt)
+                .isEqualTo("DELETE FROM tbl WHERE id = :id AND __field_3__ = :__field_3__");
+ }
+
+ @Test
+ void testRowExistsStatement() {
+        String rowExistStmt = dialect.getRowExistsStatement(tableName, keyFields);
+        assertThat(rowExistStmt)
+                .isEqualTo("SELECT 1 FROM tbl WHERE id = :id AND __field_3__ = :__field_3__");
+ }
+
+ @Test
+ void testUpdateStatement() {
+        String updateStmt = dialect.getUpdateStatement(tableName, fieldNames, keyFields);
+        assertThat(updateStmt)
+                .isEqualTo(
+                        "UPDATE tbl SET id = :id, name = :name, email = :email, ts = :ts, "
+                                + "field1 = :field1, field_2 = :field_2, __field_3__ = :__field_3__ "
+                                + "WHERE id = :id AND __field_3__ = :__field_3__");
+ }
+
+ @Test
+ void testUpsertStatement() {
+        String upsertStmt = dialect.getUpsertStatement(tableName, fieldNames, keyFields).get();
+        assertThat(upsertStmt)
+                .isEqualTo(
+                        "MERGE INTO tbl AS TARGET USING TABLE (VALUES ( :id, :name, :email, :ts, :field1, :field_2, :__field_3__ )) AS SOURCE ( id, name, email, ts, field1, field_2, __field_3__ ) ON (TARGET.id= SOURCE.id AND TARGET.__field_3__= SOURCE.__field_3__) WHEN MATCHED THEN UPDATE SET TARGET.name= SOURCE.name, TARGET.email= SOURCE.email, TARGET.ts= SOURCE.ts, TARGET.field1= SOURCE.field1, TARGET.field_2= SOURCE.field_2 WHEN NOT MATCHED THEN INSERT (id, name, email, ts, field1, fi [...]
+ }
+
+ @Test
+ void testSelectStatement() {
+        String selectStmt = dialect.getSelectFromStatement(tableName, fieldNames, keyFields);
+        assertThat(selectStmt)
+                .isEqualTo(
+                        "SELECT id, name, email, ts, field1, field_2, __field_3__ FROM tbl "
+                                + "WHERE id = :id AND __field_3__ = :__field_3__");
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/table/Db2DynamicTableSinkITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/table/Db2DynamicTableSinkITCase.java
new file mode 100644
index 0000000..f7e3eb0
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/table/Db2DynamicTableSinkITCase.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.db2.table;
+
+import org.apache.flink.connector.jdbc.databases.db2.Db2TestBase;
+import org.apache.flink.connector.jdbc.databases.db2.dialect.Db2Dialect;
+import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSinkITCase;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.table.api.DataTypes;
+
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow;
+
+/** The Table Sink ITCase for {@link Db2Dialect}. */
+public class Db2DynamicTableSinkITCase extends JdbcDynamicTableSinkITCase
+        implements Db2TestBase {
+ @Override
+ protected TableRow createUpsertOutputTable() {
+ return tableRow(
+ "dynamicSinkForUpsert",
+                pkField("cnt", dbType("BIGINT NOT NULL DEFAULT 0"), DataTypes.BIGINT().notNull()),
+                field("lencnt", dbType("BIGINT NOT NULL DEFAULT 0"), DataTypes.BIGINT().notNull()),
+                pkField("cTag", dbType("BIGINT DEFAULT 0 NOT NULL"), DataTypes.INT().notNull()),
+ field("ts", dbType("TIMESTAMP"), DataTypes.TIMESTAMP()));
+ }
+
+ @Override
+ protected TableRow createCheckpointOutputTable() {
+ return tableRow(
+ "checkpointTable",
+                field("id", dbType("BIGINT DEFAULT 0 NOT NULL"), DataTypes.BIGINT().notNull()));
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/table/Db2DynamicTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/table/Db2DynamicTableSourceITCase.java
new file mode 100644
index 0000000..d92217a
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/table/Db2DynamicTableSourceITCase.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.db2.table;
+
+import org.apache.flink.connector.jdbc.databases.db2.Db2TestBase;
+import org.apache.flink.connector.jdbc.databases.db2.dialect.Db2Dialect;
+import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow;
+
+/** The Table Source ITCase for {@link Db2Dialect}. */
+public class Db2DynamicTableSourceITCase extends JdbcDynamicTableSourceITCase
+ implements Db2TestBase {
+
+ @Override
+ protected TableRow createInputTable() {
+ return tableRow(
+ "jdbDynamicTableSource",
+ field("id", dbType("BIGINT"), DataTypes.BIGINT().notNull()),
+                field("decimal_col", dbType("NUMERIC(10, 4)"), DataTypes.DECIMAL(10, 4)),
+                field("timestamp6_col", dbType("TIMESTAMP(6)"), DataTypes.TIMESTAMP(6)),
+ // other fields are covered by the base class
+ field("boolean_c", dbType("BOOLEAN"), DataTypes.BOOLEAN()),
+ field("small_c", dbType("SMALLINT"), DataTypes.SMALLINT()),
+ field("int_c", dbType("INTEGER"), DataTypes.INT()),
+ field("big_c", dbType("BIGINT"), DataTypes.BIGINT()),
+ field("real_c", dbType("REAL"), DataTypes.FLOAT()),
+ field("double_c", dbType("DOUBLE"), DataTypes.DOUBLE()),
+                field("numeric_c", dbType("NUMERIC(10, 5)"), DataTypes.DECIMAL(10, 5)),
+                field("decimal_c", dbType("DECIMAL(10, 1)"), DataTypes.DECIMAL(10, 1)),
+ field("varchar_c", dbType("VARCHAR(200)"), DataTypes.STRING()),
+ field("char_c", dbType("CHAR"), DataTypes.CHAR(1)),
+ field("character_c", dbType("CHAR(3)"), DataTypes.CHAR(3)),
+                field("timestamp_c", dbType("TIMESTAMP"), DataTypes.TIMESTAMP(3)),
+ field("date_c", dbType("DATE"), DataTypes.DATE()),
+ field("time_c", dbType("TIME"), DataTypes.TIME(0)),
+                field("default_numeric_c", dbType("NUMERIC"), DataTypes.DECIMAL(10, 0)),
+                field("timestamp_precision_c", dbType("TIMESTAMP(9)"), DataTypes.TIMESTAMP(9)));
+ }
+
+ @Override
+ protected List<Row> getTestData() {
+ return Arrays.asList(
+ Row.of(
+ 1L,
+ BigDecimal.valueOf(100.1234),
+ LocalDateTime.parse("2020-01-01T15:35:00.123456"),
+ true,
+ (short) 32767,
+ 65535,
+ 2147483647L,
+ 5.5f,
+ 6.6d,
+ BigDecimal.valueOf(123.12345),
+ BigDecimal.valueOf(404.4),
+ "Hello World",
+ "a",
+ "abc",
+ LocalDateTime.parse("2020-07-17T18:00:22.123"),
+ LocalDate.parse("2020-07-17"),
+ LocalTime.parse("18:00:22"),
+ BigDecimal.valueOf(500),
+ LocalDateTime.parse("2020-07-17T18:00:22.123456789")),
+ Row.of(
+ 2L,
+ BigDecimal.valueOf(101.1234),
+ LocalDateTime.parse("2020-01-01T15:36:01.123456"),
+ false,
+ (short) 32767,
+ 65535,
+ 2147483647L,
+ 5.5f,
+ 6.6d,
+ BigDecimal.valueOf(123.12345),
+ BigDecimal.valueOf(404.4),
+ "Hello World",
+ "a",
+ "abc",
+ LocalDateTime.parse("2020-07-17T18:00:22.123"),
+ LocalDate.parse("2020-07-17"),
+ LocalTime.parse("18:00:22"),
+ BigDecimal.valueOf(500),
+ LocalDateTime.parse("2020-07-17T18:00:22.123456789")));
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/xa/Db2ExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/xa/Db2ExactlyOnceSinkE2eTest.java
new file mode 100644
index 0000000..01e0aea
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/xa/Db2ExactlyOnceSinkE2eTest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.db2.xa;
+
+import org.apache.flink.connector.jdbc.databases.db2.Db2TestBase;
+import org.apache.flink.connector.jdbc.xa.JdbcExactlyOnceSinkE2eTest;
+
+/** A simple Db2 end-to-end test based on {@link JdbcExactlyOnceSinkE2eTest}. */
+public class Db2ExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest
+        implements Db2TestBase {}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
index d0caddf..0a0c3c8 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
@@ -256,9 +256,6 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase {
try (Connection conn = getMetadata().getConnection();
Statement stat = conn.createStatement()) {
stat.execute("DELETE FROM " + OUTPUT_TABLE);
-
- stat.close();
- conn.close();
}
}
}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/db2/Db2Database.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/db2/Db2Database.java
new file mode 100644
index 0000000..8302555
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/db2/Db2Database.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.db2;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Db2Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.junit.jupiter.Container;
+
+/** Db2 database for testing. */
+public class Db2Database extends DatabaseExtension {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Db2Database.class);
+
+ @Container
+ protected static final Db2Container CONTAINER =
+ new Db2Container()
+ .withUsername("db2inst1")
+ .withPassword("flinkpw")
+ .withEnv("AUTOCONFIG", "false")
+ .withEnv("ARCHIVE_LOGS", "true")
+ .acceptLicense()
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ private static Db2Metadata metadata;
+
+ public static Db2Metadata getMetadata() {
+ if (!CONTAINER.isRunning()) {
+ throw new FlinkRuntimeException("Container is stopped.");
+ }
+ if (metadata == null) {
+ metadata = new Db2Metadata(CONTAINER);
+ }
+ return metadata;
+ }
+
+ @Override
+ protected DatabaseMetadata startDatabase() throws Exception {
+ CONTAINER.start();
+ return getMetadata();
+ }
+
+ @Override
+ protected void stopDatabase() throws Exception {
+ CONTAINER.stop();
+ metadata = null;
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/db2/Db2Metadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/db2/Db2Metadata.java
new file mode 100644
index 0000000..1610b84
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/db2/Db2Metadata.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.db2;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+
+import com.ibm.db2.jcc.DB2XADataSource;
+import org.testcontainers.containers.Db2Container;
+
+import javax.sql.XADataSource;
+
+/** Db2 Metadata. */
+public class Db2Metadata implements DatabaseMetadata {
+ private final String host;
+ private final Integer port;
+ private final String dataBaseName;
+ private final String username;
+ private final String password;
+ private final String url;
+ private final String driver;
+ private final String version;
+
+ public Db2Metadata(Db2Container container) {
+ this.username = container.getUsername();
+ this.password = container.getPassword();
+ this.host = container.getHost();
+ this.port = container.getMappedPort(50000);
+ this.dataBaseName = container.getDatabaseName();
+ this.url = container.getJdbcUrl();
+ this.driver = container.getDriverClassName();
+ this.version = container.getDockerImageName();
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return this.url;
+ }
+
+ @Override
+ public String getJdbcUrlWithCredentials() {
+        return String.format(
+                "%s;username=%s;password=%s", getJdbcUrl(), getUsername(), getPassword());
+ }
+
+ @Override
+ public String getUsername() {
+ return this.username;
+ }
+
+ @Override
+ public String getPassword() {
+ return this.password;
+ }
+
+ @Override
+ public XADataSource buildXaDataSource() {
+ DB2XADataSource xaDataSource = new DB2XADataSource();
+ xaDataSource.setDatabaseName(dataBaseName);
+ xaDataSource.setUser(getUsername());
+ xaDataSource.setPassword(getPassword());
+ xaDataSource.setServerName(host);
+ xaDataSource.setPortNumber(port);
+ xaDataSource.setDriverType(4);
+ return xaDataSource;
+ }
+
+ @Override
+ public String getDriverClass() {
+ return this.driver;
+ }
+
+ @Override
+ public String getVersion() {
+ return version;
+ }
+}