This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ca4a65fc00 feat: jdbc support copy in statement. (#6443)
ca4a65fc00 is described below
commit ca4a65fc00e9d7ada2c9840b250dae8204880d2d
Author: MoSence <[email protected]>
AuthorDate: Mon Mar 11 18:04:07 2024 +0800
feat: jdbc support copy in statement. (#6443)
---
docs/en/connector-v2/sink/Jdbc.md | 7 +
pom.xml | 6 +
seatunnel-common/pom.xml | 5 +-
seatunnel-connectors-v2/connector-jdbc/pom.xml | 1 -
.../seatunnel/jdbc/config/JdbcOptions.java | 6 +
.../seatunnel/jdbc/config/JdbcSinkConfig.java | 2 +
.../jdbc/exception/JdbcConnectorErrorCode.java | 3 +-
.../jdbc/internal/JdbcOutputFormatBuilder.java | 25 ++-
.../CopyManagerBatchStatementExecutor.java | 195 +++++++++++++++++++++
.../jdbc/internal/executor/CopyManagerProxy.java | 92 ++++++++++
.../connectors/seatunnel/jdbc/JdbcPostgresIT.java | 18 +-
.../jdbc_postgres_source_and_sink_copy_stmt.conf | 50 ++++++
tools/dependencies/known-dependencies.txt | 1 +
13 files changed, 403 insertions(+), 8 deletions(-)
diff --git a/docs/en/connector-v2/sink/Jdbc.md
b/docs/en/connector-v2/sink/Jdbc.md
index bfe49277ea..f0b74414a4 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -56,6 +56,7 @@ support `Xa transactions`. You can set `is_exactly_once=true`
to enable it.
| data_save_mode | Enum | No | APPEND_DATA
|
| custom_sql | String | No | -
|
| enable_upsert | Boolean | No | true
|
+| use_copy_statement | Boolean | No | false
|
### driver [string]
@@ -197,6 +198,12 @@ When data_save_mode selects CUSTOM_PROCESSING, you should
fill in the CUSTOM_SQL
Enable upsert by primary_keys exist, If the task has no key duplicate data,
setting this parameter to `false` can speed up data import
+### use_copy_statement [boolean]
+
+Use the `COPY ${table} FROM STDIN` statement to import data. Only drivers whose connections provide a `getCopyAPI()` method are supported, e.g. the PostgreSQL driver `org.postgresql.Driver`.
+
+NOTICE: `MAP`, `ARRAY`, `ROW` types are not supported.
+
## tips
In the case of is_exactly_once = "true", Xa transactions are used. This
requires database support, and some databases require some setup :
diff --git a/pom.xml b/pom.xml
index b10e63164a..6c0bb7e719 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,6 +104,7 @@
<commons-lang3.version>3.5</commons-lang3.version>
<commons-io.version>2.11.0</commons-io.version>
<commons-collections4.version>4.4</commons-collections4.version>
+ <commons-csv.version>1.10.0</commons-csv.version>
<maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
<protostuff.version>1.8.0</protostuff.version>
<spark.scope>provided</spark.scope>
@@ -329,6 +330,11 @@
<artifactId>commons-collections4</artifactId>
<version>${commons-collections4.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-csv</artifactId>
+ <version>${commons-csv.version}</version>
+ </dependency>
<dependency>
<groupId>com.beust</groupId>
diff --git a/seatunnel-common/pom.xml b/seatunnel-common/pom.xml
index d462be78ac..218ec7dd9d 100644
--- a/seatunnel-common/pom.xml
+++ b/seatunnel-common/pom.xml
@@ -44,6 +44,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-csv</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
@@ -63,7 +67,6 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
-
</dependencies>
</project>
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 2f91ef6085..5880036c90 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -192,7 +192,6 @@
</dependencyManagement>
<dependencies>
-
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
index 0493eb8e4f..7f0ec48f36 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
@@ -151,6 +151,12 @@ public interface JdbcOptions {
.defaultValue(false)
.withDescription("support upsert by insert only");
+ Option<Boolean> USE_COPY_STATEMENT =
+ Options.key("use_copy_statement")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("support copy in statement (postgresql)");
+
/** source config */
Option<String> PARTITION_COLUMN =
Options.key("partition_column")
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java
index 874eb807f3..8860703ca4 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java
@@ -43,6 +43,7 @@ public class JdbcSinkConfig implements Serializable {
private boolean enableUpsert;
@Builder.Default private boolean isPrimaryKeyUpdated = true;
private boolean supportUpsertByInsertOnly;
+ private boolean useCopyStatement;
public static JdbcSinkConfig of(ReadonlyConfig config) {
JdbcSinkConfigBuilder builder = JdbcSinkConfig.builder();
@@ -55,6 +56,7 @@ public class JdbcSinkConfig implements Serializable {
builder.isPrimaryKeyUpdated(config.get(IS_PRIMARY_KEY_UPDATED));
builder.supportUpsertByInsertOnly(config.get(SUPPORT_UPSERT_BY_INSERT_ONLY));
builder.simpleSql(config.get(JdbcOptions.QUERY));
+ builder.useCopyStatement(config.get(JdbcOptions.USE_COPY_STATEMENT));
return builder.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java
index 22438de84c..90c1ff3002 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java
@@ -29,7 +29,8 @@ public enum JdbcConnectorErrorCode implements
SeaTunnelErrorCode {
"JDBC-05", "transaction operation failed, such as (commit,
rollback) etc.."),
NO_SUITABLE_DIALECT_FACTORY("JDBC-06", "No suitable dialect factory
found"),
DONT_SUPPORT_SINK("JDBC-07", "The jdbc type don't support sink"),
- KERBEROS_AUTHENTICATION_FAILED("JDBC-08", "Kerberos authentication
failed");
+ KERBEROS_AUTHENTICATION_FAILED("JDBC-08", "Kerberos authentication
failed"),
+ NO_SUPPORT_OPERATION_FAILED("JDBC-09", "The jdbc driver not support
operation.");
private final String code;
private final String description;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
index 4a296f9b4d..dee1b58e0e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
@@ -27,6 +27,7 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRow
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.CopyManagerBatchStatementExecutor;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.InsertOrUpdateBatchStatementExecutor;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
@@ -63,7 +64,13 @@ public class JdbcOutputFormatBuilder {
jdbcSinkConfig.getDatabase() + "." +
jdbcSinkConfig.getTable()));
final List<String> primaryKeys = jdbcSinkConfig.getPrimaryKeys();
- if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
+ if (jdbcSinkConfig.isUseCopyStatement()) {
+ statementExecutorFactory =
+ () ->
+ createCopyInBufferStatementExecutor(
+ createCopyInBatchStatementExecutor(
+ dialect, table, tableSchema));
+ } else if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
statementExecutorFactory =
() ->
createSimpleBufferedExecutor(
@@ -185,6 +192,22 @@ public class JdbcOutputFormatBuilder {
dialect, database, table, tableSchema, pkNames,
isPrimaryKeyUpdated);
}
+    /**
+     * Wraps the given COPY executor in a {@code BufferedBatchStatementExecutor}, passing the
+     * identity function as the buffering executor's second argument.
+     *
+     * @param copyManagerBatchStatementExecutor the COPY-based executor to wrap
+     * @return the buffering executor that delegates to the COPY executor
+     */
+    private static JdbcBatchStatementExecutor<SeaTunnelRow> createCopyInBufferStatementExecutor(
+            CopyManagerBatchStatementExecutor copyManagerBatchStatementExecutor) {
+        JdbcBatchStatementExecutor<SeaTunnelRow> bufferedExecutor =
+                new BufferedBatchStatementExecutor(
+                        copyManagerBatchStatementExecutor, Function.identity());
+        return bufferedExecutor;
+    }
+
+    /**
+     * Builds the {@code COPY <table> (<columns>) FROM STDIN WITH CSV} statement for the given
+     * table and creates the executor that runs it.
+     *
+     * @param dialect dialect used to quote every column name
+     * @param table table name, inserted into the statement as given
+     * @param tableSchema schema whose field names form the column list
+     * @return a COPY executor bound to the generated statement and the schema
+     */
+    private static CopyManagerBatchStatementExecutor createCopyInBatchStatementExecutor(
+            JdbcDialect dialect, String table, TableSchema tableSchema) {
+        StringBuilder columnList = new StringBuilder("(");
+        String[] fieldNames = tableSchema.getFieldNames();
+        for (int i = 0; i < fieldNames.length; i++) {
+            if (i > 0) {
+                columnList.append(",");
+            }
+            columnList.append(dialect.quoteIdentifier(fieldNames[i]));
+        }
+        columnList.append(")");
+        String copyInSql =
+                String.format("COPY %s %s FROM STDIN WITH CSV", table, columnList.toString());
+        return new CopyManagerBatchStatementExecutor(copyInSql, tableSchema);
+    }
+
private static JdbcBatchStatementExecutor<SeaTunnelRow>
createInsertOnlyExecutor(
JdbcDialect dialect, String database, String table, TableSchema
tableSchema) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerBatchStatementExecutor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerBatchStatementExecutor.java
new file mode 100644
index 0000000000..b485d39de1
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerBatchStatementExecutor.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.reflect.InvocationTargetException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CopyManagerBatchStatementExecutor implements
JdbcBatchStatementExecutor<SeaTunnelRow> {
+
+ private final String copySql;
+ private final TableSchema tableSchema;
+ CopyManagerProxy copyManagerProxy;
+ CSVFormat csvFormat = CSVFormat.POSTGRESQL_CSV;
+ CSVPrinter csvPrinter;
+
+ public CopyManagerBatchStatementExecutor(String copySql, TableSchema
tableSchema) {
+ this.copySql = copySql;
+ this.tableSchema = tableSchema;
+ }
+
+ public static void copyManagerProxyChecked(JdbcConnectionProvider
connectionProvider) {
+ try (Connection connection = connectionProvider.getConnection()) {
+ new CopyManagerProxy(connection);
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException e) {
+ throw new JdbcConnectorException(
+ JdbcConnectorErrorCode.NO_SUPPORT_OPERATION_FAILED,
+ "unable to open CopyManager Operation in this JDBC writer.
Please configure option use_copy_statement = false.",
+ e);
+ } catch (SQLException e) {
+ throw new JdbcConnectorException(
+ JdbcConnectorErrorCode.CREATE_DRIVER_FAILED, "unable to
open JDBC writer", e);
+ }
+ }
+
+ @Override
+ public void prepareStatements(Connection connection) throws SQLException {
+ try {
+ this.copyManagerProxy = new CopyManagerProxy(connection);
+ this.csvPrinter = new CSVPrinter(new StringBuilder(), csvFormat);
+ } catch (NoSuchMethodException
+ | IllegalAccessException
+ | InvocationTargetException
+ | IOException e) {
+ throw new JdbcConnectorException(
+ JdbcConnectorErrorCode.NO_SUPPORT_OPERATION_FAILED,
+ "unable to open CopyManager Operation in this JDBC writer.
Please configure option use_copy_statement = false.",
+ e);
+ } catch (SQLException e) {
+ throw new JdbcConnectorException(
+ JdbcConnectorErrorCode.CREATE_DRIVER_FAILED, "unable to
open JDBC writer", e);
+ }
+ }
+
+ @Override
+ public void addToBatch(SeaTunnelRow record) throws SQLException {
+ try {
+ this.csvPrinter.printRecord(toExtract(record));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private List<Object> toExtract(SeaTunnelRow record) {
+ SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
+ List<Object> csvRecord = new ArrayList<>();
+ for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields();
fieldIndex++) {
+ SeaTunnelDataType<?> seaTunnelDataType =
rowType.getFieldType(fieldIndex);
+ Object fieldValue = record.getField(fieldIndex);
+ if (fieldValue == null) {
+ csvRecord.add(null);
+ continue;
+ }
+ switch (seaTunnelDataType.getSqlType()) {
+ case STRING:
+ csvRecord.add((String) record.getField(fieldIndex));
+ break;
+ case BOOLEAN:
+ csvRecord.add((Boolean) record.getField(fieldIndex));
+ break;
+ case TINYINT:
+ csvRecord.add((Byte) record.getField(fieldIndex));
+ break;
+ case SMALLINT:
+ csvRecord.add((Short) record.getField(fieldIndex));
+ break;
+ case INT:
+ csvRecord.add((Integer) record.getField(fieldIndex));
+ break;
+ case BIGINT:
+ csvRecord.add((Long) record.getField(fieldIndex));
+ break;
+ case FLOAT:
+ csvRecord.add((Float) record.getField(fieldIndex));
+ break;
+ case DOUBLE:
+ csvRecord.add((Double) record.getField(fieldIndex));
+ break;
+ case DECIMAL:
+ csvRecord.add((BigDecimal) record.getField(fieldIndex));
+ break;
+ case DATE:
+ LocalDate localDate = (LocalDate)
record.getField(fieldIndex);
+ csvRecord.add((java.sql.Date)
java.sql.Date.valueOf(localDate));
+ break;
+ case TIME:
+ LocalTime localTime = (LocalTime)
record.getField(fieldIndex);
+ csvRecord.add((java.sql.Time)
java.sql.Time.valueOf(localTime));
+ break;
+ case TIMESTAMP:
+ LocalDateTime localDateTime = (LocalDateTime)
record.getField(fieldIndex);
+ csvRecord.add((java.sql.Timestamp)
java.sql.Timestamp.valueOf(localDateTime));
+ break;
+ case BYTES:
+ csvRecord.add(
+
org.apache.commons.codec.binary.Base64.encodeBase64String(
+ (byte[]) record.getField(fieldIndex)));
+ break;
+ case NULL:
+ csvRecord.add(null);
+ break;
+ case MAP:
+ case ARRAY:
+ case ROW:
+ default:
+ throw new JdbcConnectorException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+ "Unexpected value: " + seaTunnelDataType);
+ }
+ }
+ return csvRecord;
+ }
+
+ @Override
+ public void executeBatch() throws SQLException {
+ try {
+ this.csvPrinter.flush();
+ this.copyManagerProxy.doCopy(
+ copySql, new
StringReader(this.csvPrinter.getOut().toString()));
+ } catch (InvocationTargetException | IllegalAccessException |
IOException e) {
+ throw new JdbcConnectorException(
+ CommonErrorCodeDeprecated.SQL_OPERATION_FAILED, "Sql
command: " + copySql);
+ } finally {
+ try {
+ this.csvPrinter.close();
+ this.csvPrinter = new CSVPrinter(new StringBuilder(),
csvFormat);
+ } catch (Exception ignore) {
+ }
+ }
+ }
+
+ @Override
+ public void closeStatements() throws SQLException {
+ this.copyManagerProxy = null;
+ try {
+ this.csvPrinter.close();
+ this.csvPrinter = null;
+ } catch (Exception ignore) {
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerProxy.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerProxy.java
new file mode 100644
index 0000000000..54d99e345d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerProxy.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Reader;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * Reflective wrapper around the copy API of a JDBC connection.
+ *
+ * <p>The constructor looks up a public {@code getCopyAPI()} method on the (unwrapped) connection
+ * and a {@code copyIn(String, Reader)} method on the object it returns; {@link #doCopy} invokes
+ * the latter.
+ */
+class CopyManagerProxy {
+    private static final Logger LOG = LoggerFactory.getLogger(CopyManagerProxy.class);
+    Object connection;
+    Object copyManager;
+    Class<?> connectionClazz;
+    Class<?> copyManagerClazz;
+    Method getCopyAPIMethod;
+    Method copyInMethod;
+
+    /**
+     * Resolves the copy manager of the given connection.
+     *
+     * <p>If the unwrapped connection is a {@link Proxy} instance, the first field of its
+     * invocation handler that holds a {@link Connection} is used instead.
+     *
+     * @param connection connection to resolve the copy API from
+     * @throws NoSuchMethodException if {@code getCopyAPI()} or {@code copyIn(String, Reader)} is
+     *     not available
+     * @throws InvocationTargetException if {@code getCopyAPI()} throws, or if no connection is
+     *     found behind a proxy
+     * @throws IllegalAccessException if a reflective access is denied
+     * @throws SQLException if the connection cannot be unwrapped
+     */
+    CopyManagerProxy(Connection connection)
+            throws NoSuchMethodException, InvocationTargetException, IllegalAccessException,
+                    SQLException {
+        LOG.info("Proxy connection class: {}", connection.getClass().getName());
+        this.connection = connection.unwrap(Connection.class);
+        LOG.info("Proxy unwrap connection class: {}", this.connection.getClass().getName());
+        if (Proxy.isProxyClass(this.connection.getClass())) {
+            InvocationHandler handler = Proxy.getInvocationHandler(this.connection);
+            this.connection = getConnectionFromInvocationHandler(handler);
+            if (null == this.connection) {
+                throw new InvocationTargetException(
+                        new NullPointerException("Proxy Connection is null."));
+            }
+            LOG.info("Proxy connection class: {}", this.connection.getClass().getName());
+        }
+        this.connectionClazz = this.connection.getClass();
+        this.getCopyAPIMethod = this.connectionClazz.getMethod("getCopyAPI");
+        this.copyManager = this.getCopyAPIMethod.invoke(this.connection);
+        this.copyManagerClazz = this.copyManager.getClass();
+        this.copyInMethod = this.copyManagerClazz.getMethod("copyIn", String.class, Reader.class);
+    }
+
+    /**
+     * Invokes {@code copyIn(sql, reader)} on the resolved copy manager.
+     *
+     * @param sql the COPY statement
+     * @param reader source of the data to copy
+     * @return the value returned by {@code copyIn}
+     * @throws InvocationTargetException if {@code copyIn} throws; the cause is the driver's error
+     * @throws IllegalAccessException if the method cannot be accessed
+     */
+    long doCopy(String sql, Reader reader)
+            throws InvocationTargetException, IllegalAccessException {
+        return (long) this.copyInMethod.invoke(this.copyManager, sql, reader);
+    }
+
+    /**
+     * Returns the value of the first declared field of the handler that is a {@link Connection}.
+     *
+     * @param handler invocation handler of a proxied connection
+     * @return the connection held by the handler, or {@code null} if no field holds one
+     * @throws IllegalAccessException if a field cannot be read
+     */
+    private static Object getConnectionFromInvocationHandler(InvocationHandler handler)
+            throws IllegalAccessException {
+        Class<?> handlerClass = handler.getClass();
+        LOG.info("InvocationHandler class: {}", handlerClass.getName());
+        for (Field declaredField : handlerClass.getDeclaredFields()) {
+            boolean wasAccessible = declaredField.isAccessible();
+            if (!wasAccessible) {
+                declaredField.setAccessible(true);
+            }
+            try {
+                Object handlerObject = declaredField.get(handler);
+                if (handlerObject instanceof Connection) {
+                    return handlerObject;
+                }
+            } finally {
+                // Restore the original flag on every path, including when the read fails.
+                if (!wasAccessible) {
+                    declaredField.setAccessible(false);
+                }
+            }
+        }
+        return null;
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
index a0181fbf81..7961bcb474 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
@@ -69,6 +69,7 @@ public class JdbcPostgresIT extends TestSuiteBase implements
TestResource {
private static final List<String> PG_CONFIG_FILE_LIST =
Lists.newArrayList(
"/jdbc_postgres_source_and_sink.conf",
+ "/jdbc_postgres_source_and_sink_copy_stmt.conf",
"/jdbc_postgres_source_and_sink_parallel.conf",
"/jdbc_postgres_source_and_sink_parallel_upper_lower.conf",
"/jdbc_postgres_source_and_sink_xa.conf");
@@ -259,10 +260,19 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
public void testAutoGenerateSQL(TestContainer container)
throws IOException, InterruptedException {
for (String CONFIG_FILE : PG_CONFIG_FILE_LIST) {
- Container.ExecResult execResult =
container.executeJob(CONFIG_FILE);
- Assertions.assertEquals(0, execResult.getExitCode());
- Assertions.assertIterableEquals(querySql(SOURCE_SQL),
querySql(SINK_SQL));
- executeSQL("truncate table pg_e2e_sink_table");
+ try {
+ Container.ExecResult execResult =
container.executeJob(CONFIG_FILE);
+ Assertions.assertEquals(
+ 0,
+ execResult.getExitCode(),
+ CONFIG_FILE
+ + " job run failed in "
+ + container.getClass().getSimpleName()
+ + ".");
+ Assertions.assertIterableEquals(querySql(SOURCE_SQL),
querySql(SINK_SQL));
+ } finally {
+ executeSQL("truncate table pg_e2e_sink_table");
+ }
log.info(CONFIG_FILE + " e2e test completed");
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf
new file mode 100644
index 0000000000..fabfdce9ca
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source{
+ jdbc{
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+ driver = "org.postgresql.Driver"
+ user = "test"
+ password = "test"
+ query ="""select gid, uuid_col, text_col, varchar_col, char_col,
boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col,
real_col, double_precision_col,
+ smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums,
multipoint,
+ multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col,xml_col from pg_e2e_source_table"""
+ partition_column = "varchar_col"
+ partition_num = 2
+ }
+}
+
+
+sink {
+ Jdbc {
+ driver = org.postgresql.Driver
+ url =
"jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF&stringtype=unspecified"
+ user = test
+ password = test
+ generate_sink_sql = true
+ database = test
+ table = public.pg_e2e_sink_table
+ use_copy_statement = true
+ primary_keys = ["gid"]
+ }
+}
\ No newline at end of file
diff --git a/tools/dependencies/known-dependencies.txt
b/tools/dependencies/known-dependencies.txt
index 9ceb7f1846..988543d53a 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -3,6 +3,7 @@ commons-collections4-4.4.jar
commons-compress-1.20.jar
commons-io-2.11.0.jar
commons-lang3-3.5.jar
+commons-csv-1.10.0.jar
config-1.3.3.jar
disruptor-3.4.4.jar
guava-27.0-jre.jar