This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ca4a65fc00 feat: jdbc support copy in statement. (#6443)
ca4a65fc00 is described below

commit ca4a65fc00e9d7ada2c9840b250dae8204880d2d
Author: MoSence <[email protected]>
AuthorDate: Mon Mar 11 18:04:07 2024 +0800

    feat: jdbc support copy in statement. (#6443)
---
 docs/en/connector-v2/sink/Jdbc.md                  |   7 +
 pom.xml                                            |   6 +
 seatunnel-common/pom.xml                           |   5 +-
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |   1 -
 .../seatunnel/jdbc/config/JdbcOptions.java         |   6 +
 .../seatunnel/jdbc/config/JdbcSinkConfig.java      |   2 +
 .../jdbc/exception/JdbcConnectorErrorCode.java     |   3 +-
 .../jdbc/internal/JdbcOutputFormatBuilder.java     |  25 ++-
 .../CopyManagerBatchStatementExecutor.java         | 195 +++++++++++++++++++++
 .../jdbc/internal/executor/CopyManagerProxy.java   |  92 ++++++++++
 .../connectors/seatunnel/jdbc/JdbcPostgresIT.java  |  18 +-
 .../jdbc_postgres_source_and_sink_copy_stmt.conf   |  50 ++++++
 tools/dependencies/known-dependencies.txt          |   1 +
 13 files changed, 403 insertions(+), 8 deletions(-)

diff --git a/docs/en/connector-v2/sink/Jdbc.md 
b/docs/en/connector-v2/sink/Jdbc.md
index bfe49277ea..f0b74414a4 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -56,6 +56,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
 | data_save_mode                            | Enum    | No       | APPEND_DATA                  |
 | custom_sql                                | String  | No       | -                            |
 | enable_upsert                             | Boolean | No       | true                         |
+| use_copy_statement                        | Boolean | No       | false                        |
 
 ### driver [string]
 
@@ -197,6 +198,12 @@ When data_save_mode selects CUSTOM_PROCESSING, you should fill in the CUSTOM_SQL
 
 Enable upsert when `primary_keys` exist. If the task has no duplicate-key data, setting this parameter to `false` can speed up data import.
 
+### use_copy_statement [boolean]
+
+Use the `COPY ${table} FROM STDIN` statement to import data. Only drivers whose connections provide a `getCopyAPI()` method are supported, e.g. the PostgreSQL driver `org.postgresql.Driver`.
+
+NOTICE: `MAP`, `ARRAY`, `ROW` types are not supported.
+
 ## tips
 
 In the case of is_exactly_once = "true", Xa transactions are used. This requires database support, and some databases require some setup:
diff --git a/pom.xml b/pom.xml
index b10e63164a..6c0bb7e719 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,6 +104,7 @@
         <commons-lang3.version>3.5</commons-lang3.version>
         <commons-io.version>2.11.0</commons-io.version>
         <commons-collections4.version>4.4</commons-collections4.version>
+        <commons-csv.version>1.10.0</commons-csv.version>
         <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
         <protostuff.version>1.8.0</protostuff.version>
         <spark.scope>provided</spark.scope>
@@ -329,6 +330,11 @@
                 <artifactId>commons-collections4</artifactId>
                 <version>${commons-collections4.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-csv</artifactId>
+                <version>${commons-csv.version}</version>
+            </dependency>
 
             <dependency>
                 <groupId>com.beust</groupId>
diff --git a/seatunnel-common/pom.xml b/seatunnel-common/pom.xml
index d462be78ac..218ec7dd9d 100644
--- a/seatunnel-common/pom.xml
+++ b/seatunnel-common/pom.xml
@@ -44,6 +44,10 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-collections4</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-csv</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
@@ -63,7 +67,6 @@
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>
         </dependency>
-
     </dependencies>
 
 </project>
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml 
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 2f91ef6085..5880036c90 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -192,7 +192,6 @@
     </dependencyManagement>
 
     <dependencies>
-
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>connector-common</artifactId>
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
index 0493eb8e4f..7f0ec48f36 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
@@ -151,6 +151,12 @@ public interface JdbcOptions {
                     .defaultValue(false)
                     .withDescription("support upsert by insert only");
 
+    Option<Boolean> USE_COPY_STATEMENT =
+            Options.key("use_copy_statement")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("support copy in statement (postgresql)");
+
     /** source config */
     Option<String> PARTITION_COLUMN =
             Options.key("partition_column")
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java
index 874eb807f3..8860703ca4 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java
@@ -43,6 +43,7 @@ public class JdbcSinkConfig implements Serializable {
     private boolean enableUpsert;
     @Builder.Default private boolean isPrimaryKeyUpdated = true;
     private boolean supportUpsertByInsertOnly;
+    private boolean useCopyStatement;
 
     public static JdbcSinkConfig of(ReadonlyConfig config) {
         JdbcSinkConfigBuilder builder = JdbcSinkConfig.builder();
@@ -55,6 +56,7 @@ public class JdbcSinkConfig implements Serializable {
         builder.isPrimaryKeyUpdated(config.get(IS_PRIMARY_KEY_UPDATED));
         
builder.supportUpsertByInsertOnly(config.get(SUPPORT_UPSERT_BY_INSERT_ONLY));
         builder.simpleSql(config.get(JdbcOptions.QUERY));
+        builder.useCopyStatement(config.get(JdbcOptions.USE_COPY_STATEMENT));
         return builder.build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java
index 22438de84c..90c1ff3002 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java
@@ -29,7 +29,8 @@ public enum JdbcConnectorErrorCode implements 
SeaTunnelErrorCode {
             "JDBC-05", "transaction operation failed, such as (commit, 
rollback) etc.."),
     NO_SUITABLE_DIALECT_FACTORY("JDBC-06", "No suitable dialect factory 
found"),
     DONT_SUPPORT_SINK("JDBC-07", "The jdbc type don't support sink"),
-    KERBEROS_AUTHENTICATION_FAILED("JDBC-08", "Kerberos authentication 
failed");
+    KERBEROS_AUTHENTICATION_FAILED("JDBC-08", "Kerberos authentication 
failed"),
+    NO_SUPPORT_OPERATION_FAILED("JDBC-09", "The jdbc driver not support 
operation.");
     private final String code;
 
     private final String description;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
index 4a296f9b4d..dee1b58e0e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
@@ -27,6 +27,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRow
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.CopyManagerBatchStatementExecutor;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.InsertOrUpdateBatchStatementExecutor;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
@@ -63,7 +64,13 @@ public class JdbcOutputFormatBuilder {
                                 jdbcSinkConfig.getDatabase() + "." + 
jdbcSinkConfig.getTable()));
 
         final List<String> primaryKeys = jdbcSinkConfig.getPrimaryKeys();
-        if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
+        if (jdbcSinkConfig.isUseCopyStatement()) {
+            statementExecutorFactory =
+                    () ->
+                            createCopyInBufferStatementExecutor(
+                                    createCopyInBatchStatementExecutor(
+                                            dialect, table, tableSchema));
+        } else if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
             statementExecutorFactory =
                     () ->
                             createSimpleBufferedExecutor(
@@ -185,6 +192,22 @@ public class JdbcOutputFormatBuilder {
                 dialect, database, table, tableSchema, pkNames, 
isPrimaryKeyUpdated);
     }
 
+    private static JdbcBatchStatementExecutor<SeaTunnelRow> 
createCopyInBufferStatementExecutor(
+            CopyManagerBatchStatementExecutor 
copyManagerBatchStatementExecutor) {
+        return new BufferedBatchStatementExecutor(
+                copyManagerBatchStatementExecutor, Function.identity());
+    }
+
+    private static CopyManagerBatchStatementExecutor 
createCopyInBatchStatementExecutor(
+            JdbcDialect dialect, String table, TableSchema tableSchema) {
+        String columns =
+                Arrays.stream(tableSchema.getFieldNames())
+                        .map(dialect::quoteIdentifier)
+                        .collect(Collectors.joining(",", "(", ")"));
+        String copyInSql = String.format("COPY %s %s FROM STDIN WITH CSV", 
table, columns);
+        return new CopyManagerBatchStatementExecutor(copyInSql, tableSchema);
+    }
+
     private static JdbcBatchStatementExecutor<SeaTunnelRow> 
createInsertOnlyExecutor(
             JdbcDialect dialect, String database, String table, TableSchema 
tableSchema) {
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerBatchStatementExecutor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerBatchStatementExecutor.java
new file mode 100644
index 0000000000..b485d39de1
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerBatchStatementExecutor.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.reflect.InvocationTargetException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CopyManagerBatchStatementExecutor implements 
JdbcBatchStatementExecutor<SeaTunnelRow> {
+
+    private final String copySql;
+    private final TableSchema tableSchema;
+    CopyManagerProxy copyManagerProxy;
+    CSVFormat csvFormat = CSVFormat.POSTGRESQL_CSV;
+    CSVPrinter csvPrinter;
+
+    public CopyManagerBatchStatementExecutor(String copySql, TableSchema 
tableSchema) {
+        this.copySql = copySql;
+        this.tableSchema = tableSchema;
+    }
+
+    public static void copyManagerProxyChecked(JdbcConnectionProvider 
connectionProvider) {
+        try (Connection connection = connectionProvider.getConnection()) {
+            new CopyManagerProxy(connection);
+        } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
+            throw new JdbcConnectorException(
+                    JdbcConnectorErrorCode.NO_SUPPORT_OPERATION_FAILED,
+                    "unable to open CopyManager Operation in this JDBC writer. 
Please configure option use_copy_statement = false.",
+                    e);
+        } catch (SQLException e) {
+            throw new JdbcConnectorException(
+                    JdbcConnectorErrorCode.CREATE_DRIVER_FAILED, "unable to 
open JDBC writer", e);
+        }
+    }
+
+    @Override
+    public void prepareStatements(Connection connection) throws SQLException {
+        try {
+            this.copyManagerProxy = new CopyManagerProxy(connection);
+            this.csvPrinter = new CSVPrinter(new StringBuilder(), csvFormat);
+        } catch (NoSuchMethodException
+                | IllegalAccessException
+                | InvocationTargetException
+                | IOException e) {
+            throw new JdbcConnectorException(
+                    JdbcConnectorErrorCode.NO_SUPPORT_OPERATION_FAILED,
+                    "unable to open CopyManager Operation in this JDBC writer. 
Please configure option use_copy_statement = false.",
+                    e);
+        } catch (SQLException e) {
+            throw new JdbcConnectorException(
+                    JdbcConnectorErrorCode.CREATE_DRIVER_FAILED, "unable to 
open JDBC writer", e);
+        }
+    }
+
+    @Override
+    public void addToBatch(SeaTunnelRow record) throws SQLException {
+        try {
+            this.csvPrinter.printRecord(toExtract(record));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private List<Object> toExtract(SeaTunnelRow record) {
+        SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
+        List<Object> csvRecord = new ArrayList<>();
+        for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); 
fieldIndex++) {
+            SeaTunnelDataType<?> seaTunnelDataType = 
rowType.getFieldType(fieldIndex);
+            Object fieldValue = record.getField(fieldIndex);
+            if (fieldValue == null) {
+                csvRecord.add(null);
+                continue;
+            }
+            switch (seaTunnelDataType.getSqlType()) {
+                case STRING:
+                    csvRecord.add((String) record.getField(fieldIndex));
+                    break;
+                case BOOLEAN:
+                    csvRecord.add((Boolean) record.getField(fieldIndex));
+                    break;
+                case TINYINT:
+                    csvRecord.add((Byte) record.getField(fieldIndex));
+                    break;
+                case SMALLINT:
+                    csvRecord.add((Short) record.getField(fieldIndex));
+                    break;
+                case INT:
+                    csvRecord.add((Integer) record.getField(fieldIndex));
+                    break;
+                case BIGINT:
+                    csvRecord.add((Long) record.getField(fieldIndex));
+                    break;
+                case FLOAT:
+                    csvRecord.add((Float) record.getField(fieldIndex));
+                    break;
+                case DOUBLE:
+                    csvRecord.add((Double) record.getField(fieldIndex));
+                    break;
+                case DECIMAL:
+                    csvRecord.add((BigDecimal) record.getField(fieldIndex));
+                    break;
+                case DATE:
+                    LocalDate localDate = (LocalDate) 
record.getField(fieldIndex);
+                    csvRecord.add((java.sql.Date) 
java.sql.Date.valueOf(localDate));
+                    break;
+                case TIME:
+                    LocalTime localTime = (LocalTime) 
record.getField(fieldIndex);
+                    csvRecord.add((java.sql.Time) 
java.sql.Time.valueOf(localTime));
+                    break;
+                case TIMESTAMP:
+                    LocalDateTime localDateTime = (LocalDateTime) 
record.getField(fieldIndex);
+                    csvRecord.add((java.sql.Timestamp) 
java.sql.Timestamp.valueOf(localDateTime));
+                    break;
+                case BYTES:
+                    csvRecord.add(
+                            
org.apache.commons.codec.binary.Base64.encodeBase64String(
+                                    (byte[]) record.getField(fieldIndex)));
+                    break;
+                case NULL:
+                    csvRecord.add(null);
+                    break;
+                case MAP:
+                case ARRAY:
+                case ROW:
+                default:
+                    throw new JdbcConnectorException(
+                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                            "Unexpected value: " + seaTunnelDataType);
+            }
+        }
+        return csvRecord;
+    }
+
+    @Override
+    public void executeBatch() throws SQLException {
+        try {
+            this.csvPrinter.flush();
+            this.copyManagerProxy.doCopy(
+                    copySql, new 
StringReader(this.csvPrinter.getOut().toString()));
+        } catch (InvocationTargetException | IllegalAccessException | 
IOException e) {
+            throw new JdbcConnectorException(
+                    CommonErrorCodeDeprecated.SQL_OPERATION_FAILED, "Sql 
command: " + copySql);
+        } finally {
+            try {
+                this.csvPrinter.close();
+                this.csvPrinter = new CSVPrinter(new StringBuilder(), 
csvFormat);
+            } catch (Exception ignore) {
+            }
+        }
+    }
+
+    @Override
+    public void closeStatements() throws SQLException {
+        this.copyManagerProxy = null;
+        try {
+            this.csvPrinter.close();
+            this.csvPrinter = null;
+        } catch (Exception ignore) {
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerProxy.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerProxy.java
new file mode 100644
index 0000000000..54d99e345d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerProxy.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Reader;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+class CopyManagerProxy {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CopyManagerProxy.class);
+    Object connection;
+    Object copyManager;
+    Class<?> connectionClazz;
+    Class<?> copyManagerClazz;
+    Method getCopyAPIMethod;
+    Method copyInMethod;
+
+    CopyManagerProxy(Connection connection)
+            throws NoSuchMethodException, InvocationTargetException, 
IllegalAccessException,
+                    SQLException {
+        LOG.info("Proxy connection class: {}", 
connection.getClass().getName());
+        this.connection = connection.unwrap(Connection.class);
+        LOG.info("Proxy unwrap connection class: {}", 
this.connection.getClass().getName());
+        if (Proxy.isProxyClass(this.connection.getClass())) {
+            InvocationHandler handler = 
Proxy.getInvocationHandler(this.connection);
+            this.connection = getConnectionFromInvocationHandler(handler);
+            if (null == this.connection) {
+                throw new InvocationTargetException(
+                        new NullPointerException("Proxy Connection is null."));
+            }
+            LOG.info("Proxy connection class: {}", 
this.connection.getClass().getName());
+            this.connectionClazz = this.connection.getClass();
+        } else {
+            this.connectionClazz = this.connection.getClass();
+        }
+        this.getCopyAPIMethod = this.connectionClazz.getMethod("getCopyAPI");
+        this.copyManager = this.getCopyAPIMethod.invoke(this.connection);
+        this.copyManagerClazz = this.copyManager.getClass();
+        this.copyInMethod = this.copyManagerClazz.getMethod("copyIn", 
String.class, Reader.class);
+    }
+
+    long doCopy(String sql, Reader reader)
+            throws InvocationTargetException, IllegalAccessException {
+        return (long) this.copyInMethod.invoke(this.copyManager, sql, reader);
+    }
+
+    private static Object getConnectionFromInvocationHandler(InvocationHandler 
handler)
+            throws IllegalAccessException {
+        Class<?> handlerClass = handler.getClass();
+        LOG.info("InvocationHandler class: {}", handlerClass.getName());
+        for (Field declaredField : handlerClass.getDeclaredFields()) {
+            boolean tempAccessible = declaredField.isAccessible();
+            if (!tempAccessible) {
+                declaredField.setAccessible(true);
+            }
+            Object handlerObject = declaredField.get(handler);
+            if (handlerObject instanceof Connection) {
+                if (!tempAccessible) {
+                    declaredField.setAccessible(tempAccessible);
+                }
+                return handlerObject;
+            } else {
+                if (!tempAccessible) {
+                    declaredField.setAccessible(tempAccessible);
+                }
+            }
+        }
+        return null;
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
index a0181fbf81..7961bcb474 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
@@ -69,6 +69,7 @@ public class JdbcPostgresIT extends TestSuiteBase implements 
TestResource {
     private static final List<String> PG_CONFIG_FILE_LIST =
             Lists.newArrayList(
                     "/jdbc_postgres_source_and_sink.conf",
+                    "/jdbc_postgres_source_and_sink_copy_stmt.conf",
                     "/jdbc_postgres_source_and_sink_parallel.conf",
                     "/jdbc_postgres_source_and_sink_parallel_upper_lower.conf",
                     "/jdbc_postgres_source_and_sink_xa.conf");
@@ -259,10 +260,19 @@ public class JdbcPostgresIT extends TestSuiteBase 
implements TestResource {
     public void testAutoGenerateSQL(TestContainer container)
             throws IOException, InterruptedException {
         for (String CONFIG_FILE : PG_CONFIG_FILE_LIST) {
-            Container.ExecResult execResult = 
container.executeJob(CONFIG_FILE);
-            Assertions.assertEquals(0, execResult.getExitCode());
-            Assertions.assertIterableEquals(querySql(SOURCE_SQL), 
querySql(SINK_SQL));
-            executeSQL("truncate table pg_e2e_sink_table");
+            try {
+                Container.ExecResult execResult = 
container.executeJob(CONFIG_FILE);
+                Assertions.assertEquals(
+                        0,
+                        execResult.getExitCode(),
+                        CONFIG_FILE
+                                + " job run failed in "
+                                + container.getClass().getSimpleName()
+                                + ".");
+                Assertions.assertIterableEquals(querySql(SOURCE_SQL), 
querySql(SINK_SQL));
+            } finally {
+                executeSQL("truncate table pg_e2e_sink_table");
+            }
             log.info(CONFIG_FILE + " e2e test completed");
         }
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf
new file mode 100644
index 0000000000..fabfdce9ca
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+        driver = "org.postgresql.Driver"
+        user = "test"
+        password = "test"
+        query ="""select gid, uuid_col, text_col, varchar_col, char_col, 
boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, 
real_col, double_precision_col,
+                         smallserial_col, serial_col, bigserial_col, date_col, 
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, 
multipoint,
+                         multilinestring, multipolygon, geometrycollection, 
geog, json_col, jsonb_col,xml_col from pg_e2e_source_table"""
+      partition_column = "varchar_col"
+      partition_num = 2
+    }
+}
+
+
+sink {
+  Jdbc {
+    driver = org.postgresql.Driver
+    url = 
"jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF&stringtype=unspecified"
+    user = test
+    password = test
+    generate_sink_sql = true
+    database = test
+    table = public.pg_e2e_sink_table
+    use_copy_statement = true
+    primary_keys = ["gid"]
+  }
+}
\ No newline at end of file
diff --git a/tools/dependencies/known-dependencies.txt 
b/tools/dependencies/known-dependencies.txt
index 9ceb7f1846..988543d53a 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -3,6 +3,7 @@ commons-collections4-4.4.jar
 commons-compress-1.20.jar
 commons-io-2.11.0.jar
 commons-lang3-3.5.jar
+commons-csv-1.10.0.jar
 config-1.3.3.jar
 disruptor-3.4.4.jar
 guava-27.0-jre.jar

Reply via email to