This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e61664f  [Feature][Connector] JDBC source support partition (#1544)
e61664f is described below

commit e61664f1e53840b9046d557a62339e82604777dd
Author: TrickyZerg <[email protected]>
AuthorDate: Thu Mar 24 19:06:08 2022 +0800

    [Feature][Connector] JDBC source support partition (#1544)
    
    * add parallelism support
    
    * change partition generate logic, use fetch size to generate partition
    
    * fix get parallelism method
    
    * fix connection close problem.
    
    * fix connection close problem.
    
    * remove unused import
    
    * fix document option type error
---
 docs/en/flink/configuration/source-plugins/Jdbc.md |  35 +++--
 .../java/org/apache/seatunnel/flink/Config.java    |  55 ++++++--
 .../apache/seatunnel/flink/source/JdbcSource.java  | 142 ++++++++++++++++-----
 3 files changed, 183 insertions(+), 49 deletions(-)

diff --git a/docs/en/flink/configuration/source-plugins/Jdbc.md 
b/docs/en/flink/configuration/source-plugins/Jdbc.md
index 9360867..ed2a138 100644
--- a/docs/en/flink/configuration/source-plugins/Jdbc.md
+++ b/docs/en/flink/configuration/source-plugins/Jdbc.md
@@ -8,16 +8,19 @@ Read data through jdbc
 
 ## Options
 
-| name           | type   | required | default value |
-| -------------- | ------ | -------- | ------------- |
-| driver         | string | yes      | -             |
-| url            | string | yes      | -             |
-| username       | string | yes      | -             |
-| password       | string | no       | -             |
-| query          | string | yes      | -             |
-| fetch_size     | int    | no       | -             |
-| common-options | string | no       | -             |
-| parallelism    | int    | no       | -             |
+| name                  | type   | required | default value |
+|-----------------------|--------| -------- | ------------- |
+| driver                | string | yes      | -             |
+| url                   | string | yes      | -             |
+| username              | string | yes      | -             |
+| password              | string | no       | -             |
+| query                 | string | yes      | -             |
+| fetch_size            | int    | no       | -             |
+| partition_column      | string | no       | -             |
+| partition_upper_bound | long   | no       | -             |
+| partition_lower_bound | long   | no       | -             |
+| common-options        | string | no       | -             |
+| parallelism           | int    | no       | -             |
 
 ### driver [string]
 
@@ -49,6 +52,18 @@ fetch size
 
 The parallelism of an individual operator, for JdbcSource.
 
+### partition_column [string]
+
+The column name for the parallelism partition; only numeric types are supported.
+
+### partition_upper_bound [long]
+
+The partition_column max value for the scan; if not set, SeaTunnel will query the
database to get the max value.
+
+### partition_lower_bound [long]
+
+The partition_column min value for the scan; if not set, SeaTunnel will query the
database to get the min value.
+
 ### common options [string]
 
 Source plugin common parameters, please refer to [Source 
Plugin](./source-plugin.md) for details
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/Config.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/Config.java
index b4c5c54..d140ef0 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/Config.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/Config.java
@@ -23,34 +23,69 @@ package org.apache.seatunnel.flink;
  */
 public interface Config {
 
-    /** Parallelism of the source or sink */
+    /**
+     * Parallelism of the source or sink
+     */
     String PARALLELISM = "parallelism";
 
-    /** Jdbc driver for source or sink */
+    /**
+     * Jdbc driver for source or sink
+     */
     String DRIVER = "driver";
 
-    /** Jdbc Url for source or sink */
+    /**
+     * Jdbc Url for source or sink
+     */
     String URL = "url";
 
-    /** Jdbc username for source or sink */
+    /**
+     * Jdbc username for source or sink
+     */
     String USERNAME = "username";
 
-    /** Jdbc query for source or sink */
+    /**
+     * Jdbc query for source or sink
+     */
     String QUERY = "query";
 
-    /** Jdbc password for source or sink */
+    /**
+     * Jdbc password for source or sink
+     */
     String PASSWORD = "password";
 
-    /** Jdbc fetch size for source */
+    /**
+     * Jdbc fetch size for source
+     */
     String SOURCE_FETCH_SIZE = "fetch_size";
 
-    /** Jdbc batch size for sink */
+    /**
+     * Jdbc batch size for sink
+     */
     String SINK_BATCH_SIZE = "batch_size";
 
-    /** Jdbc batch interval for sink */
+    /**
+     * Jdbc batch interval for sink
+     */
     String SINK_BATCH_INTERVAL = "batch_interval";
 
-    /** Jdbc max batch retries for sink */
+    /**
+     * Jdbc max batch retries for sink
+     */
     String SINK_BATCH_MAX_RETRIES = "batch_max_retries";
 
+    /**
+     * Jdbc partition column name
+     */
+    String PARTITION_COLUMN = "partition_column";
+
+    /**
+     * Jdbc partition upper bound
+     */
+    String PARTITION_UPPER_BOUND = "partition_upper_bound";
+
+    /**
+     * Jdbc partition lower bound
+     */
+    String PARTITION_LOWER_BOUND = "partition_lower_bound";
+
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
index 32b6cbc..dee2b61 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
@@ -19,11 +19,18 @@ package org.apache.seatunnel.flink.source;
 
 import static org.apache.seatunnel.flink.Config.DRIVER;
 import static org.apache.seatunnel.flink.Config.PARALLELISM;
+import static org.apache.seatunnel.flink.Config.PARTITION_COLUMN;
+import static org.apache.seatunnel.flink.Config.PARTITION_LOWER_BOUND;
+import static org.apache.seatunnel.flink.Config.PARTITION_UPPER_BOUND;
 import static org.apache.seatunnel.flink.Config.PASSWORD;
 import static org.apache.seatunnel.flink.Config.QUERY;
 import static org.apache.seatunnel.flink.Config.SOURCE_FETCH_SIZE;
 import static org.apache.seatunnel.flink.Config.URL;
 import static org.apache.seatunnel.flink.Config.USERNAME;
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_INT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.SHORT_TYPE_INFO;
 
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
@@ -42,6 +49,8 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import 
org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
+import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
 import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +59,7 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
@@ -70,9 +80,12 @@ public class JdbcSource implements FlinkBatchSource {
     private String username;
     private String password;
     private int fetchSize = DEFAULT_FETCH_SIZE;
+    private int parallelism = -1;
     private Set<String> fields;
+    private Map<String, TypeInformation<?>> tableFieldInfo;
 
-    private static final Pattern COMPILE = 
Pattern.compile("[\\s]*select[\\s]*(.*)from[\\s]*([\\S]+).*");
+    private static final Pattern COMPILE = 
Pattern.compile("[\\s]*select[\\s]*(.*)from[\\s]*([\\S]+)(.*)",
+            Pattern.CASE_INSENSITIVE);
 
     private JdbcInputFormat jdbcInputFormat;
 
@@ -80,8 +93,7 @@ public class JdbcSource implements FlinkBatchSource {
     public DataSet<Row> getData(FlinkEnvironment env) {
         DataSource<Row> dataSource = 
env.getBatchEnvironment().createInput(jdbcInputFormat);
         if (config.hasPath(PARALLELISM)) {
-            int parallelism = config.getInt(PARALLELISM);
-            return dataSource.setParallelism(parallelism);
+            return dataSource.setParallelism(config.getInt(PARALLELISM));
         }
         return dataSource;
     }
@@ -116,41 +128,90 @@ public class JdbcSource implements FlinkBatchSource {
         if (config.hasPath(SOURCE_FETCH_SIZE)) {
             fetchSize = config.getInt(SOURCE_FETCH_SIZE);
         }
+        if (config.hasPath(PARALLELISM)) {
+            parallelism = config.getInt(PARALLELISM);
+        } else {
+            parallelism = env.getBatchEnvironment().getParallelism();
+        }
+        try {
+            Class.forName(driverName);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("jdbc connection init failed.", e);
+        }
 
-        jdbcInputFormat = JdbcInputFormat.buildFlinkJdbcInputFormat()
-                
.setDrivername(driverName).setDBUrl(dbUrl).setUsername(username)
-                .setPassword(password).setQuery(query).setFetchSize(fetchSize)
-                .setRowTypeInfo(getRowTypeInfo()).finish();
+        try (Connection connection = DriverManager.getConnection(dbUrl, 
username, password)) {
+            tableFieldInfo = initTableField(connection);
+            RowTypeInfo rowTypeInfo = getRowTypeInfo();
+            JdbcInputFormat.JdbcInputFormatBuilder builder = 
JdbcInputFormat.buildFlinkJdbcInputFormat();
+            if (config.hasPath(PARTITION_COLUMN)) {
+                if 
(!tableFieldInfo.containsKey(config.getString(PARTITION_COLUMN))) {
+                    throw new IllegalArgumentException(String.format("field %s 
not contain in table %s",
+                            config.getString(PARTITION_COLUMN), tableName));
+                }
+                if 
(!isNumericType(rowTypeInfo.getTypeAt(config.getString(PARTITION_COLUMN)))) {
+                    throw new IllegalArgumentException(String.format("%s is 
not numeric type", PARTITION_COLUMN));
+                }
+                JdbcParameterValuesProvider jdbcParameterValuesProvider =
+                        initPartition(config.getString(PARTITION_COLUMN), 
connection);
+                builder.setParametersProvider(jdbcParameterValuesProvider);
+                query = extendPartitionQuerySql(query, 
config.getString(PARTITION_COLUMN));
+            }
+            
builder.setDrivername(driverName).setDBUrl(dbUrl).setUsername(username)
+                    
.setPassword(password).setQuery(query).setFetchSize(fetchSize)
+                    .setRowTypeInfo(rowTypeInfo);
+
+            jdbcInputFormat = builder.finish();
+        } catch (SQLException e) {
+            throw new RuntimeException("jdbc connection init failed.", e);
+        }
     }
 
-    private Tuple2<String, Set<String>> getTableNameAndFields(Pattern regex, 
String selectSql) {
-        Matcher matcher = regex.matcher(selectSql);
-        String tableName;
-        Set<String> fields = null;
+    private String extendPartitionQuerySql(String query, String column) {
+        Matcher matcher = COMPILE.matcher(query);
         if (matcher.find()) {
-            String var = matcher.group(1);
-            tableName = matcher.group(2);
-            if (!"*".equals(var.trim())) {
-                LinkedHashSet<String> vars = new LinkedHashSet<>();
-                String[] split = var.split(",");
-                for (String s : split) {
-                    vars.add(s.trim());
-                }
-                fields = vars;
+            String where = matcher.group(Integer.parseInt("3"));
+            if (where != null && 
where.trim().toLowerCase().startsWith("where")) {
+                // contain where
+                return query + " AND \"" + column + "\" BETWEEN ? AND ?";
+            } else {
+                // not contain where
+                return query + " WHERE \"" + column + "\" BETWEEN ? AND ?";
             }
-            return new Tuple2<>(tableName, fields);
         } else {
-            throw new IllegalArgumentException("can't find tableName and 
fields in sql :" + selectSql);
+            throw new IllegalArgumentException("sql statement format is 
incorrect :" + query);
         }
     }
 
-    private RowTypeInfo getRowTypeInfo() {
+    private JdbcParameterValuesProvider initPartition(String columnName, 
Connection connection) throws SQLException {
+        long max = Long.MAX_VALUE;
+        long min = Long.MIN_VALUE;
+        if (config.hasPath(PARTITION_UPPER_BOUND) && 
config.hasPath(PARTITION_LOWER_BOUND)) {
+            max = config.getLong(PARTITION_UPPER_BOUND);
+            min = config.getLong(PARTITION_LOWER_BOUND);
+        } else {
+            ResultSet rs = 
connection.createStatement().executeQuery(String.format("SELECT MAX(%s),MIN(%s) 
" +
+                    "FROM %s", columnName, columnName, tableName));
+            if (rs.next()) {
+                max = config.hasPath(PARTITION_UPPER_BOUND) ? 
config.getLong(PARTITION_UPPER_BOUND) :
+                        Long.parseLong(rs.getString(1));
+                min = config.hasPath(PARTITION_LOWER_BOUND) ? 
config.getLong(PARTITION_LOWER_BOUND) :
+                        Long.parseLong(rs.getString(2));
+            }
+        }
+
+        return new JdbcNumericBetweenParametersProvider(min, 
max).ofBatchNum(parallelism * 2);
+    }
+
+    private boolean isNumericType(TypeInformation<?> type) {
+        return type.equals(INT_TYPE_INFO) || type.equals(SHORT_TYPE_INFO)
+                || type.equals(LONG_TYPE_INFO) || 
type.equals(BIG_INT_TYPE_INFO);
+    }
+
+    private Map<String, TypeInformation<?>> initTableField(Connection 
connection) {
         Map<String, TypeInformation<?>> map = new LinkedHashMap<>();
 
         try {
-            Class.forName(driverName);
             TypeInformationMap informationMapping = 
getTypeInformationMap(driverName);
-            Connection connection = DriverManager.getConnection(dbUrl, 
username, password);
             DatabaseMetaData metaData = connection.getMetaData();
             ResultSet columns = metaData.getColumns(connection.getCatalog(), 
connection.getSchema(), tableName, "%");
             while (columns.next()) {
@@ -160,16 +221,39 @@ public class JdbcSource implements FlinkBatchSource {
                     map.put(columnName, 
informationMapping.getInformation(dataTypeName));
                 }
             }
-            connection.close();
         } catch (Exception e) {
             LOGGER.warn("get row type info exception", e);
         }
+        return map;
+    }
 
-        int size = map.size();
+    private Tuple2<String, Set<String>> getTableNameAndFields(Pattern regex, 
String selectSql) {
+        Matcher matcher = regex.matcher(selectSql);
+        String tableName;
+        Set<String> fields = null;
+        if (matcher.find()) {
+            String var = matcher.group(1);
+            tableName = matcher.group(2);
+            if (!"*".equals(var.trim())) {
+                LinkedHashSet<String> vars = new LinkedHashSet<>();
+                String[] split = var.split(",");
+                for (String s : split) {
+                    vars.add(s.trim());
+                }
+                fields = vars;
+            }
+            return new Tuple2<>(tableName, fields);
+        } else {
+            throw new IllegalArgumentException("can't find tableName and 
fields in sql :" + selectSql);
+        }
+    }
+
+    private RowTypeInfo getRowTypeInfo() {
+        int size = tableFieldInfo.size();
         if (fields != null && fields.size() > 0) {
             size = fields.size();
         } else {
-            fields = map.keySet();
+            fields = tableFieldInfo.keySet();
         }
 
         TypeInformation<?>[] typeInformation = new TypeInformation<?>[size];
@@ -177,7 +261,7 @@ public class JdbcSource implements FlinkBatchSource {
         int i = 0;
 
         for (String field : fields) {
-            typeInformation[i] = map.get(field);
+            typeInformation[i] = tableFieldInfo.get(field);
             names[i] = field;
             i++;
         }

Reply via email to