This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d1d267765 [Feature][Connector-V2] JDBC source support string type as partition key (#4947)
d1d267765 is described below
commit d1d2677658469664842c4767da94db110771aa25
Author: Jia Fan <[email protected]>
AuthorDate: Sun Jul 16 15:09:35 2023 +0800
[Feature][Connector-V2] JDBC source support string type as partition key (#4947)
---
.../jdbc/internal/dialect/JdbcDialect.java | 4 ++
.../internal/dialect/oracle/OracleDialect.java | 5 +++
.../dialect/sqlserver/SqlServerDialect.java | 5 +++
.../seatunnel/jdbc/source/JdbcSource.java | 8 ++--
.../seatunnel/jdbc/source/JdbcSourceFactory.java | 51 +++++++++++++++++-----
.../jdbc/source/JdbcSourceSplitEnumerator.java | 25 +++++++----
.../seatunnel/jdbc/source/PartitionParameter.java | 3 ++
7 files changed, 76 insertions(+), 25 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index f36067b3c..e8967fce0 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -59,6 +59,10 @@ public interface JdbcDialect extends Serializable {
*/
JdbcDialectTypeMapper getJdbcDialectTypeMapper();
+ default String hashModForField(String fieldName, int mod) {
+ return "ABS(MD5(" + quoteIdentifier(fieldName) + ") % " + mod + ")";
+ }
+
/** Quotes the identifier for table name or field name */
default String quoteIdentifier(String identifier) {
return identifier;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
index 1ca10739e..7edd935e7 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -44,6 +44,11 @@ public class OracleDialect implements JdbcDialect {
return new OracleJdbcRowConverter();
}
+ @Override
+ public String hashModForField(String fieldName, int mod) {
+ return "MOD(ORA_HASH(" + quoteIdentifier(fieldName) + ")," + mod + ")";
+ }
+
@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new OracleTypeMapper();
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index 697d2d2dc..2121369e2 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -42,6 +42,11 @@ public class SqlServerDialect implements JdbcDialect {
return new SqlserverTypeMapper();
}
+ @Override
+ public String hashModForField(String fieldName, int mod) {
+ return "ABS(HASHBYTES('MD5', " + quoteIdentifier(fieldName) + ") % " + mod + ")";
+ }
+
@Override
public Optional<String> getUpsertStatement(
String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 732892b21..aa001f78e 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -113,8 +113,7 @@ public class JdbcSource
if (partitionParameter != null) {
this.query =
JdbcSourceFactory.obtainPartitionSql(
- partitionParameter.getPartitionColumnName(),
- jdbcSourceConfig.getQuery());
+ jdbcDialect, partitionParameter, jdbcSourceConfig.getQuery());
}
this.inputFormat =
@@ -187,9 +186,10 @@ public class JdbcSource
private PartitionParameter createPartitionParameter(Connection connection) {
if (jdbcSourceConfig.getPartitionColumn().isPresent()) {
String partitionColumn = jdbcSourceConfig.getPartitionColumn().get();
- JdbcSourceFactory.validationPartitionColumn(partitionColumn, typeInfo);
+ SeaTunnelDataType<?> dataType =
+ JdbcSourceFactory.validationPartitionColumn(partitionColumn, typeInfo);
return JdbcSourceFactory.createPartitionParameter(
- jdbcSourceConfig, partitionColumn, connection);
+ jdbcSourceConfig, partitionColumn, dataType, connection);
} else {
LOG.info(
"The partition_column parameter is not configured, and the source parallelism is set to 1");
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
index 43aa1c03d..8c21a8423 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
@@ -111,15 +111,25 @@ public class JdbcSourceFactory implements TableSourceFactory {
connectionProvider,
partitionParameter.isPresent()
? obtainPartitionSql(
- partitionParameter.get().getPartitionColumnName(),
- querySql)
+ dialect, partitionParameter.get(), querySql)
: querySql);
}
- static String obtainPartitionSql(String partitionColumn, String nativeSql) {
+ static String obtainPartitionSql(
+ JdbcDialect dialect, PartitionParameter partitionParameter, String nativeSql) {
+ if (isStringType(partitionParameter.getDataType())) {
+ return String.format(
+ "SELECT * FROM (%s) tt where %s = ?",
+ nativeSql,
+ dialect.hashModForField(
+ partitionParameter.getPartitionColumnName(),
+ partitionParameter.getPartitionNumber()));
+ }
return String.format(
"SELECT * FROM (%s) tt where %s >= ? AND %s <= ?",
- nativeSql, partitionColumn, partitionColumn);
+ nativeSql,
+ partitionParameter.getPartitionColumnName(),
+ partitionParameter.getPartitionColumnName());
}
public static Optional<PartitionParameter> createPartitionParameter(
@@ -129,10 +139,11 @@ public class JdbcSourceFactory implements TableSourceFactory {
Optional<String> partitionColumnOptional = getPartitionColumn(config, tableSchema);
if (partitionColumnOptional.isPresent()) {
String partitionColumn = partitionColumnOptional.get();
- validationPartitionColumn(partitionColumn, tableSchema.toPhysicalRowDataType());
+ SeaTunnelDataType<?> dataType =
+ validationPartitionColumn(partitionColumn, tableSchema.toPhysicalRowDataType());
return Optional.of(
createPartitionParameter(
- config, partitionColumn, connectionProvider.getConnection()));
+ config, partitionColumn, dataType, connectionProvider.getConnection()));
}
log.info(
"The partition_column parameter is not configured, and the source parallelism is set to 1");
@@ -140,15 +151,24 @@ public class JdbcSourceFactory implements TableSourceFactory {
}
static PartitionParameter createPartitionParameter(
- JdbcSourceConfig config, String columnName, Connection connection) {
+ JdbcSourceConfig config,
+ String columnName,
+ SeaTunnelDataType<?> dataType,
+ Connection connection) {
BigDecimal max = null;
BigDecimal min = null;
+
+ if (dataType.equals(BasicType.STRING_TYPE)) {
+ return new PartitionParameter(
+ columnName, dataType, null, null, config.getPartitionNumber().orElse(null));
+ }
+
if (config.getPartitionLowerBound().isPresent()
&& config.getPartitionUpperBound().isPresent()) {
max = config.getPartitionUpperBound().get();
min = config.getPartitionLowerBound().get();
return new PartitionParameter(
- columnName, min, max, config.getPartitionNumber().orElse(null));
+ columnName, dataType, min, max, config.getPartitionNumber().orElse(null));
}
try (ResultSet rs =
connection
@@ -171,7 +191,7 @@ public class JdbcSourceFactory implements TableSourceFactory {
throw new PrepareFailException("jdbc", PluginType.SOURCE, e.toString());
}
return new PartitionParameter(
- columnName, min, max, config.getPartitionNumber().orElse(null));
+ columnName, dataType, min, max, config.getPartitionNumber().orElse(null));
}
private static Optional<String> getPartitionColumn(
@@ -185,7 +205,8 @@ public class JdbcSourceFactory implements TableSourceFactory {
return Optional.empty();
}
- static void validationPartitionColumn(String partitionColumn, SeaTunnelRowType rowType) {
+ static SeaTunnelDataType<?> validationPartitionColumn(
+ String partitionColumn, SeaTunnelRowType rowType) {
Map<String, SeaTunnelDataType<?>> fieldTypes = new HashMap<>();
for (int i = 0; i < rowType.getFieldNames().length; i++) {
fieldTypes.put(rowType.getFieldName(i), rowType.getFieldType(i));
@@ -198,10 +219,12 @@ public class JdbcSourceFactory implements TableSourceFactory {
partitionColumn));
}
SeaTunnelDataType<?> partitionColumnType = fieldTypes.get(partitionColumn);
- if (!isNumericType(partitionColumnType)) {
+ if (!isNumericType(partitionColumnType) && !isStringType(partitionColumnType)) {
throw new JdbcConnectorException(
CommonErrorCode.ILLEGAL_ARGUMENT,
- String.format("%s is not numeric type", partitionColumn));
+ String.format("%s is not numeric/string type", partitionColumn));
+ } else {
+ return partitionColumnType;
}
}
@@ -220,6 +243,10 @@ public class JdbcSourceFactory implements TableSourceFactory {
return type.equals(BasicType.INT_TYPE) || type.equals(BasicType.LONG_TYPE) || scale == 0;
}
+ private static boolean isStringType(SeaTunnelDataType<?> type) {
+ return type.equals(BasicType.STRING_TYPE);
+ }
+
@Override
public OptionRule optionRule() {
return OptionRule.builder()
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
index 185f87075..bd9abb2d6 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
@@ -106,15 +107,21 @@ public class JdbcSourceSplitEnumerator
partitionParameter.getPartitionNumber() != null
? partitionParameter.getPartitionNumber()
: enumeratorContext.currentParallelism();
- JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider =
- new JdbcNumericBetweenParametersProvider(
- partitionParameter.getMinValue(),
- partitionParameter.getMaxValue())
- .ofBatchNum(partitionNumber);
- Serializable[][] parameterValues =
- jdbcNumericBetweenParametersProvider.getParameterValues();
- for (int i = 0; i < parameterValues.length; i++) {
- allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
+ if (partitionParameter.getDataType().equals(BasicType.STRING_TYPE)) {
+ for (int i = 0; i < partitionNumber; i++) {
+ allSplit.add(new JdbcSourceSplit(new Object[] {i}, i));
+ }
+ } else {
+ JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider =
+ new JdbcNumericBetweenParametersProvider(
+ partitionParameter.getMinValue(),
+ partitionParameter.getMaxValue())
+ .ofBatchNum(partitionNumber);
+ Serializable[][] parameterValues =
+ jdbcNumericBetweenParametersProvider.getParameterValues();
+ for (int i = 0; i < parameterValues.length; i++) {
+ allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
+ }
}
} else {
allSplit.add(new JdbcSourceSplit(null, 0));
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
index 61677079e..88605e876 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -28,6 +30,7 @@ import java.math.BigDecimal;
public class PartitionParameter implements Serializable {
String partitionColumnName;
+ SeaTunnelDataType<?> dataType;
BigDecimal minValue;
BigDecimal maxValue;
Integer partitionNumber;