This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d1d267765 [Feature][Connector-V2] JDBC source support string type as partition key (#4947)
d1d267765 is described below

commit d1d2677658469664842c4767da94db110771aa25
Author: Jia Fan <[email protected]>
AuthorDate: Sun Jul 16 15:09:35 2023 +0800

    [Feature][Connector-V2] JDBC source support string type as partition key (#4947)
---
 .../jdbc/internal/dialect/JdbcDialect.java         |  4 ++
 .../internal/dialect/oracle/OracleDialect.java     |  5 +++
 .../dialect/sqlserver/SqlServerDialect.java        |  5 +++
 .../seatunnel/jdbc/source/JdbcSource.java          |  8 ++--
 .../seatunnel/jdbc/source/JdbcSourceFactory.java   | 51 +++++++++++++++++-----
 .../jdbc/source/JdbcSourceSplitEnumerator.java     | 25 +++++++----
 .../seatunnel/jdbc/source/PartitionParameter.java  |  3 ++
 7 files changed, 76 insertions(+), 25 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index f36067b3c..e8967fce0 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -59,6 +59,10 @@ public interface JdbcDialect extends Serializable {
      */
     JdbcDialectTypeMapper getJdbcDialectTypeMapper();
 
+    default String hashModForField(String fieldName, int mod) {
+        return "ABS(MD5(" + quoteIdentifier(fieldName) + ") % " + mod + ")";
+    }
+
     /** Quotes the identifier for table name or field name */
     default String quoteIdentifier(String identifier) {
         return identifier;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
index 1ca10739e..7edd935e7 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -44,6 +44,11 @@ public class OracleDialect implements JdbcDialect {
         return new OracleJdbcRowConverter();
     }
 
+    @Override
+    public String hashModForField(String fieldName, int mod) {
+        return "MOD(ORA_HASH(" + quoteIdentifier(fieldName) + ")," + mod + ")";
+    }
+
     @Override
     public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
         return new OracleTypeMapper();
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index 697d2d2dc..2121369e2 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -42,6 +42,11 @@ public class SqlServerDialect implements JdbcDialect {
         return new SqlserverTypeMapper();
     }
 
+    @Override
+    public String hashModForField(String fieldName, int mod) {
+        return "ABS(HASHBYTES('MD5', " + quoteIdentifier(fieldName) + ") % " + 
mod + ")";
+    }
+
     @Override
     public Optional<String> getUpsertStatement(
             String database, String tableName, String[] fieldNames, String[] 
uniqueKeyFields) {
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 732892b21..aa001f78e 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -113,8 +113,7 @@ public class JdbcSource
         if (partitionParameter != null) {
             this.query =
                     JdbcSourceFactory.obtainPartitionSql(
-                            partitionParameter.getPartitionColumnName(),
-                            jdbcSourceConfig.getQuery());
+                            jdbcDialect, partitionParameter, 
jdbcSourceConfig.getQuery());
         }
 
         this.inputFormat =
@@ -187,9 +186,10 @@ public class JdbcSource
     private PartitionParameter createPartitionParameter(Connection connection) 
{
         if (jdbcSourceConfig.getPartitionColumn().isPresent()) {
             String partitionColumn = 
jdbcSourceConfig.getPartitionColumn().get();
-            JdbcSourceFactory.validationPartitionColumn(partitionColumn, 
typeInfo);
+            SeaTunnelDataType<?> dataType =
+                    
JdbcSourceFactory.validationPartitionColumn(partitionColumn, typeInfo);
             return JdbcSourceFactory.createPartitionParameter(
-                    jdbcSourceConfig, partitionColumn, connection);
+                    jdbcSourceConfig, partitionColumn, dataType, connection);
         } else {
             LOG.info(
                     "The partition_column parameter is not configured, and the 
source parallelism is set to 1");
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
index 43aa1c03d..8c21a8423 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
@@ -111,15 +111,25 @@ public class JdbcSourceFactory implements TableSourceFactory {
                                 connectionProvider,
                                 partitionParameter.isPresent()
                                         ? obtainPartitionSql(
-                                                
partitionParameter.get().getPartitionColumnName(),
-                                                querySql)
+                                                dialect, 
partitionParameter.get(), querySql)
                                         : querySql);
     }
 
-    static String obtainPartitionSql(String partitionColumn, String nativeSql) 
{
+    static String obtainPartitionSql(
+            JdbcDialect dialect, PartitionParameter partitionParameter, String 
nativeSql) {
+        if (isStringType(partitionParameter.getDataType())) {
+            return String.format(
+                    "SELECT * FROM (%s) tt where %s = ?",
+                    nativeSql,
+                    dialect.hashModForField(
+                            partitionParameter.getPartitionColumnName(),
+                            partitionParameter.getPartitionNumber()));
+        }
         return String.format(
                 "SELECT * FROM (%s) tt where %s >= ? AND %s <= ?",
-                nativeSql, partitionColumn, partitionColumn);
+                nativeSql,
+                partitionParameter.getPartitionColumnName(),
+                partitionParameter.getPartitionColumnName());
     }
 
     public static Optional<PartitionParameter> createPartitionParameter(
@@ -129,10 +139,11 @@ public class JdbcSourceFactory implements TableSourceFactory {
         Optional<String> partitionColumnOptional = getPartitionColumn(config, 
tableSchema);
         if (partitionColumnOptional.isPresent()) {
             String partitionColumn = partitionColumnOptional.get();
-            validationPartitionColumn(partitionColumn, 
tableSchema.toPhysicalRowDataType());
+            SeaTunnelDataType<?> dataType =
+                    validationPartitionColumn(partitionColumn, 
tableSchema.toPhysicalRowDataType());
             return Optional.of(
                     createPartitionParameter(
-                            config, partitionColumn, 
connectionProvider.getConnection()));
+                            config, partitionColumn, dataType, 
connectionProvider.getConnection()));
         }
         log.info(
                 "The partition_column parameter is not configured, and the 
source parallelism is set to 1");
@@ -140,15 +151,24 @@ public class JdbcSourceFactory implements TableSourceFactory {
     }
 
     static PartitionParameter createPartitionParameter(
-            JdbcSourceConfig config, String columnName, Connection connection) 
{
+            JdbcSourceConfig config,
+            String columnName,
+            SeaTunnelDataType<?> dataType,
+            Connection connection) {
         BigDecimal max = null;
         BigDecimal min = null;
+
+        if (dataType.equals(BasicType.STRING_TYPE)) {
+            return new PartitionParameter(
+                    columnName, dataType, null, null, 
config.getPartitionNumber().orElse(null));
+        }
+
         if (config.getPartitionLowerBound().isPresent()
                 && config.getPartitionUpperBound().isPresent()) {
             max = config.getPartitionUpperBound().get();
             min = config.getPartitionLowerBound().get();
             return new PartitionParameter(
-                    columnName, min, max, 
config.getPartitionNumber().orElse(null));
+                    columnName, dataType, min, max, 
config.getPartitionNumber().orElse(null));
         }
         try (ResultSet rs =
                 connection
@@ -171,7 +191,7 @@ public class JdbcSourceFactory implements TableSourceFactory {
             throw new PrepareFailException("jdbc", PluginType.SOURCE, 
e.toString());
         }
         return new PartitionParameter(
-                columnName, min, max, 
config.getPartitionNumber().orElse(null));
+                columnName, dataType, min, max, 
config.getPartitionNumber().orElse(null));
     }
 
     private static Optional<String> getPartitionColumn(
@@ -185,7 +205,8 @@ public class JdbcSourceFactory implements TableSourceFactory {
         return Optional.empty();
     }
 
-    static void validationPartitionColumn(String partitionColumn, 
SeaTunnelRowType rowType) {
+    static SeaTunnelDataType<?> validationPartitionColumn(
+            String partitionColumn, SeaTunnelRowType rowType) {
         Map<String, SeaTunnelDataType<?>> fieldTypes = new HashMap<>();
         for (int i = 0; i < rowType.getFieldNames().length; i++) {
             fieldTypes.put(rowType.getFieldName(i), rowType.getFieldType(i));
@@ -198,10 +219,12 @@ public class JdbcSourceFactory implements TableSourceFactory {
                             partitionColumn));
         }
         SeaTunnelDataType<?> partitionColumnType = 
fieldTypes.get(partitionColumn);
-        if (!isNumericType(partitionColumnType)) {
+        if (!isNumericType(partitionColumnType) && 
!isStringType(partitionColumnType)) {
             throw new JdbcConnectorException(
                     CommonErrorCode.ILLEGAL_ARGUMENT,
-                    String.format("%s is not numeric type", partitionColumn));
+                    String.format("%s is not numeric/string type", 
partitionColumn));
+        } else {
+            return partitionColumnType;
         }
     }
 
@@ -220,6 +243,10 @@ public class JdbcSourceFactory implements TableSourceFactory {
         return type.equals(BasicType.INT_TYPE) || 
type.equals(BasicType.LONG_TYPE) || scale == 0;
     }
 
+    private static boolean isStringType(SeaTunnelDataType<?> type) {
+        return type.equals(BasicType.STRING_TYPE);
+    }
+
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
index 185f87075..bd9abb2d6 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
@@ -106,15 +107,21 @@ public class JdbcSourceSplitEnumerator
                     partitionParameter.getPartitionNumber() != null
                             ? partitionParameter.getPartitionNumber()
                             : enumeratorContext.currentParallelism();
-            JdbcNumericBetweenParametersProvider 
jdbcNumericBetweenParametersProvider =
-                    new JdbcNumericBetweenParametersProvider(
-                                    partitionParameter.getMinValue(),
-                                    partitionParameter.getMaxValue())
-                            .ofBatchNum(partitionNumber);
-            Serializable[][] parameterValues =
-                    jdbcNumericBetweenParametersProvider.getParameterValues();
-            for (int i = 0; i < parameterValues.length; i++) {
-                allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
+            if 
(partitionParameter.getDataType().equals(BasicType.STRING_TYPE)) {
+                for (int i = 0; i < partitionNumber; i++) {
+                    allSplit.add(new JdbcSourceSplit(new Object[] {i}, i));
+                }
+            } else {
+                JdbcNumericBetweenParametersProvider 
jdbcNumericBetweenParametersProvider =
+                        new JdbcNumericBetweenParametersProvider(
+                                        partitionParameter.getMinValue(),
+                                        partitionParameter.getMaxValue())
+                                .ofBatchNum(partitionNumber);
+                Serializable[][] parameterValues =
+                        
jdbcNumericBetweenParametersProvider.getParameterValues();
+                for (int i = 0; i < parameterValues.length; i++) {
+                    allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
+                }
             }
         } else {
             allSplit.add(new JdbcSourceSplit(null, 0));
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
index 61677079e..88605e876 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
 
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
 import lombok.AllArgsConstructor;
 import lombok.Data;
 
@@ -28,6 +30,7 @@ import java.math.BigDecimal;
 public class PartitionParameter implements Serializable {
 
     String partitionColumnName;
+    SeaTunnelDataType<?> dataType;
     BigDecimal minValue;
     BigDecimal maxValue;
     Integer partitionNumber;

Reply via email to