This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e61664f [Feature][Connector] JDBC source support partition (#1544)
e61664f is described below
commit e61664f1e53840b9046d557a62339e82604777dd
Author: TrickyZerg <[email protected]>
AuthorDate: Thu Mar 24 19:06:08 2022 +0800
[Feature][Connector] JDBC source support partition (#1544)
* add parallelism support
* change partition generate logic, use fetch size to generate partition
* fix get parallelism method
* fix connection close problem.
* fix connection close problem.
* remove unused import
* fix document option type error
---
docs/en/flink/configuration/source-plugins/Jdbc.md | 35 +++--
.../java/org/apache/seatunnel/flink/Config.java | 55 ++++++--
.../apache/seatunnel/flink/source/JdbcSource.java | 142 ++++++++++++++++-----
3 files changed, 183 insertions(+), 49 deletions(-)
diff --git a/docs/en/flink/configuration/source-plugins/Jdbc.md
b/docs/en/flink/configuration/source-plugins/Jdbc.md
index 9360867..ed2a138 100644
--- a/docs/en/flink/configuration/source-plugins/Jdbc.md
+++ b/docs/en/flink/configuration/source-plugins/Jdbc.md
@@ -8,16 +8,19 @@ Read data through jdbc
## Options
-| name | type | required | default value |
-| -------------- | ------ | -------- | ------------- |
-| driver | string | yes | - |
-| url | string | yes | - |
-| username | string | yes | - |
-| password | string | no | - |
-| query | string | yes | - |
-| fetch_size | int | no | - |
-| common-options | string | no | - |
-| parallelism | int | no | - |
+| name | type | required | default value |
+|-----------------------|--------| -------- | ------------- |
+| driver | string | yes | - |
+| url | string | yes | - |
+| username | string | yes | - |
+| password | string | no | - |
+| query | string | yes | - |
+| fetch_size | int | no | - |
+| partition_column | string | no | - |
+| partition_upper_bound | long | no | - |
+| partition_lower_bound | long | no | - |
+| common-options | string | no | - |
+| parallelism | int | no | - |
### driver [string]
@@ -49,6 +52,18 @@ fetch size
The parallelism of an individual operator, for JdbcSource.
+### partition_column [string]
+
+The column name for parallelism's partition; only numeric types are supported.
+
+### partition_upper_bound [long]
+
+The partition_column max value for the scan; if not set, SeaTunnel will query the
database to get the max value.
+
+### partition_lower_bound [long]
+
+The partition_column min value for the scan; if not set, SeaTunnel will query the
database to get the min value.
+
### common options [string]
Source plugin common parameters, please refer to [Source
Plugin](./source-plugin.md) for details
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/Config.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/Config.java
index b4c5c54..d140ef0 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/Config.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/Config.java
@@ -23,34 +23,69 @@ package org.apache.seatunnel.flink;
*/
public interface Config {
- /** Parallelism of the source or sink */
+ /**
+ * Parallelism of the source or sink
+ */
String PARALLELISM = "parallelism";
- /** Jdbc driver for source or sink */
+ /**
+ * Jdbc driver for source or sink
+ */
String DRIVER = "driver";
- /** Jdbc Url for source or sink */
+ /**
+ * Jdbc Url for source or sink
+ */
String URL = "url";
- /** Jdbc username for source or sink */
+ /**
+ * Jdbc username for source or sink
+ */
String USERNAME = "username";
- /** Jdbc query for source or sink */
+ /**
+ * Jdbc query for source or sink
+ */
String QUERY = "query";
- /** Jdbc password for source or sink */
+ /**
+ * Jdbc password for source or sink
+ */
String PASSWORD = "password";
- /** Jdbc fetch size for source */
+ /**
+ * Jdbc fetch size for source
+ */
String SOURCE_FETCH_SIZE = "fetch_size";
- /** Jdbc batch size for sink */
+ /**
+ * Jdbc batch size for sink
+ */
String SINK_BATCH_SIZE = "batch_size";
- /** Jdbc batch interval for sink */
+ /**
+ * Jdbc batch interval for sink
+ */
String SINK_BATCH_INTERVAL = "batch_interval";
- /** Jdbc max batch retries for sink */
+ /**
+ * Jdbc max batch retries for sink
+ */
String SINK_BATCH_MAX_RETRIES = "batch_max_retries";
+ /**
+ * Jdbc partition column name
+ */
+ String PARTITION_COLUMN = "partition_column";
+
+ /**
+ * Jdbc partition upper bound
+ */
+ String PARTITION_UPPER_BOUND = "partition_upper_bound";
+
+ /**
+ * Jdbc partition lower bound
+ */
+ String PARTITION_LOWER_BOUND = "partition_lower_bound";
+
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
index 32b6cbc..dee2b61 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
@@ -19,11 +19,18 @@ package org.apache.seatunnel.flink.source;
import static org.apache.seatunnel.flink.Config.DRIVER;
import static org.apache.seatunnel.flink.Config.PARALLELISM;
+import static org.apache.seatunnel.flink.Config.PARTITION_COLUMN;
+import static org.apache.seatunnel.flink.Config.PARTITION_LOWER_BOUND;
+import static org.apache.seatunnel.flink.Config.PARTITION_UPPER_BOUND;
import static org.apache.seatunnel.flink.Config.PASSWORD;
import static org.apache.seatunnel.flink.Config.QUERY;
import static org.apache.seatunnel.flink.Config.SOURCE_FETCH_SIZE;
import static org.apache.seatunnel.flink.Config.URL;
import static org.apache.seatunnel.flink.Config.USERNAME;
+import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_INT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
+import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.SHORT_TYPE_INFO;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
@@ -42,6 +49,8 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import
org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
+import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +59,7 @@ import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
@@ -70,9 +80,12 @@ public class JdbcSource implements FlinkBatchSource {
private String username;
private String password;
private int fetchSize = DEFAULT_FETCH_SIZE;
+ private int parallelism = -1;
private Set<String> fields;
+ private Map<String, TypeInformation<?>> tableFieldInfo;
- private static final Pattern COMPILE =
Pattern.compile("[\\s]*select[\\s]*(.*)from[\\s]*([\\S]+).*");
+ private static final Pattern COMPILE =
Pattern.compile("[\\s]*select[\\s]*(.*)from[\\s]*([\\S]+)(.*)",
+ Pattern.CASE_INSENSITIVE);
private JdbcInputFormat jdbcInputFormat;
@@ -80,8 +93,7 @@ public class JdbcSource implements FlinkBatchSource {
public DataSet<Row> getData(FlinkEnvironment env) {
DataSource<Row> dataSource =
env.getBatchEnvironment().createInput(jdbcInputFormat);
if (config.hasPath(PARALLELISM)) {
- int parallelism = config.getInt(PARALLELISM);
- return dataSource.setParallelism(parallelism);
+ return dataSource.setParallelism(config.getInt(PARALLELISM));
}
return dataSource;
}
@@ -116,41 +128,90 @@ public class JdbcSource implements FlinkBatchSource {
if (config.hasPath(SOURCE_FETCH_SIZE)) {
fetchSize = config.getInt(SOURCE_FETCH_SIZE);
}
+ if (config.hasPath(PARALLELISM)) {
+ parallelism = config.getInt(PARALLELISM);
+ } else {
+ parallelism = env.getBatchEnvironment().getParallelism();
+ }
+ try {
+ Class.forName(driverName);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("jdbc connection init failed.", e);
+ }
- jdbcInputFormat = JdbcInputFormat.buildFlinkJdbcInputFormat()
-
.setDrivername(driverName).setDBUrl(dbUrl).setUsername(username)
- .setPassword(password).setQuery(query).setFetchSize(fetchSize)
- .setRowTypeInfo(getRowTypeInfo()).finish();
+ try (Connection connection = DriverManager.getConnection(dbUrl,
username, password)) {
+ tableFieldInfo = initTableField(connection);
+ RowTypeInfo rowTypeInfo = getRowTypeInfo();
+ JdbcInputFormat.JdbcInputFormatBuilder builder =
JdbcInputFormat.buildFlinkJdbcInputFormat();
+ if (config.hasPath(PARTITION_COLUMN)) {
+ if
(!tableFieldInfo.containsKey(config.getString(PARTITION_COLUMN))) {
+ throw new IllegalArgumentException(String.format("field %s
not contain in table %s",
+ config.getString(PARTITION_COLUMN), tableName));
+ }
+ if
(!isNumericType(rowTypeInfo.getTypeAt(config.getString(PARTITION_COLUMN)))) {
+ throw new IllegalArgumentException(String.format("%s is
not numeric type", PARTITION_COLUMN));
+ }
+ JdbcParameterValuesProvider jdbcParameterValuesProvider =
+ initPartition(config.getString(PARTITION_COLUMN),
connection);
+ builder.setParametersProvider(jdbcParameterValuesProvider);
+ query = extendPartitionQuerySql(query,
config.getString(PARTITION_COLUMN));
+ }
+
builder.setDrivername(driverName).setDBUrl(dbUrl).setUsername(username)
+
.setPassword(password).setQuery(query).setFetchSize(fetchSize)
+ .setRowTypeInfo(rowTypeInfo);
+
+ jdbcInputFormat = builder.finish();
+ } catch (SQLException e) {
+ throw new RuntimeException("jdbc connection init failed.", e);
+ }
}
- private Tuple2<String, Set<String>> getTableNameAndFields(Pattern regex,
String selectSql) {
- Matcher matcher = regex.matcher(selectSql);
- String tableName;
- Set<String> fields = null;
+ private String extendPartitionQuerySql(String query, String column) {
+ Matcher matcher = COMPILE.matcher(query);
if (matcher.find()) {
- String var = matcher.group(1);
- tableName = matcher.group(2);
- if (!"*".equals(var.trim())) {
- LinkedHashSet<String> vars = new LinkedHashSet<>();
- String[] split = var.split(",");
- for (String s : split) {
- vars.add(s.trim());
- }
- fields = vars;
+ String where = matcher.group(Integer.parseInt("3"));
+ if (where != null &&
where.trim().toLowerCase().startsWith("where")) {
+ // contain where
+ return query + " AND \"" + column + "\" BETWEEN ? AND ?";
+ } else {
+ // not contain where
+ return query + " WHERE \"" + column + "\" BETWEEN ? AND ?";
}
- return new Tuple2<>(tableName, fields);
} else {
- throw new IllegalArgumentException("can't find tableName and
fields in sql :" + selectSql);
+ throw new IllegalArgumentException("sql statement format is
incorrect :" + query);
}
}
- private RowTypeInfo getRowTypeInfo() {
+ private JdbcParameterValuesProvider initPartition(String columnName,
Connection connection) throws SQLException {
+ long max = Long.MAX_VALUE;
+ long min = Long.MIN_VALUE;
+ if (config.hasPath(PARTITION_UPPER_BOUND) &&
config.hasPath(PARTITION_LOWER_BOUND)) {
+ max = config.getLong(PARTITION_UPPER_BOUND);
+ min = config.getLong(PARTITION_LOWER_BOUND);
+ } else {
+ ResultSet rs =
connection.createStatement().executeQuery(String.format("SELECT MAX(%s),MIN(%s)
" +
+ "FROM %s", columnName, columnName, tableName));
+ if (rs.next()) {
+ max = config.hasPath(PARTITION_UPPER_BOUND) ?
config.getLong(PARTITION_UPPER_BOUND) :
+ Long.parseLong(rs.getString(1));
+ min = config.hasPath(PARTITION_LOWER_BOUND) ?
config.getLong(PARTITION_LOWER_BOUND) :
+ Long.parseLong(rs.getString(2));
+ }
+ }
+
+ return new JdbcNumericBetweenParametersProvider(min,
max).ofBatchNum(parallelism * 2);
+ }
+
+ private boolean isNumericType(TypeInformation<?> type) {
+ return type.equals(INT_TYPE_INFO) || type.equals(SHORT_TYPE_INFO)
+ || type.equals(LONG_TYPE_INFO) ||
type.equals(BIG_INT_TYPE_INFO);
+ }
+
+ private Map<String, TypeInformation<?>> initTableField(Connection
connection) {
Map<String, TypeInformation<?>> map = new LinkedHashMap<>();
try {
- Class.forName(driverName);
TypeInformationMap informationMapping =
getTypeInformationMap(driverName);
- Connection connection = DriverManager.getConnection(dbUrl,
username, password);
DatabaseMetaData metaData = connection.getMetaData();
ResultSet columns = metaData.getColumns(connection.getCatalog(),
connection.getSchema(), tableName, "%");
while (columns.next()) {
@@ -160,16 +221,39 @@ public class JdbcSource implements FlinkBatchSource {
map.put(columnName,
informationMapping.getInformation(dataTypeName));
}
}
- connection.close();
} catch (Exception e) {
LOGGER.warn("get row type info exception", e);
}
+ return map;
+ }
- int size = map.size();
+ private Tuple2<String, Set<String>> getTableNameAndFields(Pattern regex,
String selectSql) {
+ Matcher matcher = regex.matcher(selectSql);
+ String tableName;
+ Set<String> fields = null;
+ if (matcher.find()) {
+ String var = matcher.group(1);
+ tableName = matcher.group(2);
+ if (!"*".equals(var.trim())) {
+ LinkedHashSet<String> vars = new LinkedHashSet<>();
+ String[] split = var.split(",");
+ for (String s : split) {
+ vars.add(s.trim());
+ }
+ fields = vars;
+ }
+ return new Tuple2<>(tableName, fields);
+ } else {
+ throw new IllegalArgumentException("can't find tableName and
fields in sql :" + selectSql);
+ }
+ }
+
+ private RowTypeInfo getRowTypeInfo() {
+ int size = tableFieldInfo.size();
if (fields != null && fields.size() > 0) {
size = fields.size();
} else {
- fields = map.keySet();
+ fields = tableFieldInfo.keySet();
}
TypeInformation<?>[] typeInformation = new TypeInformation<?>[size];
@@ -177,7 +261,7 @@ public class JdbcSource implements FlinkBatchSource {
int i = 0;
for (String field : fields) {
- typeInformation[i] = map.get(field);
+ typeInformation[i] = tableFieldInfo.get(field);
names[i] = field;
i++;
}