This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c687050d88 [Feature][CDC] Support for preferring numeric fields as
split keys (#5384)
c687050d88 is described below
commit c687050d882873c93a414ca81e6b548ee3c29a94
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Wed Sep 27 10:00:32 2023 +0800
[Feature][CDC] Support for preferring numeric fields as split keys (#5384)
* [Feature][CDC] Support for preferring numeric fields as split keys
---------
Co-authored-by: zhouyao <[email protected]>
---
.../connector-cdc/connector-cdc-base/pom.xml | 10 +-
.../splitter/AbstractJdbcSourceChunkSplitter.java | 44 +++-
.../jdbc/source/JdbcSourceChunkSplitterTest.java | 245 +++++++++++++++++++++
3 files changed, 296 insertions(+), 3 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
index 9d813003fa..baefc91758 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
@@ -30,6 +30,7 @@
<properties>
<hikaricp.version>4.0.3</hikaricp.version>
+        <junit.version>4.13.2</junit.version>
     </properties>
     <dependencyManagement>
@@ -37,7 +38,7 @@
             <dependency>
                 <groupId>com.zaxxer</groupId>
                 <artifactId>HikariCP</artifactId>
-                <version>4.0.3</version>
+                <version>${hikaricp.version}</version>
             </dependency>
             <dependency>
@@ -97,6 +98,13 @@
             <artifactId>seatunnel-format-compatible-debezium-json</artifactId>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
index e99e7dab4b..dc1d977358 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
@@ -309,6 +309,7 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
}
//
------------------------------------------------------------------------------------------
+
/** Returns the distribution factor of the given table. */
@SuppressWarnings("MagicNumber")
protected double calculateDistributionFactor(
@@ -356,6 +357,7 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId
tableId)
throws SQLException {
Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
+ Column splitColumn = null;
if (primaryKey.isPresent()) {
List<String> pkColumns = primaryKey.get().getColumnNames();
@@ -363,7 +365,10 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
for (String pkColumn : pkColumns) {
Column column = table.columnWithName(pkColumn);
if (isEvenlySplitColumn(column)) {
- return column;
+ splitColumn = columnComparable(splitColumn, column);
+ if (sqlTypePriority(splitColumn) == 1) {
+ return splitColumn;
+ }
}
}
}
@@ -377,11 +382,17 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
for (ConstraintKey.ConstraintKeyColumn uniqueKeyColumn :
uniqueKeyColumns) {
Column column =
table.columnWithName(uniqueKeyColumn.getColumnName());
if (isEvenlySplitColumn(column)) {
- return column;
+ splitColumn = columnComparable(splitColumn, column);
+ if (sqlTypePriority(splitColumn) == 1) {
+ return splitColumn;
+ }
}
}
}
}
+ if (splitColumn != null) {
+ return splitColumn;
+ }
throw new UnsupportedOperationException(
String.format(
@@ -410,4 +421,33 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
log.info("JdbcSourceChunkSplitter has split {} chunks for table
{}", count, tableId);
}
}
+
+ private int sqlTypePriority(Column splitColumn) {
+ switch (fromDbzColumn(splitColumn).getSqlType()) {
+ case TINYINT:
+ return 1;
+ case SMALLINT:
+ return 2;
+ case INT:
+ return 3;
+ case BIGINT:
+ return 4;
+ case DECIMAL:
+ return 5;
+ case STRING:
+ return 6;
+ default:
+ return Integer.MAX_VALUE;
+ }
+ }
+
+ private Column columnComparable(Column then, Column other) {
+ if (then == null) {
+ return other;
+ }
+ if (sqlTypePriority(then) > sqlTypePriority(other)) {
+ return other;
+ }
+ return then;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
new file mode 100644
index 0000000000..86500f248f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jdbc.source;
+
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
+import
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
+import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter;
+import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
+import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
+import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+public class JdbcSourceChunkSplitterTest {
+
+ @Test
+ public void splitColumnTest() throws SQLException {
+ TestJdbcSourceChunkSplitter testJdbcSourceChunkSplitter =
+ new TestJdbcSourceChunkSplitter(null, new TestSourceDialect());
+ Column splitColumn =
+ testJdbcSourceChunkSplitter.getSplitColumn(
+ null, new TestSourceDialect(), new TableId("", "",
""));
+ Assertions.assertEquals(splitColumn.typeName(), "tinyint");
+ }
+
+ private class TestJdbcSourceChunkSplitter extends
AbstractJdbcSourceChunkSplitter {
+
+ public TestJdbcSourceChunkSplitter(
+ JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
+ super(sourceConfig, dialect);
+ }
+
+ @Override
+ public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId,
String columnName)
+ throws SQLException {
+ return new Object[0];
+ }
+
+ @Override
+ public Object queryMin(
+ JdbcConnection jdbc, TableId tableId, String columnName,
Object excludedLowerBound)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Object[] sampleDataFromColumn(
+ JdbcConnection jdbc, TableId tableId, String columnName, int
samplingRate)
+ throws SQLException {
+ return new Object[0];
+ }
+
+ @Override
+ public Object queryNextChunkMax(
+ JdbcConnection jdbc,
+ TableId tableId,
+ String columnName,
+ int chunkSize,
+ Object includedLowerBound)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId
tableId)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ public String buildSplitScanQuery(
+ TableId tableId,
+ SeaTunnelRowType splitKeyType,
+ boolean isFirstSplit,
+ boolean isLastSplit) {
+ return null;
+ }
+
+ @Override
+ public SeaTunnelDataType<?> fromDbzColumn(Column splitColumn) {
+ String typeName = splitColumn.typeName();
+ switch (typeName) {
+ case "varchar":
+ return BasicType.STRING_TYPE;
+ case "tinyint":
+ return BasicType.BYTE_TYPE;
+ case "smallint":
+ return BasicType.SHORT_TYPE;
+ case "int":
+ return BasicType.INT_TYPE;
+ case "bigint":
+ return BasicType.LONG_TYPE;
+ case "decimal":
+ return new DecimalType(20, 0);
+ default:
+ return BasicType.STRING_TYPE;
+ }
+ }
+
+ @Override
+ public Column getSplitColumn(
+ JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId
tableId)
+ throws SQLException {
+ return super.getSplitColumn(jdbc, dialect, tableId);
+ }
+ }
+
+ private class TestSourceDialect implements JdbcDataSourceDialect {
+
+ @Override
+ public String getName() {
+ return null;
+ }
+
+ @Override
+ public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig
sourceConfig) {
+ return false;
+ }
+
+ @Override
+ public ChunkSplitter createChunkSplitter(JdbcSourceConfig
sourceConfig) {
+ return null;
+ }
+
+ @Override
+ public List<TableId> discoverDataCollections(JdbcSourceConfig
sourceConfig) {
+ return null;
+ }
+
+ @Override
+ public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
+ return null;
+ }
+
+ @Override
+ public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc,
TableId tableId) {
+
+ Table table =
+ Table.editor()
+ .tableId(tableId)
+ .addColumns(
+ Column.editor()
+ .name("string_col")
+ .jdbcType(Types.VARCHAR)
+ .type("varchar")
+ .create(),
+ Column.editor()
+ .name("smallint")
+ .jdbcType(Types.SMALLINT)
+ .type("smallint")
+ .create(),
+ Column.editor()
+ .name("int")
+ .jdbcType(Types.INTEGER)
+ .type("int")
+ .create(),
+ Column.editor()
+ .name("decimal")
+ .jdbcType(Types.DECIMAL)
+ .type("decimal")
+ .create(),
+ Column.editor()
+ .name("tinyint_col")
+ .jdbcType(Types.TINYINT)
+ .type("tinyint")
+ .create(),
+ Column.editor()
+ .name("bigint_col")
+ .jdbcType(Types.BIGINT)
+ .type("bigint")
+ .create())
+ .create();
+ return new
TableChanges.TableChange(TableChanges.TableChangeType.CREATE, table);
+ }
+
+ @Override
+ public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase
sourceSplitBase) {
+ return null;
+ }
+
+ @Override
+ public JdbcSourceFetchTaskContext createFetchTaskContext(
+ SourceSplitBase sourceSplitBase, JdbcSourceConfig
taskSourceConfig) {
+ return null;
+ }
+
+ @Override
+ public Optional<PrimaryKey> getPrimaryKey(JdbcConnection
jdbcConnection, TableId tableId)
+ throws SQLException {
+ return Optional.of(
+ PrimaryKey.of(
+ "pkName",
+ Arrays.asList(
+ "string_col",
+ "smallint",
+ "int",
+ "decimal",
+ "tinyint_col",
+ "bigint_col")));
+ }
+
+ @Override
+ public List<ConstraintKey> getUniqueKeys(JdbcConnection
jdbcConnection, TableId tableId)
+ throws SQLException {
+ return new ArrayList<ConstraintKey>();
+ }
+ }
+}