This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d60a705f5 [JDBC] [Config] Add JDBC Fetch Size Config And Custom
Postgres PrepareStatement (#3478)
d60a705f5 is described below
commit d60a705f5d1314895df0bc5b65ab05d0756dc6bf
Author: Hisoka <[email protected]>
AuthorDate: Mon Nov 21 11:41:28 2022 +0800
[JDBC] [Config] Add JDBC Fetch Size Config And Custom Postgres
PrepareStatement (#3478)
* [JDBC] [Config] Add JDBC Fetch Size Config
---
docs/en/connector-v2/source/Jdbc.md | 7 +++++++
.../connectors/seatunnel/jdbc/config/JdbcConfig.java | 2 ++
.../seatunnel/jdbc/config/JdbcSourceOptions.java | 8 ++++++++
.../jdbc/internal/dialect/psql/PostgresDialect.java | 20 ++++++++++++++++++++
.../connectors/seatunnel/jdbc/source/JdbcSource.java | 4 ++--
5 files changed, 39 insertions(+), 2 deletions(-)
diff --git a/docs/en/connector-v2/source/Jdbc.md
b/docs/en/connector-v2/source/Jdbc.md
index 1f1d5de86..20bb0bb1e 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -32,6 +32,7 @@ supports query SQL and can achieve projection effect.
| partition_upper_bound | Long | No | - |
| partition_lower_bound | Long | No | - |
| partition_num | Int | No | job parallelism |
+| fetch_size | Int | No | 0 |
| common-options | | No | - |
@@ -77,6 +78,11 @@ The partition_column min value for scan, if not set
SeaTunnel will query databas
The number of partition count, only support positive integer. default value is
job parallelism
+### fetch_size [int]
+
+For queries that return a large number of objects, you can configure the row
fetch size used in the query to
+improve performance by reducing the number of database hits required to satisfy
the selection criteria. Zero means the JDBC default value is used.
+
### common options
Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details.
@@ -151,3 +157,4 @@ parallel:
- [BugFix] Fix jdbc split bug
([3220](https://github.com/apache/incubator-seatunnel/pull/3220))
- [Feature] Support Tablestore Source
([3309](https://github.com/apache/incubator-seatunnel/pull/3309))
+- [Feature] Support JDBC Fetch Size Config
([3478](https://github.com/apache/incubator-seatunnel/pull/3478))
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
index befe8c998..12eeac7aa 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
@@ -48,6 +48,8 @@ public class JdbcConfig implements Serializable {
public static final Option<Integer> BATCH_SIZE =
Options.key("batch_size").intType().noDefaultValue().withDescription("batch
size");
+ public static final Option<Integer> FETCH_SIZE =
Options.key("fetch_size").intType().defaultValue(0).withDescription("For
queries that return a large number of objects, " +
+ "you can configure the row fetch size used in the query to improve
performance by reducing the number database hits required to satisfy the
selection criteria. Zero means use jdbc default value.");
public static final Option<Integer> BATCH_INTERVAL_MS =
Options.key("batch_interval_ms").intType().noDefaultValue().withDescription("batch
interval milliSecond");
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
index a4e7e2752..dc43810c7 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
@@ -37,6 +37,7 @@ public class JdbcSourceOptions implements Serializable {
private String partitionColumn;
private Long partitionUpperBound;
private Long partitionLowerBound;
+ private int fetchSize = JdbcConfig.FETCH_SIZE.defaultValue();
private Integer partitionNumber;
public JdbcSourceOptions(Config config) {
@@ -54,6 +55,9 @@ public class JdbcSourceOptions implements Serializable {
if (config.hasPath(JdbcConfig.PARTITION_NUM.key())) {
this.partitionNumber =
config.getInt(JdbcConfig.PARTITION_NUM.key());
}
+ if (config.hasPath(JdbcConfig.FETCH_SIZE.key())) {
+ this.fetchSize = config.getInt(JdbcConfig.FETCH_SIZE.key());
+ }
}
public JdbcConnectionOptions getJdbcConnectionOptions() {
@@ -75,4 +79,8 @@ public class JdbcSourceOptions implements Serializable {
public Optional<Integer> getPartitionNumber() {
return Optional.ofNullable(partitionNumber);
}
+
+ public int getFetchSize() {
+ return fetchSize;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
index cebb7e2b3..6b3b7d873 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
@@ -21,11 +21,18 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRow
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;
public class PostgresDialect implements JdbcDialect {
+
+ public static final int DEFAULT_POSTGRES_FETCH_SIZE = 128;
+
@Override
public String dialectName() {
return "PostgreSQL";
@@ -53,4 +60,17 @@ public class PostgresDialect implements JdbcDialect {
getInsertIntoStatement(tableName, fieldNames), uniqueColumns,
updateClause);
return Optional.of(upsertSQL);
}
+
+ @Override
+ public PreparedStatement creatPreparedStatement(Connection connection,
String queryTemplate, int fetchSize) throws SQLException {
+ // use cursor mode, reference:
https://jdbc.postgresql.org/documentation/query/#getting-results-based-on-a-cursor
+ connection.setAutoCommit(false);
+ PreparedStatement statement =
connection.prepareStatement(queryTemplate, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
+ if (fetchSize > 0) {
+ statement.setFetchSize(fetchSize);
+ } else {
+ statement.setFetchSize(DEFAULT_POSTGRES_FETCH_SIZE);
+ }
+ return statement;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 962dea80d..cfd3e576e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -88,8 +88,8 @@ public class JdbcSource implements
SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
jdbcDialect,
typeInfo,
query,
- 0,
- true
+ jdbcSourceOptions.getFetchSize(),
+ jdbcSourceOptions.getJdbcConnectionOptions().isAutoCommit()
);
}