This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d60a705f5 [JDBC] [Config] Add JDBC Fetch Size Config And Custom 
Postgres PrepareStatement (#3478)
d60a705f5 is described below

commit d60a705f5d1314895df0bc5b65ab05d0756dc6bf
Author: Hisoka <[email protected]>
AuthorDate: Mon Nov 21 11:41:28 2022 +0800

    [JDBC] [Config] Add JDBC Fetch Size Config And Custom Postgres 
PrepareStatement (#3478)
    
    * [JDBC] [Config] Add JDBC Fetch Size Config
---
 docs/en/connector-v2/source/Jdbc.md                  |  7 +++++++
 .../connectors/seatunnel/jdbc/config/JdbcConfig.java |  2 ++
 .../seatunnel/jdbc/config/JdbcSourceOptions.java     |  8 ++++++++
 .../jdbc/internal/dialect/psql/PostgresDialect.java  | 20 ++++++++++++++++++++
 .../connectors/seatunnel/jdbc/source/JdbcSource.java |  4 ++--
 5 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/docs/en/connector-v2/source/Jdbc.md 
b/docs/en/connector-v2/source/Jdbc.md
index 1f1d5de86..20bb0bb1e 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -32,6 +32,7 @@ supports query SQL and can achieve projection effect.
 | partition_upper_bound        | Long   | No       | -               |
 | partition_lower_bound        | Long   | No       | -               |
 | partition_num                | Int    | No       | job parallelism |
+| fetch_size                   | Int    | No       | 0               |
 | common-options               |        | No       | -               |
 
 
@@ -77,6 +78,11 @@ The partition_column min value for scan, if not set 
SeaTunnel will query databas
 
 The number of partition count, only support positive integer. default value is 
job parallelism
 
+### fetch_size [int]
+
+For queries that return a large number of objects, you can configure the row 
fetch size used in the query to 
+improve performance by reducing the number of database hits required to satisfy 
the selection criteria. Zero means use the JDBC default value.
+
 ### common options 
 
 Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details.
@@ -151,3 +157,4 @@ parallel:
 
 - [BugFix] Fix jdbc split bug 
([3220](https://github.com/apache/incubator-seatunnel/pull/3220))
 - [Feature] Support Tablestore Source 
([3309](https://github.com/apache/incubator-seatunnel/pull/3309))
+- [Feature] Support JDBC Fetch Size Config 
([3478](https://github.com/apache/incubator-seatunnel/pull/3478))
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
index befe8c998..12eeac7aa 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
@@ -48,6 +48,8 @@ public class JdbcConfig implements Serializable {
 
     public static final Option<Integer> BATCH_SIZE = 
Options.key("batch_size").intType().noDefaultValue().withDescription("batch 
size");
 
+    public static final Option<Integer> FETCH_SIZE = 
Options.key("fetch_size").intType().defaultValue(0).withDescription("For 
queries that return a large number of objects, " +
+        "you can configure the row fetch size used in the query to improve 
performance by reducing the number database hits required to satisfy the 
selection criteria. Zero means use jdbc default value.");
 
     public static final Option<Integer> BATCH_INTERVAL_MS = 
Options.key("batch_interval_ms").intType().noDefaultValue().withDescription("batch
 interval milliSecond");
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
index a4e7e2752..dc43810c7 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
@@ -37,6 +37,7 @@ public class JdbcSourceOptions implements Serializable {
     private String partitionColumn;
     private Long partitionUpperBound;
     private Long partitionLowerBound;
+    private int fetchSize = JdbcConfig.FETCH_SIZE.defaultValue();
     private Integer partitionNumber;
 
     public JdbcSourceOptions(Config config) {
@@ -54,6 +55,9 @@ public class JdbcSourceOptions implements Serializable {
         if (config.hasPath(JdbcConfig.PARTITION_NUM.key())) {
             this.partitionNumber = 
config.getInt(JdbcConfig.PARTITION_NUM.key());
         }
+        if (config.hasPath(JdbcConfig.FETCH_SIZE.key())) {
+            this.fetchSize = config.getInt(JdbcConfig.FETCH_SIZE.key());
+        }
     }
 
     public JdbcConnectionOptions getJdbcConnectionOptions() {
@@ -75,4 +79,8 @@ public class JdbcSourceOptions implements Serializable {
     public Optional<Integer> getPartitionNumber() {
         return Optional.ofNullable(partitionNumber);
     }
+
+    public int getFetchSize() {
+        return fetchSize;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
index cebb7e2b3..6b3b7d873 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
@@ -21,11 +21,18 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRow
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
 
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
 public class PostgresDialect implements JdbcDialect {
+
+    public static final int DEFAULT_POSTGRES_FETCH_SIZE = 128;
+
     @Override
     public String dialectName() {
         return "PostgreSQL";
@@ -53,4 +60,17 @@ public class PostgresDialect implements JdbcDialect {
             getInsertIntoStatement(tableName, fieldNames), uniqueColumns, 
updateClause);
         return Optional.of(upsertSQL);
     }
+
+    @Override
+    public PreparedStatement creatPreparedStatement(Connection connection, 
String queryTemplate, int fetchSize) throws SQLException {
+        // use cursor mode, reference: 
https://jdbc.postgresql.org/documentation/query/#getting-results-based-on-a-cursor
+        connection.setAutoCommit(false);
+        PreparedStatement statement = 
connection.prepareStatement(queryTemplate, ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY);
+        if (fetchSize > 0) {
+            statement.setFetchSize(fetchSize);
+        } else {
+            statement.setFetchSize(DEFAULT_POSTGRES_FETCH_SIZE);
+        }
+        return statement;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 962dea80d..cfd3e576e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -88,8 +88,8 @@ public class JdbcSource implements 
SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
             jdbcDialect,
             typeInfo,
             query,
-            0,
-            true
+            jdbcSourceOptions.getFetchSize(),
+            jdbcSourceOptions.getJdbcConnectionOptions().isAutoCommit()
         );
     }
 

Reply via email to