This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 38b6d6e4bb [Feature][Jdbc] Jdbc database support identifier (#5089)
38b6d6e4bb is described below

commit 38b6d6e4bbb52f993ffc9ef0109a36d6f3fa736e
Author: XiaoJiang521 <[email protected]>
AuthorDate: Tue Sep 12 14:43:46 2023 +0800

    [Feature][Jdbc] Jdbc database support identifier (#5089)
---
 docs/en/connector-v2/sink/Jdbc.md                  |   7 +
 docs/en/connector-v2/sink/Mysql.md                 |   2 +
 docs/en/connector-v2/sink/PostgreSql.md            |   2 +
 .../seatunnel/jdbc/config/JdbcOptions.java         |   7 +
 .../jdbc/internal/dialect/JdbcDialect.java         |  23 +-
 .../jdbc/internal/dialect/JdbcDialectFactory.java  |   2 +-
 .../jdbc/internal/dialect/JdbcDialectLoader.java   |   8 +-
 .../FieldIdeEnum.java}                             |  20 +-
 .../dialect/mysql/MySqlDialectFactory.java         |   7 +
 .../jdbc/internal/dialect/mysql/MysqlDialect.java  |  14 +
 .../dialect/oceanbase/OceanBaseDialectFactory.java |   2 +-
 .../internal/dialect/oracle/OracleDialect.java     |  21 +-
 .../dialect/oracle/OracleDialectFactory.java       |   7 +
 .../internal/dialect/psql/PostgresDialect.java     |  37 ++
 .../dialect/psql/PostgresDialectFactory.java       |   6 +-
 .../dialect/psqllow/PostgresLowDialect.java        |   5 +
 .../dialect/sqlserver/SqlServerDialect.java        |  32 ++
 .../dialect/sqlserver/SqlServerDialectFactory.java |   7 +
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java   |   6 +-
 .../seatunnel/jdbc/sink/JdbcSinkFactory.java       |   6 +-
 .../dialect/PostgresDialectFactoryTest.java        |   2 +-
 .../seatunnel/jdbc/JdbcPostgresIdentifierIT.java   | 387 +++++++++++++++++++++
 .../jdbc_postgres_ide_source_and_sink.conf         |  48 +++
 23 files changed, 638 insertions(+), 20 deletions(-)

diff --git a/docs/en/connector-v2/sink/Jdbc.md 
b/docs/en/connector-v2/sink/Jdbc.md
index 755de8bb9a..f845dea6ef 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -47,6 +47,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` 
to enable it.
 | max_commit_attempts                       | Int     | No       | 3           
  |
 | transaction_timeout_sec                   | Int     | No       | -1          
  |
 | auto_commit                               | Boolean | No       | true        
  |
+| field_ide                                 | String  | No       | -           
  |
 | common-options                            |         | no       | -           
  |
 
 ### driver [string]
@@ -136,6 +137,12 @@ exactly-once semantics
 
 Automatic transaction commit is enabled by default
 
+### field_ide [String]
+
+The `field_ide` option specifies whether field names are 
converted to uppercase or lowercase when
+synchronizing from the source to the sink. `ORIGINAL` indicates no conversion 
is needed, `UPPERCASE` indicates
+conversion to uppercase, and `LOWERCASE` indicates conversion to lowercase.
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details
diff --git a/docs/en/connector-v2/sink/Mysql.md 
b/docs/en/connector-v2/sink/Mysql.md
index f453a60c4e..860f071df0 100644
--- a/docs/en/connector-v2/sink/Mysql.md
+++ b/docs/en/connector-v2/sink/Mysql.md
@@ -78,6 +78,7 @@ semantics (using XA transaction guarantee).
 | max_commit_attempts                       | Int     | No       | 3       | 
The number of retries for transaction commit failures                           
                                                                                
                                                                             |
 | transaction_timeout_sec                   | Int     | No       | -1      | 
The timeout after the transaction is opened, the default is -1 (never timeout). 
Note that setting the timeout may affect<br/>exactly-once semantics             
                                                                             |
 | auto_commit                               | Boolean | No       | true    | 
Automatic transaction commit is enabled by default                              
                                                                                
                                                                             |
+| field_ide                                 | String  | No       | -       | 
Identify whether the field needs to be converted when synchronizing from the 
source to the sink. `ORIGINAL` indicates no conversion is needed; `UPPERCASE` 
indicates conversion to uppercase; `LOWERCASE` indicates conversion to 
lowercase. |
 | common-options                            |         | no       | -       | 
Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details                                         
                                                                                
                 |
 
 ### Tips
@@ -191,6 +192,7 @@ sink {
         database = test
         table = sink_table
         primary_keys = ["id","name"]
+        field_ide = UPPERCASE
     }
 }
 ```
diff --git a/docs/en/connector-v2/sink/PostgreSql.md 
b/docs/en/connector-v2/sink/PostgreSql.md
index 67e2ed64d9..bcc5616f5e 100644
--- a/docs/en/connector-v2/sink/PostgreSql.md
+++ b/docs/en/connector-v2/sink/PostgreSql.md
@@ -81,6 +81,7 @@ semantics (using XA transaction guarantee).
 | max_commit_attempts                       | Int     | No       | 3       | 
The number of retries for transaction commit failures                           
                                                                                
                                                                             |
 | transaction_timeout_sec                   | Int     | No       | -1      | 
The timeout after the transaction is opened, the default is -1 (never timeout). 
Note that setting the timeout may affect<br/>exactly-once semantics             
                                                                             |
 | auto_commit                               | Boolean | No       | true    | 
Automatic transaction commit is enabled by default                              
                                                                                
                                                                             |
+| field_ide                                 | String  | No       | -       | 
Identify whether the field needs to be converted when synchronizing from the 
source to the sink. `ORIGINAL` indicates no conversion is needed; `UPPERCASE` 
indicates conversion to uppercase; `LOWERCASE` indicates conversion to 
lowercase. |
 | common-options                            |         | no       | -       | 
Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details                                         
                                                                                
                 |
 
 ### Tips
@@ -197,6 +198,7 @@ sink {
         database = test
         table = sink_table
         primary_keys = ["id","name"]
+        field_ide = UPPERCASE
     }
 }
 ```
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
index 5d2254cd34..b01fc872f3 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
 
 import java.math.BigDecimal;
 import java.util.List;
@@ -154,4 +155,10 @@ public interface JdbcOptions {
                     .intType()
                     .noDefaultValue()
                     .withDescription("partition num");
+
+    Option<FieldIdeEnum> FIELD_IDE =
+            Options.key("field_ide")
+                    .enumType(FieldIdeEnum.class)
+                    .noDefaultValue()
+                    .withDescription("Whether case conversion is required");
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 8a0b31a5ee..e0cf5252a6 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -20,6 +20,9 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
+
+import org.apache.commons.lang3.StringUtils;
 
 import java.io.Serializable;
 import java.sql.Connection;
@@ -68,9 +71,13 @@ public interface JdbcDialect extends Serializable {
     default String quoteIdentifier(String identifier) {
         return identifier;
     }
+    /** Quotes the identifier for database name */
+    default String quoteDatabaseIdentifier(String identifier) {
+        return identifier;
+    }
 
     default String tableIdentifier(String database, String tableName) {
-        return quoteIdentifier(database) + "." + quoteIdentifier(tableName);
+        return quoteDatabaseIdentifier(database) + "." + 
quoteIdentifier(tableName);
     }
 
     /**
@@ -219,4 +226,18 @@ public interface JdbcDialect extends Serializable {
     default String extractTableName(TablePath tablePath) {
         return tablePath.getSchemaAndTableName();
     }
+
+    default String getFieldIde(String identifier, String fieldIde) {
+        if (StringUtils.isEmpty(fieldIde)) {
+            return identifier;
+        }
+        switch (FieldIdeEnum.valueOf(fieldIde.toUpperCase())) {
+            case LOWERCASE:
+                return identifier.toLowerCase();
+            case UPPERCASE:
+                return identifier.toUpperCase();
+            default:
+                return identifier;
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
index 3d66de6590..5439937f53 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
@@ -44,7 +44,7 @@ public interface JdbcDialectFactory {
      * @param compatibleMode The compatible mode
      * @return a new instance of {@link JdbcDialect}
      */
-    default JdbcDialect create(String compatibleMode) {
+    default JdbcDialect create(String compatibleMode, String fieldId) {
         return create();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
index b49df35ff3..350a22e20c 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
@@ -36,6 +36,10 @@ public final class JdbcDialectLoader {
 
     private JdbcDialectLoader() {}
 
+    public static JdbcDialect load(String url, String compatibleMode) {
+        return load(url, compatibleMode, "");
+    }
+
     /**
      * Loads the unique JDBC Dialect that can handle the given database url.
      *
@@ -45,7 +49,7 @@ public final class JdbcDialectLoader {
      *     unambiguously process the given database URL.
      * @return The loaded dialect.
      */
-    public static JdbcDialect load(String url, String compatibleMode) {
+    public static JdbcDialect load(String url, String compatibleMode, String 
fieldIde) {
         ClassLoader cl = Thread.currentThread().getContextClassLoader();
         List<JdbcDialectFactory> foundFactories = discoverFactories(cl);
 
@@ -90,7 +94,7 @@ public final class JdbcDialectLoader {
                                     .collect(Collectors.joining("\n"))));
         }
 
-        return matchingFactories.get(0).create(compatibleMode);
+        return matchingFactories.get(0).create(compatibleMode, fieldIde);
     }
 
     private static List<JdbcDialectFactory> discoverFactories(ClassLoader 
classLoader) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java
similarity index 69%
copy from 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java
copy to 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java
index e367207ffa..39f9521062 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java
@@ -15,16 +15,20 @@
  * limitations under the License.
  */
 
-package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psqllow;
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum;
 
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialect;
+public enum FieldIdeEnum {
+    ORIGINAL("original"), // Original string form
+    UPPERCASE("uppercase"), // Convert to uppercase
+    LOWERCASE("lowercase"); // Convert to lowercase
 
-import java.util.Optional;
+    private final String value;
 
-public class PostgresLowDialect extends PostgresDialect {
-    @Override
-    public Optional<String> getUpsertStatement(
-            String database, String tableName, String[] fieldNames, String[] 
uniqueKeyFields) {
-        return Optional.empty();
+    FieldIdeEnum(String value) {
+        this.value = value;
+    }
+
+    public String getValue() {
+        return value;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
index 10047311b9..a4f89a4dc8 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
@@ -22,6 +22,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDiale
 
 import com.google.auto.service.AutoService;
 
+import javax.annotation.Nonnull;
+
 /** Factory for {@link MysqlDialect}. */
 @AutoService(JdbcDialectFactory.class)
 public class MySqlDialectFactory implements JdbcDialectFactory {
@@ -34,4 +36,9 @@ public class MySqlDialectFactory implements 
JdbcDialectFactory {
     public JdbcDialect create() {
         return new MysqlDialect();
     }
+
+    @Override
+    public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) 
{
+        return new MysqlDialect(fieldIde);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index c71dc3f76a..1ae69a6131 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -31,6 +32,14 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 public class MysqlDialect implements JdbcDialect {
+    public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
+
+    public MysqlDialect() {}
+
+    public MysqlDialect(String fieldIde) {
+        this.fieldIde = fieldIde;
+    }
+
     @Override
     public String dialectName() {
         return "MySQL";
@@ -48,6 +57,11 @@ public class MysqlDialect implements JdbcDialect {
 
     @Override
     public String quoteIdentifier(String identifier) {
+        return "`" + getFieldIde(identifier, fieldIde) + "`";
+    }
+
+    @Override
+    public String quoteDatabaseIdentifier(String identifier) {
         return "`" + identifier + "`";
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
index 66df84205e..b3a456870c 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
@@ -40,7 +40,7 @@ public class OceanBaseDialectFactory implements 
JdbcDialectFactory {
     }
 
     @Override
-    public JdbcDialect create(@Nonnull String compatibleMode) {
+    public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) 
{
         if ("oracle".equalsIgnoreCase(compatibleMode)) {
             return new OracleDialect();
         }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
index 7edd935e78..e8e583dc14 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -33,6 +34,13 @@ import java.util.stream.Collectors;
 public class OracleDialect implements JdbcDialect {
 
     private static final int DEFAULT_ORACLE_FETCH_SIZE = 128;
+    public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
+
+    public OracleDialect(String fieldIde) {
+        this.fieldIde = fieldIde;
+    }
+
+    public OracleDialect() {}
 
     @Override
     public String dialectName() {
@@ -56,7 +64,18 @@ public class OracleDialect implements JdbcDialect {
 
     @Override
     public String quoteIdentifier(String identifier) {
-        return identifier;
+        if (identifier.contains(".")) {
+            String[] parts = identifier.split("\\.");
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < parts.length - 1; i++) {
+                sb.append("\"").append(parts[i]).append("\"").append(".");
+            }
+            return sb.append("\"")
+                    .append(getFieldIde(parts[parts.length - 1], fieldIde))
+                    .append("\"")
+                    .toString();
+        }
+        return "\"" + getFieldIde(identifier, fieldIde) + "\"";
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java
index 168dc4d890..121098c461 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java
@@ -22,6 +22,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDiale
 
 import com.google.auto.service.AutoService;
 
+import javax.annotation.Nonnull;
+
 /** Factory for {@link OracleDialect}. */
 @AutoService(JdbcDialectFactory.class)
 public class OracleDialectFactory implements JdbcDialectFactory {
@@ -34,4 +36,9 @@ public class OracleDialectFactory implements 
JdbcDialectFactory {
     public JdbcDialect create() {
         return new OracleDialect();
     }
+
+    @Override
+    public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) 
{
+        return new OracleDialect(fieldIde);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
index b36a28a5a6..f206589af5 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -33,6 +34,14 @@ public class PostgresDialect implements JdbcDialect {
 
     public static final int DEFAULT_POSTGRES_FETCH_SIZE = 128;
 
+    public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
+
+    public PostgresDialect() {}
+
+    public PostgresDialect(String fieldIde) {
+        this.fieldIde = fieldIde;
+    }
+
     @Override
     public String dialectName() {
         return "PostgreSQL";
@@ -88,4 +97,32 @@ public class PostgresDialect implements JdbcDialect {
         }
         return statement;
     }
+
+    @Override
+    public String tableIdentifier(String database, String tableName) {
+        // Quote the database name as-is (no case conversion) so PostgreSQL matches its exact case
+        return quoteDatabaseIdentifier(database) + "." + 
quoteIdentifier(tableName);
+    }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        if (identifier.contains(".")) {
+            String[] parts = identifier.split("\\.");
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < parts.length - 1; i++) {
+                sb.append("\"").append(parts[i]).append("\"").append(".");
+            }
+            return sb.append("\"")
+                    .append(getFieldIde(parts[parts.length - 1], fieldIde))
+                    .append("\"")
+                    .toString();
+        }
+
+        return "\"" + getFieldIde(identifier, fieldIde) + "\"";
+    }
+
+    @Override
+    public String quoteDatabaseIdentifier(String identifier) {
+        return "\"" + identifier + "\"";
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
index 857c85290d..59dc0b45c6 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
@@ -39,10 +39,10 @@ public class PostgresDialectFactory implements 
JdbcDialectFactory {
     }
 
     @Override
-    public JdbcDialect create(@Nonnull String compatibleMode) {
+    public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) 
{
         if ("postgresLow".equalsIgnoreCase(compatibleMode)) {
-            return new PostgresLowDialect();
+            return new PostgresLowDialect(fieldIde);
         }
-        return new PostgresDialect();
+        return new PostgresDialect(fieldIde);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java
index e367207ffa..9100382628 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java
@@ -22,6 +22,11 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.Post
 import java.util.Optional;
 
 public class PostgresLowDialect extends PostgresDialect {
+
+    public PostgresLowDialect(String fieldIde) {
+        this.fieldIde = fieldIde;
+    }
+
     @Override
     public Optional<String> getUpsertStatement(
             String database, String tableName, String[] fieldNames, String[] 
uniqueKeyFields) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index 2121369e22..792c03bd76 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserve
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
 
 import java.util.Arrays;
 import java.util.List;
@@ -27,6 +28,15 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 public class SqlServerDialect implements JdbcDialect {
+
+    public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
+
+    public SqlServerDialect() {}
+
+    public SqlServerDialect(String fieldIde) {
+        this.fieldIde = fieldIde;
+    }
+
     @Override
     public String dialectName() {
         return "Sqlserver";
@@ -105,4 +115,26 @@ public class SqlServerDialect implements JdbcDialect {
 
         return Optional.of(upsertSQL);
     }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        if (identifier.contains(".")) {
+            String[] parts = identifier.split("\\.");
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < parts.length - 1; i++) {
+                sb.append("[").append(parts[i]).append("]").append(".");
+            }
+            return sb.append("[")
+                    .append(getFieldIde(parts[parts.length - 1], fieldIde))
+                    .append("]")
+                    .toString();
+        }
+
+        return "[" + getFieldIde(identifier, fieldIde) + "]";
+    }
+
+    @Override
+    public String quoteDatabaseIdentifier(String identifier) {
+        return "[" + identifier + "]";
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java
index d8fce3c43c..d7dae4efd5 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java
@@ -22,6 +22,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDiale
 
 import com.google.auto.service.AutoService;
 
+import javax.annotation.Nonnull;
+
 /** Factory for {@link SqlServerDialect}. */
 @AutoService(JdbcDialectFactory.class)
 public class SqlServerDialectFactory implements JdbcDialectFactory {
@@ -34,4 +36,9 @@ public class SqlServerDialectFactory implements 
JdbcDialectFactory {
     public JdbcDialect create() {
         return new SqlServerDialect();
     }
+
+    /**
+     * Creates a {@link SqlServerDialect} configured with the given field-identifier mode.
+     *
+     * @param compatibleMode not used by this factory
+     * @param fieldIde field-identifier mode forwarded to the dialect constructor
+     * @return a new {@link SqlServerDialect}
+     */
+    @Override
+    public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) 
{
+        return new SqlServerDialect(fieldIde);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index c23619b5aa..71e0a86249 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -38,6 +38,7 @@ import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -107,7 +108,10 @@ public class JdbcSink
         this.dialect =
                 JdbcDialectLoader.load(
                         jdbcSinkConfig.getJdbcConnectionConfig().getUrl(),
-                        
jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode());
+                        
jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode(),
+                        config.get(JdbcOptions.FIELD_IDE) == null
+                                ? null
+                                : 
config.get(JdbcOptions.FIELD_IDE).getValue());
         this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index 8209533f9d..d18ff0d7fd 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -30,9 +30,11 @@ import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -142,10 +144,12 @@ public class JdbcSinkFactory implements TableSinkFactory {
         }
         final ReadonlyConfig options = config;
         JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config);
+        FieldIdeEnum fieldIdeEnum = config.get(JdbcOptions.FIELD_IDE);
         JdbcDialect dialect =
                 JdbcDialectLoader.load(
                         sinkConfig.getJdbcConnectionConfig().getUrl(),
-                        
sinkConfig.getJdbcConnectionConfig().getCompatibleMode());
+                        
sinkConfig.getJdbcConnectionConfig().getCompatibleMode(),
+                        fieldIdeEnum == null ? null : fieldIdeEnum.getValue());
         CatalogTable finalCatalogTable = catalogTable;
         return () ->
                 new JdbcSink(
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/PostgresDialectFactoryTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/PostgresDialectFactoryTest.java
index 79b1f11ac9..90b980a69e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/PostgresDialectFactoryTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/PostgresDialectFactoryTest.java
@@ -30,7 +30,7 @@ public class PostgresDialectFactoryTest {
     @Test
     public void testPostgresDialectCreate() {
         PostgresDialectFactory postgresDialectFactory = new 
PostgresDialectFactory();
-        JdbcDialect postgresLow = postgresDialectFactory.create("postgresLow");
+        JdbcDialect postgresLow = postgresDialectFactory.create("postgresLow", 
"");
         String[] fields = {"id", "name", "age"};
         String[] uniqueKeyField = {"id"};
         Optional<String> upsertStatement =
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
new file mode 100644
index 0000000000..13adec7008
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "Currently SPARK and FLINK do not support cdc")
+public class JdbcPostgresIdentifierIT extends TestSuiteBase implements 
TestResource {
+    // Docker image used for the database container started in startUp().
+    private static final String PG_IMAGE = "postgis/postgis";
+    // Jars fetched with curl into the engine container's Jdbc plugin lib directory
+    // (see extendedFactory): the PostgreSQL JDBC driver and the two PostGIS helper jars.
+    private static final String PG_DRIVER_JAR =
+            "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
+    private static final String PG_JDBC_JAR =
+            "https://repo1.maven.org/maven2/net/postgis/postgis-jdbc/2.5.1/postgis-jdbc-2.5.1.jar";
+    private static final String PG_GEOMETRY_JAR =
+            "https://repo1.maven.org/maven2/net/postgis/postgis-geometry/2.5.1/postgis-geometry-2.5.1.jar";
+    // Job configuration files executed by testAutoGenerateSQL.
+    private static final List<String> PG_CONFIG_FILE_LIST =
+            Lists.newArrayList("/jdbc_postgres_ide_source_and_sink.conf");
+    // Database container; assigned in startUp() and stopped in tearDown().
+    private PostgreSQLContainer<?> POSTGRESQL_CONTAINER;
+    /**
+     * DDL for the source table {@code pg_ide_source_table}. All identifiers are unquoted
+     * lower-case names; the last eight columns use the {@code geometry}/{@code geography} types.
+     */
+    private static final String PG_SOURCE_DDL =
+            "CREATE TABLE IF NOT EXISTS pg_ide_source_table (\n"
+                    + "  gid SERIAL PRIMARY KEY,\n"
+                    + "  text_col TEXT,\n"
+                    + "  varchar_col VARCHAR(255),\n"
+                    + "  char_col CHAR(10),\n"
+                    + "  boolean_col bool,\n"
+                    + "  smallint_col int2,\n"
+                    + "  integer_col int4,\n"
+                    + "  bigint_col BIGINT,\n"
+                    + "  decimal_col DECIMAL(10, 2),\n"
+                    + "  numeric_col NUMERIC(8, 4),\n"
+                    + "  real_col float4,\n"
+                    + "  double_precision_col float8,\n"
+                    + "  smallserial_col SMALLSERIAL,\n"
+                    + "  serial_col SERIAL,\n"
+                    + "  bigserial_col BIGSERIAL,\n"
+                    + "  date_col DATE,\n"
+                    + "  timestamp_col TIMESTAMP,\n"
+                    + "  bpchar_col BPCHAR(10),\n"
+                    + "  age INT NOT null,\n"
+                    + "  name VARCHAR(255) NOT null,\n"
+                    + "  point geometry(POINT, 4326),\n"
+                    + "  linestring geometry(LINESTRING, 4326),\n"
+                    + "  polygon_colums geometry(POLYGON, 4326),\n"
+                    + "  multipoint geometry(MULTIPOINT, 4326),\n"
+                    + "  multilinestring geometry(MULTILINESTRING, 4326),\n"
+                    + "  multipolygon geometry(MULTIPOLYGON, 4326),\n"
+                    + "  geometrycollection geometry(GEOMETRYCOLLECTION, 
4326),\n"
+                    + "  geog geography(POINT, 4326)\n"
+                    + ")";
+    /**
+     * DDL for the sink table {@code "PG_IDE_SINK_TABLE"} in {@code test.public}. Table and column
+     * names are double-quoted upper-case identifiers; the eight spatial columns of the source
+     * table are declared here as {@code varchar(2000)}.
+     */
+    private static final String PG_SINK_DDL =
+            "CREATE TABLE IF NOT EXISTS test.public.\"PG_IDE_SINK_TABLE\" (\n"
+                    + "    \"GID\" SERIAL PRIMARY KEY,\n"
+                    + "    \"TEXT_COL\" TEXT,\n"
+                    + "    \"VARCHAR_COL\" VARCHAR(255),\n"
+                    + "    \"CHAR_COL\" CHAR(10),\n"
+                    + "    \"BOOLEAN_COL\" bool,\n"
+                    + "    \"SMALLINT_COL\" int2,\n"
+                    + "    \"INTEGER_COL\" int4,\n"
+                    + "    \"BIGINT_COL\" BIGINT,\n"
+                    + "    \"DECIMAL_COL\" DECIMAL(10, 2),\n"
+                    + "    \"NUMERIC_COL\" NUMERIC(8, 4),\n"
+                    + "    \"REAL_COL\" float4,\n"
+                    + "    \"DOUBLE_PRECISION_COL\" float8,\n"
+                    + "    \"SMALLSERIAL_COL\" SMALLSERIAL,\n"
+                    + "    \"SERIAL_COL\" SERIAL,\n"
+                    + "    \"BIGSERIAL_COL\" BIGSERIAL,\n"
+                    + "    \"DATE_COL\" DATE,\n"
+                    + "    \"TIMESTAMP_COL\" TIMESTAMP,\n"
+                    + "    \"BPCHAR_COL\" BPCHAR(10),\n"
+                    + "    \"AGE\" int4 NOT NULL,\n"
+                    + "    \"NAME\" varchar(255) NOT NULL,\n"
+                    + "    \"POINT\" varchar(2000) NULL,\n"
+                    + "    \"LINESTRING\" varchar(2000) NULL,\n"
+                    + "    \"POLYGON_COLUMS\" varchar(2000) NULL,\n"
+                    + "    \"MULTIPOINT\" varchar(2000) NULL,\n"
+                    + "    \"MULTILINESTRING\" varchar(2000) NULL,\n"
+                    + "    \"MULTIPOLYGON\" varchar(2000) NULL,\n"
+                    + "    \"GEOMETRYCOLLECTION\" varchar(2000) NULL,\n"
+                    + "    \"GEOG\" varchar(2000) NULL\n"
+                    + "  )";
+
+    /**
+     * Query that reads every column of {@code pg_ide_source_table}; its result is the expected
+     * side of the comparison in {@code testAutoGenerateSQL}.
+     */
+    private static final String SOURCE_SQL =
+            "select \n"
+                    + "gid,\n"
+                    + "text_col,\n"
+                    + "varchar_col,\n"
+                    + "char_col,\n"
+                    + "boolean_col,\n"
+                    + "smallint_col,\n"
+                    + "integer_col,\n"
+                    + "bigint_col,\n"
+                    + "decimal_col,\n"
+                    + "numeric_col,\n"
+                    + "real_col,\n"
+                    + "double_precision_col,\n"
+                    + "smallserial_col,\n"
+                    + "serial_col,\n"
+                    + "bigserial_col,\n"
+                    + "date_col,\n"
+                    + "timestamp_col,\n"
+                    + "bpchar_col,\n"
+                    + "age,\n"
+                    + "name,\n"
+                    + "point,\n"
+                    + "linestring,\n"
+                    + "polygon_colums,\n"
+                    + "multipoint,\n"
+                    + "multilinestring,\n"
+                    + "multipolygon,\n"
+                    + "geometrycollection,\n"
+                    + "geog\n"
+                    + " from pg_ide_source_table";
+    /**
+     * Query that reads every column of {@code "PG_IDE_SINK_TABLE"} in the same order as
+     * {@code SOURCE_SQL}. The spatial columns are stored as varchar in the sink, so they are cast
+     * back to GEOMETRY/GEOGRAPHY here to make the rows comparable with the source rows.
+     */
+    private static final String SINK_SQL =
+            "SELECT\n"
+                    + "  \"GID\",\n"
+                    + "  \"TEXT_COL\",\n"
+                    + "  \"VARCHAR_COL\",\n"
+                    + "  \"CHAR_COL\",\n"
+                    + "  \"BOOLEAN_COL\",\n"
+                    + "  \"SMALLINT_COL\",\n"
+                    + "  \"INTEGER_COL\",\n"
+                    + "  \"BIGINT_COL\",\n"
+                    + "  \"DECIMAL_COL\",\n"
+                    + "  \"NUMERIC_COL\",\n"
+                    + "  \"REAL_COL\",\n"
+                    + "  \"DOUBLE_PRECISION_COL\",\n"
+                    + "  \"SMALLSERIAL_COL\",\n"
+                    + "  \"SERIAL_COL\",\n"
+                    + "  \"BIGSERIAL_COL\",\n"
+                    + "  \"DATE_COL\",\n"
+                    + "  \"TIMESTAMP_COL\",\n"
+                    + "  \"BPCHAR_COL\",\n"
+                    + "  \"AGE\",\n"
+                    + "  \"NAME\",\n"
+                    + "  CAST(\"POINT\" AS GEOMETRY) AS POINT,\n"
+                    + "  CAST(\"LINESTRING\" AS GEOMETRY) AS LINESTRING,\n"
+                    + "  CAST(\"POLYGON_COLUMS\" AS GEOMETRY) AS POLYGON_COLUMS,\n"
+                    + "  CAST(\"MULTIPOINT\" AS GEOMETRY) AS MULTIPOINT,\n"
+                    + "  CAST(\"MULTILINESTRING\" AS GEOMETRY) AS MULTILINESTRING,\n"
+                    // Alias was MULTILINESTRING (copy-paste), which duplicated the previous label.
+                    + "  CAST(\"MULTIPOLYGON\" AS GEOMETRY) AS MULTIPOLYGON,\n"
+                    + "  CAST(\"GEOMETRYCOLLECTION\" AS GEOMETRY) AS GEOMETRYCOLLECTION,\n"
+                    + "  CAST(\"GEOG\" AS GEOGRAPHY) AS GEOG\n"
+                    + "FROM\n"
+                    + "  \"PG_IDE_SINK_TABLE\";";
+
+    /**
+     * Container hook that creates {@code /tmp/seatunnel/plugins/Jdbc/lib} inside the given
+     * container and downloads the three jars ({@code PG_DRIVER_JAR}, {@code PG_JDBC_JAR},
+     * {@code PG_GEOMETRY_JAR}) into it with curl, asserting that the shell command exits with 0.
+     */
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory =
+            container -> {
+                // Single bash command chain: any failing step makes the exit code non-zero.
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && 
cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+                                        + PG_DRIVER_JAR
+                                        + " && curl -O "
+                                        + PG_JDBC_JAR
+                                        + " && curl -O "
+                                        + PG_GEOMETRY_JAR);
+                Assertions.assertEquals(0, extraCommands.getExitCode());
+            };
+
+    /**
+     * Starts the database container and prepares the test tables.
+     *
+     * <p>The container is built from {@code PG_IMAGE}, attached to {@code TestSuiteBase.NETWORK}
+     * under the alias {@code postgresql}, and started with
+     * {@code max_prepared_transactions=100}. After the JDBC driver class is loaded,
+     * {@code initializeJdbcTable} is retried (exceptions ignored) every 500 ms for up to two
+     * minutes until it completes without throwing.
+     *
+     * @throws Exception if the container cannot be started or the driver class cannot be loaded
+     */
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        POSTGRESQL_CONTAINER =
+                new PostgreSQLContainer<>(
+                                DockerImageName.parse(PG_IMAGE)
+                                        .asCompatibleSubstituteFor("postgres"))
+                        .withNetwork(TestSuiteBase.NETWORK)
+                        .withNetworkAliases("postgresql")
+                        .withCommand("postgres -c 
max_prepared_transactions=100")
+                        .withLogConsumer(
+                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE)));
+        Startables.deepStart(Stream.of(POSTGRESQL_CONTAINER)).join();
+        log.info("PostgreSQL container started");
+        Class.forName(POSTGRESQL_CONTAINER.getDriverClassName());
+        // Table setup is retried because it may fail while the database is still coming up.
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(this::initializeJdbcTable);
+        log.info("pg data initialization succeeded. Procedure");
+    }
+
+    /**
+     * Runs every job in {@code PG_CONFIG_FILE_LIST}, requires exit code 0, and checks that the
+     * sink query returns the same rows as the source query before truncating the sink table.
+     *
+     * @param container engine container used to execute the job
+     * @throws IOException if executing the job fails
+     * @throws InterruptedException if job execution is interrupted
+     */
+    @TestTemplate
+    public void testAutoGenerateSQL(TestContainer container)
+            throws IOException, InterruptedException {
+        for (String configFile : PG_CONFIG_FILE_LIST) {
+            Container.ExecResult jobResult = container.executeJob(configFile);
+            Assertions.assertEquals(0, jobResult.getExitCode());
+
+            List<List<Object>> sourceRows = querySql(SOURCE_SQL);
+            List<List<Object>> sinkRows = querySql(SINK_SQL);
+            Assertions.assertIterableEquals(sourceRows, sinkRows);
+
+            // Leave the sink empty for the next configuration file.
+            executeSQL("truncate table \"PG_IDE_SINK_TABLE\"");
+            log.info(configFile + " e2e test completed");
+        }
+    }
+
+    /**
+     * Creates the source and sink tables and inserts ten rows (gid 1..10) into the source table
+     * as a single JDBC batch.
+     *
+     * @throws RuntimeException wrapping any {@link SQLException} raised during setup
+     */
+    private void initializeJdbcTable() {
+        // Both resources are closed in reverse order even when a statement fails.
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(PG_SOURCE_DDL);
+            statement.execute(PG_SINK_DDL);
+            for (int i = 1; i <= 10; i++) {
+                // Every row is identical except for its gid.
+                statement.addBatch(
+                        "INSERT INTO\n"
+                                + "  pg_ide_source_table (gid,\n"
+                                + "    text_col,\n"
+                                + "    varchar_col,\n"
+                                + "    char_col,\n"
+                                + "    boolean_col,\n"
+                                + "    smallint_col,\n"
+                                + "    integer_col,\n"
+                                + "    bigint_col,\n"
+                                + "    decimal_col,\n"
+                                + "    numeric_col,\n"
+                                + "    real_col,\n"
+                                + "    double_precision_col,\n"
+                                + "    smallserial_col,\n"
+                                + "    serial_col,\n"
+                                + "    bigserial_col,\n"
+                                + "    date_col,\n"
+                                + "    timestamp_col,\n"
+                                + "    bpchar_col,\n"
+                                + "    age,\n"
+                                + "    name,\n"
+                                + "    point,\n"
+                                + "    linestring,\n"
+                                + "    polygon_colums,\n"
+                                + "    multipoint,\n"
+                                + "    multilinestring,\n"
+                                + "    multipolygon,\n"
+                                + "    geometrycollection,\n"
+                                + "    geog\n"
+                                + "  )\n"
+                                + "VALUES\n"
+                                + "  (\n"
+                                + "    '"
+                                + i
+                                + "',\n"
+                                + "    'Hello World',\n"
+                                + "    'Test',\n"
+                                + "    'Testing',\n"
+                                + "    true,\n"
+                                + "    10,\n"
+                                + "    100,\n"
+                                + "    1000,\n"
+                                + "    10.55,\n"
+                                + "    8.8888,\n"
+                                + "    3.14,\n"
+                                + "    3.14159265,\n"
+                                + "    1,\n"
+                                + "    100,\n"
+                                + "    10000,\n"
+                                + "    '2023-05-07',\n"
+                                + "    '2023-05-07 14:30:00',\n"
+                                + "    'Testing',\n"
+                                + "    21,\n"
+                                + "    'Leblanc',\n"
+                                + "    ST_GeomFromText('POINT(-122.3452 47.5925)', 4326),\n"
+                                + "    ST_GeomFromText(\n"
+                                + "      'LINESTRING(-122.3451 47.5924, -122.3449 47.5923)',\n"
+                                + "      4326\n"
+                                + "    ),\n"
+                                + "    ST_GeomFromText(\n"
+                                + "      'POLYGON((-122.3453 47.5922, -122.3453 47.5926, -122.3448 47.5926, -122.3448 47.5922, -122.3453 47.5922))',\n"
+                                + "      4326\n"
+                                + "    ),\n"
+                                + "    ST_GeomFromText(\n"
+                                + "      'MULTIPOINT(-122.3459 47.5927, -122.3445 47.5918)',\n"
+                                + "      4326\n"
+                                + "    ),\n"
+                                + "    ST_GeomFromText(\n"
+                                + "      'MULTILINESTRING((-122.3463 47.5920, -122.3461 47.5919),(-122.3459 47.5924, -122.3457 47.5923))',\n"
+                                + "      4326\n"
+                                + "    ),\n"
+                                + "    ST_GeomFromText(\n"
+                                + "      'MULTIPOLYGON(((-122.3458 47.5925, -122.3458 47.5928, -122.3454 47.5928, -122.3454 47.5925, -122.3458 47.5925)),((-122.3453 47.5921, -122.3453 47.5924, -122.3448 47.5924, -122.3448 47.5921, -122.3453 47.5921)))',\n"
+                                + "      4326\n"
+                                + "    ),\n"
+                                + "    ST_GeomFromText(\n"
+                                + "      'GEOMETRYCOLLECTION(POINT(-122.3462 47.5921), LINESTRING(-122.3460 47.5924, -122.3457 47.5924))',\n"
+                                + "      4326\n"
+                                + "    ),\n"
+                                + "    ST_GeographyFromText('POINT(-122.3452 47.5925)')\n"
+                                + "  )");
+            }
+
+            statement.executeBatch();
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing PostgreSql table failed!", e);
+        }
+    }
+
+    /**
+     * Opens a new JDBC connection using the URL, username and password reported by
+     * {@code POSTGRESQL_CONTAINER}. The caller is responsible for closing it.
+     *
+     * @return a new connection
+     * @throws SQLException if the connection cannot be established
+     */
+    private Connection getJdbcConnection() throws SQLException {
+        return DriverManager.getConnection(
+                POSTGRESQL_CONTAINER.getJdbcUrl(),
+                POSTGRESQL_CONTAINER.getUsername(),
+                POSTGRESQL_CONTAINER.getPassword());
+    }
+
+    /**
+     * Runs a query on a fresh connection and returns all rows, each as the list of its column
+     * values in select order (obtained with {@link ResultSet#getObject(int)}).
+     *
+     * @param sql query to execute
+     * @return the rows of the result set, in the order returned by the database
+     * @throws RuntimeException wrapping any {@link SQLException}
+     */
+    private List<List<Object>> querySql(String sql) {
+        // Statement and ResultSet are closed explicitly rather than relying on Connection.close().
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
+            List<List<Object>> result = new ArrayList<>();
+            int columnCount = resultSet.getMetaData().getColumnCount();
+            while (resultSet.next()) {
+                List<Object> row = new ArrayList<>(columnCount);
+                // JDBC column indexes are 1-based.
+                for (int i = 1; i <= columnCount; i++) {
+                    row.add(resultSet.getObject(i));
+                }
+                result.add(row);
+            }
+            return result;
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Executes a single SQL statement on a fresh connection.
+     *
+     * @param sql statement to execute
+     * @throws RuntimeException wrapping any {@link SQLException}
+     */
+    private void executeSQL(String sql) {
+        // The Statement is closed explicitly rather than relying on Connection.close().
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Stops the database container if {@code startUp} got as far as creating it. */
+    @AfterAll
+    @Override
+    public void tearDown() {
+        if (POSTGRESQL_CONTAINER == null) {
+            return;
+        }
+        POSTGRESQL_CONTAINER.stop();
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_postgres_ide_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_postgres_ide_source_and_sink.conf
new file mode 100644
index 0000000000..52f9c06570
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_postgres_ide_source_and_sink.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+        driver = "org.postgresql.Driver"
+        user = "test"
+        password = "test"
+        query ="""select gid, text_col, varchar_col, char_col, boolean_col, 
smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, 
double_precision_col,
+                         smallserial_col, serial_col, bigserial_col, date_col, 
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, 
multipoint,
+                         multilinestring, multipolygon, geometrycollection, 
geog from pg_ide_source_table"""
+    }
+}
+
+
+sink {
+  Jdbc {
+    driver = org.postgresql.Driver
+    url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+    user = test
+    password = test
+    generate_sink_sql = true
+    field_ide = UPPERCASE
+    database = test
+    table = "public.PG_IDE_SINK_TABLE"
+    primary_keys = ["gid"]
+  }
+}
\ No newline at end of file

Reply via email to