This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 38b6d6e4bb [Feature][Jdbc] Jdbc database support identifier (#5089)
38b6d6e4bb is described below
commit 38b6d6e4bbb52f993ffc9ef0109a36d6f3fa736e
Author: XiaoJiang521 <[email protected]>
AuthorDate: Tue Sep 12 14:43:46 2023 +0800
[Feature][Jdbc] Jdbc database support identifier (#5089)
---
docs/en/connector-v2/sink/Jdbc.md | 7 +
docs/en/connector-v2/sink/Mysql.md | 2 +
docs/en/connector-v2/sink/PostgreSql.md | 2 +
.../seatunnel/jdbc/config/JdbcOptions.java | 7 +
.../jdbc/internal/dialect/JdbcDialect.java | 23 +-
.../jdbc/internal/dialect/JdbcDialectFactory.java | 2 +-
.../jdbc/internal/dialect/JdbcDialectLoader.java | 8 +-
.../FieldIdeEnum.java} | 20 +-
.../dialect/mysql/MySqlDialectFactory.java | 7 +
.../jdbc/internal/dialect/mysql/MysqlDialect.java | 14 +
.../dialect/oceanbase/OceanBaseDialectFactory.java | 2 +-
.../internal/dialect/oracle/OracleDialect.java | 21 +-
.../dialect/oracle/OracleDialectFactory.java | 7 +
.../internal/dialect/psql/PostgresDialect.java | 37 ++
.../dialect/psql/PostgresDialectFactory.java | 6 +-
.../dialect/psqllow/PostgresLowDialect.java | 5 +
.../dialect/sqlserver/SqlServerDialect.java | 32 ++
.../dialect/sqlserver/SqlServerDialectFactory.java | 7 +
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 6 +-
.../seatunnel/jdbc/sink/JdbcSinkFactory.java | 6 +-
.../dialect/PostgresDialectFactoryTest.java | 2 +-
.../seatunnel/jdbc/JdbcPostgresIdentifierIT.java | 387 +++++++++++++++++++++
.../jdbc_postgres_ide_source_and_sink.conf | 48 +++
23 files changed, 638 insertions(+), 20 deletions(-)
diff --git a/docs/en/connector-v2/sink/Jdbc.md
b/docs/en/connector-v2/sink/Jdbc.md
index 755de8bb9a..f845dea6ef 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -47,6 +47,7 @@ support `Xa transactions`. You can set `is_exactly_once=true`
to enable it.
| max_commit_attempts | Int | No | 3
|
| transaction_timeout_sec | Int | No | -1
|
| auto_commit | Boolean | No | true
|
+| field_ide | String | No | -
|
| common-options | | no | -
|
### driver [string]
@@ -136,6 +137,12 @@ exactly-once semantics
Automatic transaction commit is enabled by default
+### field_ide [String]
+
+The `field_ide` option controls whether field identifiers are converted to
uppercase or lowercase when
+synchronizing from the source to the sink. `ORIGINAL` indicates no conversion
is needed, `UPPERCASE` indicates
+conversion to uppercase, and `LOWERCASE` indicates conversion to lowercase. If the option is not set, identifiers are left unchanged.
+
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details
diff --git a/docs/en/connector-v2/sink/Mysql.md
b/docs/en/connector-v2/sink/Mysql.md
index f453a60c4e..860f071df0 100644
--- a/docs/en/connector-v2/sink/Mysql.md
+++ b/docs/en/connector-v2/sink/Mysql.md
@@ -78,6 +78,7 @@ semantics (using XA transaction guarantee).
| max_commit_attempts | Int | No | 3 |
The number of retries for transaction commit failures
|
| transaction_timeout_sec | Int | No | -1 |
The timeout after the transaction is opened, the default is -1 (never timeout).
Note that setting the timeout may affect<br/>exactly-once semantics
|
| auto_commit | Boolean | No | true |
Automatic transaction commit is enabled by default
|
+| field_ide | String | No | - |
Controls whether field identifiers are case-converted when synchronizing from the
source to the sink. `ORIGINAL` indicates no conversion is needed; `UPPERCASE`
indicates conversion to uppercase; `LOWERCASE` indicates conversion to
lowercase. |
| common-options | | no | - |
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details
|
### Tips
@@ -191,6 +192,7 @@ sink {
database = test
table = sink_table
primary_keys = ["id","name"]
+ field_ide = UPPERCASE
}
}
```
diff --git a/docs/en/connector-v2/sink/PostgreSql.md
b/docs/en/connector-v2/sink/PostgreSql.md
index 67e2ed64d9..bcc5616f5e 100644
--- a/docs/en/connector-v2/sink/PostgreSql.md
+++ b/docs/en/connector-v2/sink/PostgreSql.md
@@ -81,6 +81,7 @@ semantics (using XA transaction guarantee).
| max_commit_attempts | Int | No | 3 |
The number of retries for transaction commit failures
|
| transaction_timeout_sec | Int | No | -1 |
The timeout after the transaction is opened, the default is -1 (never timeout).
Note that setting the timeout may affect<br/>exactly-once semantics
|
| auto_commit | Boolean | No | true |
Automatic transaction commit is enabled by default
|
+| field_ide | String | No | - |
Controls whether field identifiers are case-converted when synchronizing from the
source to the sink. `ORIGINAL` indicates no conversion is needed; `UPPERCASE`
indicates conversion to uppercase; `LOWERCASE` indicates conversion to
lowercase. |
| common-options | | no | - |
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details
|
### Tips
@@ -197,6 +198,7 @@ sink {
database = test
table = sink_table
primary_keys = ["id","name"]
+ field_ide = UPPERCASE
}
}
```
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
index 5d2254cd34..b01fc872f3 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
import java.math.BigDecimal;
import java.util.List;
@@ -154,4 +155,10 @@ public interface JdbcOptions {
.intType()
.noDefaultValue()
.withDescription("partition num");
+
+ Option<FieldIdeEnum> FIELD_IDE =
+ Options.key("field_ide")
+ .enumType(FieldIdeEnum.class)
+ .noDefaultValue()
+ .withDescription("Whether case conversion is required");
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 8a0b31a5ee..e0cf5252a6 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -20,6 +20,9 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
+
+import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
import java.sql.Connection;
@@ -68,9 +71,13 @@ public interface JdbcDialect extends Serializable {
default String quoteIdentifier(String identifier) {
return identifier;
}
+ /** Quotes a database name; this default returns it unchanged, with no case conversion. */
+ default String quoteDatabaseIdentifier(String identifier) {
+ return identifier;
+ }
default String tableIdentifier(String database, String tableName) {
- return quoteIdentifier(database) + "." + quoteIdentifier(tableName);
+ return quoteDatabaseIdentifier(database) + "." +
quoteIdentifier(tableName);
}
/**
@@ -219,4 +226,18 @@ public interface JdbcDialect extends Serializable {
default String extractTableName(TablePath tablePath) {
return tablePath.getSchemaAndTableName();
}
+
+ default String getFieldIde(String identifier, String fieldIde) {
+ if (StringUtils.isEmpty(fieldIde)) {
+ return identifier;
+ }
+ switch (FieldIdeEnum.valueOf(fieldIde.toUpperCase())) {
+ case LOWERCASE:
+ return identifier.toLowerCase();
+ case UPPERCASE:
+ return identifier.toUpperCase();
+ default:
+ return identifier;
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
index 3d66de6590..5439937f53 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
@@ -44,7 +44,7 @@ public interface JdbcDialectFactory {
* @param compatibleMode The compatible mode
* @return a new instance of {@link JdbcDialect}
*/
- default JdbcDialect create(String compatibleMode) {
+ default JdbcDialect create(String compatibleMode, String fieldId) {
return create();
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
index b49df35ff3..350a22e20c 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
@@ -36,6 +36,10 @@ public final class JdbcDialectLoader {
private JdbcDialectLoader() {}
+ public static JdbcDialect load(String url, String compatibleMode) {
+ return load(url, compatibleMode, "");
+ }
+
/**
* Loads the unique JDBC Dialect that can handle the given database url.
*
@@ -45,7 +49,7 @@ public final class JdbcDialectLoader {
* unambiguously process the given database URL.
* @return The loaded dialect.
*/
- public static JdbcDialect load(String url, String compatibleMode) {
+ public static JdbcDialect load(String url, String compatibleMode, String
fieldIde) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
List<JdbcDialectFactory> foundFactories = discoverFactories(cl);
@@ -90,7 +94,7 @@ public final class JdbcDialectLoader {
.collect(Collectors.joining("\n"))));
}
- return matchingFactories.get(0).create(compatibleMode);
+ return matchingFactories.get(0).create(compatibleMode, fieldIde);
}
private static List<JdbcDialectFactory> discoverFactories(ClassLoader
classLoader) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java
similarity index 69%
copy from
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java
copy to
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java
index e367207ffa..39f9521062 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java
@@ -15,16 +15,20 @@
* limitations under the License.
*/
-package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psqllow;
+package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialect;
+public enum FieldIdeEnum {
+ ORIGINAL("original"), // Original string form
+ UPPERCASE("uppercase"), // Convert to uppercase
+ LOWERCASE("lowercase"); // Convert to lowercase
-import java.util.Optional;
+ private final String value;
-public class PostgresLowDialect extends PostgresDialect {
- @Override
- public Optional<String> getUpsertStatement(
- String database, String tableName, String[] fieldNames, String[]
uniqueKeyFields) {
- return Optional.empty();
+ FieldIdeEnum(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
index 10047311b9..a4f89a4dc8 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
@@ -22,6 +22,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDiale
import com.google.auto.service.AutoService;
+import javax.annotation.Nonnull;
+
/** Factory for {@link MysqlDialect}. */
@AutoService(JdbcDialectFactory.class)
public class MySqlDialectFactory implements JdbcDialectFactory {
@@ -34,4 +36,9 @@ public class MySqlDialectFactory implements
JdbcDialectFactory {
public JdbcDialect create() {
return new MysqlDialect();
}
+
+ @Override
+ public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde)
{
+ return new MysqlDialect(fieldIde);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index c71dc3f76a..1ae69a6131 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -31,6 +32,14 @@ import java.util.Optional;
import java.util.stream.Collectors;
public class MysqlDialect implements JdbcDialect {
+ public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
+
+ public MysqlDialect() {}
+
+ public MysqlDialect(String fieldIde) {
+ this.fieldIde = fieldIde;
+ }
+
@Override
public String dialectName() {
return "MySQL";
@@ -48,6 +57,11 @@ public class MysqlDialect implements JdbcDialect {
@Override
public String quoteIdentifier(String identifier) {
+ return "`" + getFieldIde(identifier, fieldIde) + "`";
+ }
+
+ @Override
+ public String quoteDatabaseIdentifier(String identifier) {
return "`" + identifier + "`";
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
index 66df84205e..b3a456870c 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
@@ -40,7 +40,7 @@ public class OceanBaseDialectFactory implements
JdbcDialectFactory {
}
@Override
- public JdbcDialect create(@Nonnull String compatibleMode) {
+ public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde)
{
if ("oracle".equalsIgnoreCase(compatibleMode)) {
return new OracleDialect();
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
index 7edd935e78..e8e583dc14 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -33,6 +34,13 @@ import java.util.stream.Collectors;
public class OracleDialect implements JdbcDialect {
private static final int DEFAULT_ORACLE_FETCH_SIZE = 128;
+ public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
+
+ public OracleDialect(String fieldIde) {
+ this.fieldIde = fieldIde;
+ }
+
+ public OracleDialect() {}
@Override
public String dialectName() {
@@ -56,7 +64,18 @@ public class OracleDialect implements JdbcDialect {
@Override
public String quoteIdentifier(String identifier) {
- return identifier;
+ if (identifier.contains(".")) {
+ String[] parts = identifier.split("\\.");
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < parts.length - 1; i++) {
+ sb.append("\"").append(parts[i]).append("\"").append(".");
+ }
+ return sb.append("\"")
+ .append(getFieldIde(parts[parts.length - 1], fieldIde))
+ .append("\"")
+ .toString();
+ }
+ return "\"" + getFieldIde(identifier, fieldIde) + "\"";
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java
index 168dc4d890..121098c461 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java
@@ -22,6 +22,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDiale
import com.google.auto.service.AutoService;
+import javax.annotation.Nonnull;
+
/** Factory for {@link OracleDialect}. */
@AutoService(JdbcDialectFactory.class)
public class OracleDialectFactory implements JdbcDialectFactory {
@@ -34,4 +36,9 @@ public class OracleDialectFactory implements
JdbcDialectFactory {
public JdbcDialect create() {
return new OracleDialect();
}
+
+ @Override
+ public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde)
{
+ return new OracleDialect(fieldIde);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
index b36a28a5a6..f206589af5 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -33,6 +34,14 @@ public class PostgresDialect implements JdbcDialect {
public static final int DEFAULT_POSTGRES_FETCH_SIZE = 128;
+ public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
+
+ public PostgresDialect() {}
+
+ public PostgresDialect(String fieldIde) {
+ this.fieldIde = fieldIde;
+ }
+
@Override
public String dialectName() {
return "PostgreSQL";
@@ -88,4 +97,32 @@ public class PostgresDialect implements JdbcDialect {
}
return statement;
}
+
+ @Override
+ public String tableIdentifier(String database, String tableName) {
+ // Quote the database name unchanged so PostgreSQL resolves it with its exact upper/lower case.
+ return quoteDatabaseIdentifier(database) + "." +
quoteIdentifier(tableName);
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ if (identifier.contains(".")) {
+ String[] parts = identifier.split("\\.");
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < parts.length - 1; i++) {
+ sb.append("\"").append(parts[i]).append("\"").append(".");
+ }
+ return sb.append("\"")
+ .append(getFieldIde(parts[parts.length - 1], fieldIde))
+ .append("\"")
+ .toString();
+ }
+
+ return "\"" + getFieldIde(identifier, fieldIde) + "\"";
+ }
+
+ @Override
+ public String quoteDatabaseIdentifier(String identifier) {
+ return "\"" + identifier + "\"";
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
index 857c85290d..59dc0b45c6 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
@@ -39,10 +39,10 @@ public class PostgresDialectFactory implements
JdbcDialectFactory {
}
@Override
- public JdbcDialect create(@Nonnull String compatibleMode) {
+ public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde)
{
if ("postgresLow".equalsIgnoreCase(compatibleMode)) {
- return new PostgresLowDialect();
+ return new PostgresLowDialect(fieldIde);
}
- return new PostgresDialect();
+ return new PostgresDialect(fieldIde);
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java
index e367207ffa..9100382628 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java
@@ -22,6 +22,11 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.Post
import java.util.Optional;
public class PostgresLowDialect extends PostgresDialect {
+
+ public PostgresLowDialect(String fieldIde) {
+ this.fieldIde = fieldIde;
+ }
+
@Override
public Optional<String> getUpsertStatement(
String database, String tableName, String[] fieldNames, String[]
uniqueKeyFields) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index 2121369e22..792c03bd76 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserve
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
import java.util.Arrays;
import java.util.List;
@@ -27,6 +28,15 @@ import java.util.Optional;
import java.util.stream.Collectors;
public class SqlServerDialect implements JdbcDialect {
+
+ public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
+
+ public SqlServerDialect() {}
+
+ public SqlServerDialect(String fieldIde) {
+ this.fieldIde = fieldIde;
+ }
+
@Override
public String dialectName() {
return "Sqlserver";
@@ -105,4 +115,26 @@ public class SqlServerDialect implements JdbcDialect {
return Optional.of(upsertSQL);
}
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ if (identifier.contains(".")) {
+ String[] parts = identifier.split("\\.");
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < parts.length - 1; i++) {
+ sb.append("[").append(parts[i]).append("]").append(".");
+ }
+ return sb.append("[")
+ .append(getFieldIde(parts[parts.length - 1], fieldIde))
+ .append("]")
+ .toString();
+ }
+
+ return "[" + getFieldIde(identifier, fieldIde) + "]";
+ }
+
+ @Override
+ public String quoteDatabaseIdentifier(String identifier) {
+ return "[" + identifier + "]";
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java
index d8fce3c43c..d7dae4efd5 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java
@@ -22,6 +22,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDiale
import com.google.auto.service.AutoService;
+import javax.annotation.Nonnull;
+
/** Factory for {@link SqlServerDialect}. */
@AutoService(JdbcDialectFactory.class)
public class SqlServerDialectFactory implements JdbcDialectFactory {
@@ -34,4 +36,9 @@ public class SqlServerDialectFactory implements
JdbcDialectFactory {
public JdbcDialect create() {
return new SqlServerDialect();
}
+
+ @Override
+ public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde)
{
+ return new SqlServerDialect(fieldIde);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index c23619b5aa..71e0a86249 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -38,6 +38,7 @@ import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -107,7 +108,10 @@ public class JdbcSink
this.dialect =
JdbcDialectLoader.load(
jdbcSinkConfig.getJdbcConnectionConfig().getUrl(),
-
jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode());
+
jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode(),
+ config.get(JdbcOptions.FIELD_IDE) == null
+ ? null
+ :
config.get(JdbcOptions.FIELD_IDE).getValue());
this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index 8209533f9d..d18ff0d7fd 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -30,9 +30,11 @@ import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -142,10 +144,12 @@ public class JdbcSinkFactory implements TableSinkFactory {
}
final ReadonlyConfig options = config;
JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config);
+ FieldIdeEnum fieldIdeEnum = config.get(JdbcOptions.FIELD_IDE);
JdbcDialect dialect =
JdbcDialectLoader.load(
sinkConfig.getJdbcConnectionConfig().getUrl(),
-
sinkConfig.getJdbcConnectionConfig().getCompatibleMode());
+
sinkConfig.getJdbcConnectionConfig().getCompatibleMode(),
+ fieldIdeEnum == null ? null : fieldIdeEnum.getValue());
CatalogTable finalCatalogTable = catalogTable;
return () ->
new JdbcSink(
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/PostgresDialectFactoryTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/PostgresDialectFactoryTest.java
index 79b1f11ac9..90b980a69e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/PostgresDialectFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/PostgresDialectFactoryTest.java
@@ -30,7 +30,7 @@ public class PostgresDialectFactoryTest {
@Test
public void testPostgresDialectCreate() {
PostgresDialectFactory postgresDialectFactory = new
PostgresDialectFactory();
- JdbcDialect postgresLow = postgresDialectFactory.create("postgresLow");
+ JdbcDialect postgresLow = postgresDialectFactory.create("postgresLow",
"");
String[] fields = {"id", "name", "age"};
String[] uniqueKeyField = {"id"};
Optional<String> upsertStatement =
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
new file mode 100644
index 0000000000..13adec7008
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK do not support cdc")
+public class JdbcPostgresIdentifierIT extends TestSuiteBase implements
TestResource {
+ private static final String PG_IMAGE = "postgis/postgis"; // Docker image of the database under test; no tag is pinned here
+ private static final String PG_DRIVER_JAR =
+
"https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar"; // PostgreSQL JDBC driver jar, downloaded by extendedFactory
+ private static final String PG_JDBC_JAR =
+
"https://repo1.maven.org/maven2/net/postgis/postgis-jdbc/2.5.1/postgis-jdbc-2.5.1.jar"; // PostGIS JDBC jar, downloaded by extendedFactory
+ private static final String PG_GEOMETRY_JAR =
+
"https://repo1.maven.org/maven2/net/postgis/postgis-geometry/2.5.1/postgis-geometry-2.5.1.jar"; // PostGIS geometry jar, downloaded by extendedFactory
+ private static final List<String> PG_CONFIG_FILE_LIST =
+ Lists.newArrayList("/jdbc_postgres_ide_source_and_sink.conf"); // job configs executed one by one in testAutoGenerateSQL
+ private PostgreSQLContainer<?> POSTGRESQL_CONTAINER; // instance field despite the constant-style name: assigned in startUp(), stopped in tearDown()
+ private static final String PG_SOURCE_DDL = // DDL for the lower-case source table: scalar columns plus PostGIS geometry/geography columns
+ "CREATE TABLE IF NOT EXISTS pg_ide_source_table (\n"
+ + " gid SERIAL PRIMARY KEY,\n"
+ + " text_col TEXT,\n"
+ + " varchar_col VARCHAR(255),\n"
+ + " char_col CHAR(10),\n"
+ + " boolean_col bool,\n"
+ + " smallint_col int2,\n"
+ + " integer_col int4,\n"
+ + " bigint_col BIGINT,\n"
+ + " decimal_col DECIMAL(10, 2),\n"
+ + " numeric_col NUMERIC(8, 4),\n"
+ + " real_col float4,\n"
+ + " double_precision_col float8,\n"
+ + " smallserial_col SMALLSERIAL,\n"
+ + " serial_col SERIAL,\n"
+ + " bigserial_col BIGSERIAL,\n"
+ + " date_col DATE,\n"
+ + " timestamp_col TIMESTAMP,\n"
+ + " bpchar_col BPCHAR(10),\n"
+ + " age INT NOT null,\n"
+ + " name VARCHAR(255) NOT null,\n"
+ + " point geometry(POINT, 4326),\n"
+ + " linestring geometry(LINESTRING, 4326),\n"
+ + " polygon_colums geometry(POLYGON, 4326),\n"
+ + " multipoint geometry(MULTIPOINT, 4326),\n"
+ + " multilinestring geometry(MULTILINESTRING, 4326),\n"
+ + " multipolygon geometry(MULTIPOLYGON, 4326),\n"
+ + " geometrycollection geometry(GEOMETRYCOLLECTION,
4326),\n"
+ + " geog geography(POINT, 4326)\n"
+ + ")";
+ private static final String PG_SINK_DDL = // DDL for the sink table: table and column names are double-quoted upper-case; the spatial columns are declared varchar(2000) rather than geometry/geography
+ "CREATE TABLE IF NOT EXISTS test.public.\"PG_IDE_SINK_TABLE\" (\n"
+ + " \"GID\" SERIAL PRIMARY KEY,\n"
+ + " \"TEXT_COL\" TEXT,\n"
+ + " \"VARCHAR_COL\" VARCHAR(255),\n"
+ + " \"CHAR_COL\" CHAR(10),\n"
+ + " \"BOOLEAN_COL\" bool,\n"
+ + " \"SMALLINT_COL\" int2,\n"
+ + " \"INTEGER_COL\" int4,\n"
+ + " \"BIGINT_COL\" BIGINT,\n"
+ + " \"DECIMAL_COL\" DECIMAL(10, 2),\n"
+ + " \"NUMERIC_COL\" NUMERIC(8, 4),\n"
+ + " \"REAL_COL\" float4,\n"
+ + " \"DOUBLE_PRECISION_COL\" float8,\n"
+ + " \"SMALLSERIAL_COL\" SMALLSERIAL,\n"
+ + " \"SERIAL_COL\" SERIAL,\n"
+ + " \"BIGSERIAL_COL\" BIGSERIAL,\n"
+ + " \"DATE_COL\" DATE,\n"
+ + " \"TIMESTAMP_COL\" TIMESTAMP,\n"
+ + " \"BPCHAR_COL\" BPCHAR(10),\n"
+ + " \"AGE\" int4 NOT NULL,\n"
+ + " \"NAME\" varchar(255) NOT NULL,\n"
+ + " \"POINT\" varchar(2000) NULL,\n"
+ + " \"LINESTRING\" varchar(2000) NULL,\n"
+ + " \"POLYGON_COLUMS\" varchar(2000) NULL,\n"
+ + " \"MULTIPOINT\" varchar(2000) NULL,\n"
+ + " \"MULTILINESTRING\" varchar(2000) NULL,\n"
+ + " \"MULTIPOLYGON\" varchar(2000) NULL,\n"
+ + " \"GEOMETRYCOLLECTION\" varchar(2000) NULL,\n"
+ + " \"GEOG\" varchar(2000) NULL\n"
+ + " )";
+
+ private static final String SOURCE_SQL = // selects every column of pg_ide_source_table; testAutoGenerateSQL compares these rows with the rows returned by SINK_SQL
+ "select \n"
+ + "gid,\n"
+ + "text_col,\n"
+ + "varchar_col,\n"
+ + "char_col,\n"
+ + "boolean_col,\n"
+ + "smallint_col,\n"
+ + "integer_col,\n"
+ + "bigint_col,\n"
+ + "decimal_col,\n"
+ + "numeric_col,\n"
+ + "real_col,\n"
+ + "double_precision_col,\n"
+ + "smallserial_col,\n"
+ + "serial_col,\n"
+ + "bigserial_col,\n"
+ + "date_col,\n"
+ + "timestamp_col,\n"
+ + "bpchar_col,\n"
+ + "age,\n"
+ + "name,\n"
+ + "point,\n"
+ + "linestring,\n"
+ + "polygon_colums,\n"
+ + "multipoint,\n"
+ + "multilinestring,\n"
+ + "multipolygon,\n"
+ + "geometrycollection,\n"
+ + "geog\n"
+ + " from pg_ide_source_table";
+ /**
+ * Reads every row of the upper-case sink table. The spatial columns, which the sink DDL
+ * declares as varchar(2000), are cast back to GEOMETRY / GEOGRAPHY so that the values can
+ * be compared with the rows returned by SOURCE_SQL. Each cast column is aliased with its
+ * own column name.
+ */
+ private static final String SINK_SQL =
+ "SELECT\n"
+ + " \"GID\",\n"
+ + " \"TEXT_COL\",\n"
+ + " \"VARCHAR_COL\",\n"
+ + " \"CHAR_COL\",\n"
+ + " \"BOOLEAN_COL\",\n"
+ + " \"SMALLINT_COL\",\n"
+ + " \"INTEGER_COL\",\n"
+ + " \"BIGINT_COL\",\n"
+ + " \"DECIMAL_COL\",\n"
+ + " \"NUMERIC_COL\",\n"
+ + " \"REAL_COL\",\n"
+ + " \"DOUBLE_PRECISION_COL\",\n"
+ + " \"SMALLSERIAL_COL\",\n"
+ + " \"SERIAL_COL\",\n"
+ + " \"BIGSERIAL_COL\",\n"
+ + " \"DATE_COL\",\n"
+ + " \"TIMESTAMP_COL\",\n"
+ + " \"BPCHAR_COL\",\n"
+ + " \"AGE\",\n"
+ + " \"NAME\",\n"
+ + " CAST(\"POINT\" AS GEOMETRY) AS POINT,\n"
+ + " CAST(\"LINESTRING\" AS GEOMETRY) AS LINESTRING,\n"
+ + " CAST(\"POLYGON_COLUMS\" AS GEOMETRY) AS
POLYGON_COLUMS,\n"
+ + " CAST(\"MULTIPOINT\" AS GEOMETRY) AS MULTIPOINT,\n"
+ + " CAST(\"MULTILINESTRING\" AS GEOMETRY) AS
MULTILINESTRING,\n"
+ + " CAST(\"MULTIPOLYGON\" AS GEOMETRY) AS
MULTIPOLYGON,\n"
+ + " CAST(\"GEOMETRYCOLLECTION\" AS GEOMETRY) AS
GEOMETRYCOLLECTION,\n"
+ + " CAST(\"GEOG\" AS GEOGRAPHY) AS GEOG\n"
+ + "FROM\n"
+ + " \"PG_IDE_SINK_TABLE\";";
+
+ /**
+ * Container hook registered through @TestContainerExtension: creates the Jdbc plugin lib
+ * directory inside the given container and downloads the PostgreSQL driver and the two
+ * PostGIS jars into it with curl. The shell command is asserted to exit with status 0.
+ */
+ @TestContainerExtension
+ private final ContainerExtendedFactory extendedFactory =
+ container -> {
+ final String downloadJarsCommand =
+ "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib &&
cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+ + PG_DRIVER_JAR
+ + " && curl -O "
+ + PG_JDBC_JAR
+ + " && curl -O "
+ + PG_GEOMETRY_JAR;
+ final Container.ExecResult downloadResult =
+ container.execInContainer("bash", "-c", downloadJarsCommand);
+ Assertions.assertEquals(0, downloadResult.getExitCode());
+ };
+
+ /**
+ * Starts the PostGIS container on the shared test network under the alias "postgresql",
+ * loads the JDBC driver class, then keeps calling initializeJdbcTable() (ignoring
+ * exceptions, polling every 500 ms, for at most 2 minutes) until it succeeds.
+ *
+ * @throws Exception if the driver class cannot be loaded or initialization never succeeds
+ */
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ final DockerImageName postgisImage =
+ DockerImageName.parse(PG_IMAGE).asCompatibleSubstituteFor("postgres");
+ final Slf4jLogConsumer containerLogConsumer =
+ new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE));
+ POSTGRESQL_CONTAINER =
+ new PostgreSQLContainer<>(postgisImage)
+ .withNetwork(TestSuiteBase.NETWORK)
+ .withNetworkAliases("postgresql")
+ .withCommand("postgres -c
max_prepared_transactions=100")
+ .withLogConsumer(containerLogConsumer);
+ Startables.deepStart(Stream.of(POSTGRESQL_CONTAINER)).join();
+ log.info("PostgreSQL container started");
+
+ final String driverClassName = POSTGRESQL_CONTAINER.getDriverClassName();
+ Class.forName(driverClassName);
+
+ // Retry table creation and seeding until the database accepts connections.
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(this::initializeJdbcTable);
+ log.info("pg data initialization succeeded. Procedure");
+ }
+
+ /**
+ * Runs every job config in PG_CONFIG_FILE_LIST on the given engine container, requires
+ * exit code 0, checks that the sink rows equal the source rows, and truncates the sink
+ * table before the next config.
+ *
+ * @param container engine container that executes the job
+ * @throws IOException if executing the job fails
+ * @throws InterruptedException if the job execution is interrupted
+ */
+ @TestTemplate
+ public void testAutoGenerateSQL(TestContainer container)
+ throws IOException, InterruptedException {
+ for (String configFile : PG_CONFIG_FILE_LIST) {
+ Container.ExecResult execResult = container.executeJob(configFile);
+ // Surface the job's stderr when the exit code is not 0 so failures are diagnosable.
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ Assertions.assertIterableEquals(querySql(SOURCE_SQL), querySql(SINK_SQL));
+ executeSQL("truncate table \"PG_IDE_SINK_TABLE\"");
+ log.info("{} e2e test completed", configFile);
+ }
+ }
+
+ /**
+ * Creates the source and sink tables if they do not exist and inserts ten rows
+ * (gid 1 to 10) into the source table as one JDBC batch.
+ *
+ * <p>Both the connection and the statement are opened in try-with-resources so neither
+ * leaks, including on the failed attempts that startUp() retries.
+ *
+ * @throws RuntimeException wrapping the SQLException if any statement fails
+ */
+ private void initializeJdbcTable() {
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(PG_SOURCE_DDL);
+ statement.execute(PG_SINK_DDL);
+ for (int i = 1; i <= 10; i++) {
+ statement.addBatch(
+ "INSERT INTO\n"
+ + " pg_ide_source_table (gid,\n"
+ + " text_col,\n"
+ + " varchar_col,\n"
+ + " char_col,\n"
+ + " boolean_col,\n"
+ + " smallint_col,\n"
+ + " integer_col,\n"
+ + " bigint_col,\n"
+ + " decimal_col,\n"
+ + " numeric_col,\n"
+ + " real_col,\n"
+ + " double_precision_col,\n"
+ + " smallserial_col,\n"
+ + " serial_col,\n"
+ + " bigserial_col,\n"
+ + " date_col,\n"
+ + " timestamp_col,\n"
+ + " bpchar_col,\n"
+ + " age,\n"
+ + " name,\n"
+ + " point,\n"
+ + " linestring,\n"
+ + " polygon_colums,\n"
+ + " multipoint,\n"
+ + " multilinestring,\n"
+ + " multipolygon,\n"
+ + " geometrycollection,\n"
+ + " geog\n"
+ + " )\n"
+ + "VALUES\n"
+ + " (\n"
+ + " '"
+ + i
+ + "',\n"
+ + " 'Hello World',\n"
+ + " 'Test',\n"
+ + " 'Testing',\n"
+ + " true,\n"
+ + " 10,\n"
+ + " 100,\n"
+ + " 1000,\n"
+ + " 10.55,\n"
+ + " 8.8888,\n"
+ + " 3.14,\n"
+ + " 3.14159265,\n"
+ + " 1,\n"
+ + " 100,\n"
+ + " 10000,\n"
+ + " '2023-05-07',\n"
+ + " '2023-05-07 14:30:00',\n"
+ + " 'Testing',\n"
+ + " 21,\n"
+ + " 'Leblanc',\n"
+ + " ST_GeomFromText('POINT(-122.3452
47.5925)', 4326),\n"
+ + " ST_GeomFromText(\n"
+ + " 'LINESTRING(-122.3451 47.5924,
-122.3449 47.5923)',\n"
+ + " 4326\n"
+ + " ),\n"
+ + " ST_GeomFromText(\n"
+ + " 'POLYGON((-122.3453 47.5922,
-122.3453 47.5926, -122.3448 47.5926, -122.3448 47.5922, -122.3453
47.5922))',\n"
+ + " 4326\n"
+ + " ),\n"
+ + " ST_GeomFromText(\n"
+ + " 'MULTIPOINT(-122.3459 47.5927,
-122.3445 47.5918)',\n"
+ + " 4326\n"
+ + " ),\n"
+ + " ST_GeomFromText(\n"
+ + " 'MULTILINESTRING((-122.3463 47.5920,
-122.3461 47.5919),(-122.3459 47.5924, -122.3457 47.5923))',\n"
+ + " 4326\n"
+ + " ),\n"
+ + " ST_GeomFromText(\n"
+ + " 'MULTIPOLYGON(((-122.3458 47.5925,
-122.3458 47.5928, -122.3454 47.5928, -122.3454 47.5925, -122.3458
47.5925)),((-122.3453 47.5921, -122.3453 47.5924, -122.3448 47.5924, -122.3448
47.5921, -122.3453 47.5921)))',\n"
+ + " 4326\n"
+ + " ),\n"
+ + " ST_GeomFromText(\n"
+ + " 'GEOMETRYCOLLECTION(POINT(-122.3462
47.5921), LINESTRING(-122.3460 47.5924, -122.3457 47.5924))',\n"
+ + " 4326\n"
+ + " ),\n"
+ + " ST_GeographyFromText('POINT(-122.3452
47.5925)')\n"
+ + " )");
+ }
+
+ statement.executeBatch();
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing PostgreSql table
failed!", e);
+ }
+ }
+
+ private Connection getJdbcConnection() throws SQLException {
+ return DriverManager.getConnection(
+ POSTGRESQL_CONTAINER.getJdbcUrl(),
+ POSTGRESQL_CONTAINER.getUsername(),
+ POSTGRESQL_CONTAINER.getPassword());
+ }
+
+ /**
+ * Runs a query on a fresh connection and returns all rows, each row being the list of
+ * its column values as read with ResultSet.getObject, in result-set order.
+ *
+ * <p>Connection, statement and result set are all opened in try-with-resources so none
+ * of them is left open after the call.
+ *
+ * @param sql the query to execute
+ * @return one list per row, holding that row's column values in column order
+ * @throws RuntimeException wrapping the SQLException if the query fails
+ */
+ private List<List<Object>> querySql(String sql) {
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
+ List<List<Object>> result = new ArrayList<>();
+ int columnCount = resultSet.getMetaData().getColumnCount();
+ while (resultSet.next()) {
+ List<Object> row = new ArrayList<>(columnCount);
+ // JDBC column indexes are 1-based.
+ for (int i = 1; i <= columnCount; i++) {
+ row.add(resultSet.getObject(i));
+ }
+ result.add(row);
+ }
+ return result;
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Executes a single SQL statement on a fresh connection. Both the connection and the
+ * statement are opened in try-with-resources so neither is left open after the call.
+ *
+ * @param sql the statement to execute
+ * @throws RuntimeException wrapping the SQLException if execution fails
+ */
+ private void executeSQL(String sql) {
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Stops the PostgreSQL container if startUp() got as far as creating it. */
+ @AfterAll
+ @Override
+ public void tearDown() {
+ if (POSTGRESQL_CONTAINER == null) {
+ return;
+ }
+ POSTGRESQL_CONTAINER.stop();
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_postgres_ide_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_postgres_ide_source_and_sink.conf
new file mode 100644
index 0000000000..52f9c06570
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_postgres_ide_source_and_sink.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env { # batch job with parallelism 1
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source{ # reads all columns of the lower-case table pg_ide_source_table
+ jdbc{
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+ driver = "org.postgresql.Driver"
+ user = "test"
+ password = "test"
+ query ="""select gid, text_col, varchar_col, char_col, boolean_col,
smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col,
double_precision_col,
+ smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums,
multipoint,
+ multilinestring, multipolygon, geometrycollection,
geog from pg_ide_source_table"""
+ }
+}
+
+
+sink { # writes into the upper-case table PG_IDE_SINK_TABLE
+ Jdbc {
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+ user = test
+ password = test
+ generate_sink_sql = true # no query is given; presumably the sink derives its SQL from database/table - confirm in the Jdbc sink docs
+ field_ide = UPPERCASE # presumably upper-cases field identifiers so they match the quoted upper-case sink columns - confirm against JdbcOptions.FIELD_IDE
+ database = test
+ table = "public.PG_IDE_SINK_TABLE"
+ primary_keys = ["gid"] # lower-case here while the sink column is "GID"; NOTE(review): presumably converted by field_ide - verify
+ }
+}
\ No newline at end of file