This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e1be9d7f8a [Feature][Connector-V2][CDC] Support string type shard fields. (#5147)
e1be9d7f8a is described below
commit e1be9d7f8a2c705712f9311a732560c1247de6d3
Author: ic4y <[email protected]>
AuthorDate: Wed Jul 26 21:09:35 2023 +0800
[Feature][Connector-V2][CDC] Support string type shard fields. (#5147)
* [Feature][CDC base] Support string type shard fields
* Delete invalid code
---
.../splitter/AbstractJdbcSourceChunkSplitter.java} | 317 +++++++++++----------
.../splitter/JdbcSourceChunkSplitter.java | 45 +--
.../connectors/cdc/base/utils/ObjectUtils.java | 2 +
.../mysql/source/eumerator/MySqlChunkSplitter.java | 309 +-------------------
.../source/eumerator/SqlServerChunkSplitter.java | 304 +-------------------
5 files changed, 168 insertions(+), 809 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
similarity index 71%
copy from
seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
copy to
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
index ac0b8165db..e956b11170 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
@@ -15,21 +15,19 @@
* limitations under the License.
*/
-package
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.eumerator;
+package org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
-import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkRange;
-import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.JdbcSourceChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils;
-import
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerTypeUtils;
-import
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerUtils;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
+import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;
@@ -40,18 +38,19 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import static java.math.BigDecimal.ROUND_CEILING;
import static
org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils.doubleCompare;
-/** The {@code ChunkSplitter} used to split table into a set of chunks for
JDBC data source. */
@Slf4j
-public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
+public abstract class AbstractJdbcSourceChunkSplitter implements
JdbcSourceChunkSplitter {
private final JdbcSourceConfig sourceConfig;
private final JdbcDataSourceDialect dialect;
- public SqlServerChunkSplitter(JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dialect) {
+ public AbstractJdbcSourceChunkSplitter(
+ JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
this.sourceConfig = sourceConfig;
this.dialect = dialect;
}
@@ -59,7 +58,6 @@ public class SqlServerChunkSplitter implements
JdbcSourceChunkSplitter {
@Override
public Collection<SnapshotSplit> generateSplits(TableId tableId) {
try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
log.info("Start splitting table {} into chunks...", tableId);
long start = System.currentTimeMillis();
@@ -100,66 +98,6 @@ public class SqlServerChunkSplitter implements
JdbcSourceChunkSplitter {
}
}
- @Override
- public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String
columnName)
- throws SQLException {
- return SqlServerUtils.queryMinMax(jdbc, tableId, columnName);
- }
-
- @Override
- public Object queryMin(
- JdbcConnection jdbc, TableId tableId, String columnName, Object
excludedLowerBound)
- throws SQLException {
- return SqlServerUtils.queryMin(jdbc, tableId, columnName,
excludedLowerBound);
- }
-
- @Override
- public Object[] sampleDataFromColumn(
- JdbcConnection jdbc, TableId tableId, String columnName, int
inverseSamplingRate)
- throws SQLException {
- return SqlServerUtils.sampleDataFromColumn(jdbc, tableId, columnName,
inverseSamplingRate);
- }
-
- @Override
- public Object queryNextChunkMax(
- JdbcConnection jdbc,
- TableId tableId,
- String columnName,
- int chunkSize,
- Object includedLowerBound)
- throws SQLException {
- return SqlServerUtils.queryNextChunkMax(
- jdbc, tableId, columnName, chunkSize, includedLowerBound);
- }
-
- @Override
- public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
- return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId);
- }
-
- @Override
- public String buildSplitScanQuery(
- TableId tableId,
- SeaTunnelRowType splitKeyType,
- boolean isFirstSplit,
- boolean isLastSplit) {
- return SqlServerUtils.buildSplitScanQuery(tableId, splitKeyType,
isFirstSplit, isLastSplit);
- }
-
- @Override
- public SeaTunnelDataType<?> fromDbzColumn(Column splitColumn) {
- return SqlServerTypeUtils.convertFromColumn(splitColumn);
- }
-
- //
--------------------------------------------------------------------------------------------
- // Utilities
- //
--------------------------------------------------------------------------------------------
-
- /**
- * We can use evenly-sized chunks or unevenly-sized chunks when split
table into chunks, using
- * evenly-sized chunks which is much efficient, using unevenly-sized
chunks which will request
- * many queries and is not efficient.
- */
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws
SQLException {
final String splitColumnName = splitColumn.name();
@@ -191,20 +129,26 @@ public class SqlServerChunkSplitter implements
JdbcSourceChunkSplitter {
tableId, min, max, approximateRowCnt, chunkSize,
dynamicChunkSize);
} else {
int shardCount = (int) (approximateRowCnt / chunkSize);
+ int inverseSamplingRate =
sourceConfig.getInverseSamplingRate();
if (sourceConfig.getSampleShardingThreshold() < shardCount) {
+ // It is necessary to ensure that the number of data rows
sampled by the
+ // sampling rate is greater than the number of shards.
+ // Otherwise, if the sampling rate is too low, it may
result in an insufficient
+ // number of data rows for the shards, leading to an
inadequate number of
+ // shards.
+ // Therefore, inverseSamplingRate should be less than
chunkSize
+ if (inverseSamplingRate > chunkSize) {
+ log.warn(
+ "The inverseSamplingRate is {}, which is
greater than chunkSize {}, so we set inverseSamplingRate to chunkSize",
+ inverseSamplingRate,
+ chunkSize);
+ inverseSamplingRate = chunkSize;
+ }
Object[] sample =
sampleDataFromColumn(
- jdbc,
- tableId,
- splitColumnName,
- sourceConfig.getInverseSamplingRate());
- // In order to prevent data loss due to the absence of the
minimum value in the
- // sampled data, the minimum value is directly added here.
- Object[] newSample = new Object[sample.length + 1];
- newSample[0] = min;
- System.arraycopy(sample, 0, newSample, 1, sample.length);
+ jdbc, tableId, splitColumnName,
inverseSamplingRate);
return efficientShardingThroughSampling(
- tableId, newSample, approximateRowCnt, shardCount);
+ tableId, sample, approximateRowCnt, shardCount);
}
return splitUnevenlySizedChunks(
jdbc, tableId, splitColumnName, min, max, chunkSize);
@@ -214,7 +158,58 @@ public class SqlServerChunkSplitter implements
JdbcSourceChunkSplitter {
}
}
- private List<ChunkRange> efficientShardingThroughSampling(
+ /** Split table into unevenly sized chunks by continuously calculating
next chunk max value. */
+ protected List<ChunkRange> splitUnevenlySizedChunks(
+ JdbcConnection jdbc,
+ TableId tableId,
+ String splitColumnName,
+ Object min,
+ Object max,
+ int chunkSize)
+ throws SQLException {
+ log.info(
+ "Use unevenly-sized chunks for table {}, the chunk size is
{}", tableId, chunkSize);
+ final List<ChunkRange> splits = new ArrayList<>();
+ Object chunkStart = null;
+ Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName,
max, chunkSize);
+ int count = 0;
+ while (chunkEnd != null && ObjectCompare(chunkEnd, max) <= 0) {
+ // we start from [null, min + chunk_size) and avoid [null, min)
+ splits.add(ChunkRange.of(chunkStart, chunkEnd));
+ // may sleep a while to avoid DDOS on MySQL server
+ maySleep(count++, tableId);
+ chunkStart = chunkEnd;
+ chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName,
max, chunkSize);
+ }
+ // add the ending split
+ splits.add(ChunkRange.of(chunkStart, null));
+ return splits;
+ }
+
+ protected Object nextChunkEnd(
+ JdbcConnection jdbc,
+ Object previousChunkEnd,
+ TableId tableId,
+ String splitColumnName,
+ Object max,
+ int chunkSize)
+ throws SQLException {
+ // chunk end might be null when max values are removed
+ Object chunkEnd =
+ queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize,
previousChunkEnd);
+ if (Objects.equals(previousChunkEnd, chunkEnd)) {
+ // we don't allow equal chunk start and end,
+ // should query the next one larger than chunkEnd
+ chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
+ }
+ if (ObjectCompare(chunkEnd, max) >= 0) {
+ return null;
+ } else {
+ return chunkEnd;
+ }
+ }
+
+ protected List<ChunkRange> efficientShardingThroughSampling(
TableId tableId, Object[] sampleData, long approximateRowCnt, int
shardCount) {
log.info(
"Use efficient sharding through sampling optimization for
table {}, the approximate row count is {}, the shardCount is {}",
@@ -224,16 +219,31 @@ public class SqlServerChunkSplitter implements
JdbcSourceChunkSplitter {
final List<ChunkRange> splits = new ArrayList<>();
- // Calculate the shard boundaries
- for (int i = 0; i < shardCount; i++) {
- Object chunkStart = sampleData[(int) ((long) i * sampleData.length
/ shardCount)];
- Object chunkEnd =
- i < shardCount - 1
- ? sampleData[(int) (((long) i + 1) *
sampleData.length / shardCount)]
- : null;
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
+ if (shardCount == 0) {
+ splits.add(ChunkRange.of(null, null));
+ return splits;
}
+ double approxSamplePerShard = (double) sampleData.length / shardCount;
+
+ if (approxSamplePerShard <= 1) {
+
+ splits.add(ChunkRange.of(null, sampleData[0]));
+ for (int i = 0; i < sampleData.length - 1; i++) {
+ splits.add(ChunkRange.of(sampleData[i], sampleData[i + 1]));
+ }
+ splits.add(ChunkRange.of(sampleData[sampleData.length - 1], null));
+ } else {
+ // Calculate the shard boundaries
+ for (int i = 0; i < shardCount; i++) {
+ Object chunkStart = i == 0 ? null : sampleData[(int) (i *
approxSamplePerShard)];
+ Object chunkEnd =
+ i < shardCount - 1
+ ? sampleData[(int) ((i + 1) *
approxSamplePerShard)]
+ : null;
+ splits.add(ChunkRange.of(chunkStart, chunkEnd));
+ }
+ }
return splits;
}
@@ -241,7 +251,7 @@ public class SqlServerChunkSplitter implements
JdbcSourceChunkSplitter {
* Split table into evenly sized chunks based on the numeric min and max
value of split column,
* and tumble chunks in step size.
*/
- private List<ChunkRange> splitEvenlySizedChunks(
+ protected List<ChunkRange> splitEvenlySizedChunks(
TableId tableId,
Object min,
Object max,
@@ -262,7 +272,7 @@ public class SqlServerChunkSplitter implements
JdbcSourceChunkSplitter {
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
- while (ObjectUtils.compare(chunkEnd, max) <= 0) {
+ while (ObjectCompare(chunkEnd, max) <= 0) {
splits.add(ChunkRange.of(chunkStart, chunkEnd));
chunkStart = chunkEnd;
try {
@@ -277,75 +287,10 @@ public class SqlServerChunkSplitter implements
JdbcSourceChunkSplitter {
return splits;
}
- /** Split table into unevenly sized chunks by continuously calculating
next chunk max value. */
- private List<ChunkRange> splitUnevenlySizedChunks(
- JdbcConnection jdbc,
- TableId tableId,
- String splitColumnName,
- Object min,
- Object max,
- int chunkSize)
- throws SQLException {
- log.info(
- "Use unevenly-sized chunks for table {}, the chunk size is
{}", tableId, chunkSize);
- final List<ChunkRange> splits = new ArrayList<>();
- Object chunkStart = null;
- Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName,
max, chunkSize);
- int count = 0;
- while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
- // we start from [null, min + chunk_size) and avoid [null, min)
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
- // may sleep a while to avoid DDOS on MySQL server
- maySleep(count++, tableId);
- chunkStart = chunkEnd;
- chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName,
max, chunkSize);
- }
- // add the ending split
- splits.add(ChunkRange.of(chunkStart, null));
- return splits;
- }
-
- private Object nextChunkEnd(
- JdbcConnection jdbc,
- Object previousChunkEnd,
- TableId tableId,
- String splitColumnName,
- Object max,
- int chunkSize)
- throws SQLException {
- // chunk end might be null when max values are removed
- Object chunkEnd =
- queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize,
previousChunkEnd);
- if (Objects.equals(previousChunkEnd, chunkEnd)) {
- // we don't allow equal chunk start and end,
- // should query the next one larger than chunkEnd
- chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
- }
- if (ObjectUtils.compare(chunkEnd, max) >= 0) {
- return null;
- } else {
- return chunkEnd;
- }
- }
-
- private SnapshotSplit createSnapshotSplit(
- JdbcConnection jdbc,
- TableId tableId,
- int chunkId,
- SeaTunnelRowType splitKeyType,
- Object chunkStart,
- Object chunkEnd) {
- // currently, we only support single split column
- Object[] splitStart = chunkStart == null ? null : new Object[]
{chunkStart};
- Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
- return new SnapshotSplit(
- splitId(tableId, chunkId), tableId, splitKeyType, splitStart,
splitEnd);
- }
-
//
------------------------------------------------------------------------------------------
/** Returns the distribution factor of the given table. */
@SuppressWarnings("MagicNumber")
- private double calculateDistributionFactor(
+ protected double calculateDistributionFactor(
TableId tableId, Object min, Object max, long approximateRowCnt) {
if (!min.getClass().equals(max.getClass())) {
@@ -372,10 +317,66 @@ public class SqlServerChunkSplitter implements
JdbcSourceChunkSplitter {
return distributionFactor;
}
- private static String splitId(TableId tableId, int chunkId) {
+ protected SnapshotSplit createSnapshotSplit(
+ JdbcConnection jdbc,
+ TableId tableId,
+ int chunkId,
+ SeaTunnelRowType splitKeyType,
+ Object chunkStart,
+ Object chunkEnd) {
+ // currently, we only support single split column
+ Object[] splitStart = chunkStart == null ? null : new Object[]
{chunkStart};
+ Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
+ return new SnapshotSplit(
+ splitId(tableId, chunkId), tableId, splitKeyType, splitStart,
splitEnd);
+ }
+
+ protected Column getSplitColumn(
+ JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId
tableId)
+ throws SQLException {
+ Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
+ if (primaryKey.isPresent()) {
+ List<String> pkColumns = primaryKey.get().getColumnNames();
+
+ Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
+ for (String pkColumn : pkColumns) {
+ Column column = table.columnWithName(pkColumn);
+ if (isEvenlySplitColumn(column)) {
+ return column;
+ }
+ }
+ }
+
+ List<ConstraintKey> uniqueKeys = dialect.getUniqueKeys(jdbc, tableId);
+ if (!uniqueKeys.isEmpty()) {
+ Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
+ for (ConstraintKey uniqueKey : uniqueKeys) {
+ List<ConstraintKey.ConstraintKeyColumn> uniqueKeyColumns =
+ uniqueKey.getColumnNames();
+ for (ConstraintKey.ConstraintKeyColumn uniqueKeyColumn :
uniqueKeyColumns) {
+ Column column =
table.columnWithName(uniqueKeyColumn.getColumnName());
+ if (isEvenlySplitColumn(column)) {
+ return column;
+ }
+ }
+ }
+ }
+
+ throw new UnsupportedOperationException(
+ String.format(
+ "Incremental snapshot for tables requires primary
key/unique key,"
+ + " but table %s doesn't have primary key.",
+ tableId));
+ }
+
+ protected String splitId(TableId tableId, int chunkId) {
return tableId.toString() + ":" + chunkId;
}
+ protected int ObjectCompare(Object obj1, Object obj2) {
+ return ObjectUtils.compare(obj1, obj2);
+ }
+
@SuppressWarnings("MagicNumber")
private static void maySleep(int count, TableId tableId) {
// every 100 queries to sleep 1s
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
index bbad9d04b1..b271be0d76 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
@@ -17,22 +17,16 @@
package org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter;
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
-import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import java.sql.SQLException;
import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
/** The {@code ChunkSplitter} used to split table into a set of chunks for
JDBC data source. */
public interface JdbcSourceChunkSplitter extends ChunkSplitter {
@@ -142,6 +136,7 @@ public interface JdbcSourceChunkSplitter extends
ChunkSplitter {
case INT:
case BIGINT:
case DECIMAL:
+ case STRING:
return true;
default:
return false;
@@ -167,42 +162,4 @@ public interface JdbcSourceChunkSplitter extends
ChunkSplitter {
new String[] {splitColumn.name()},
new SeaTunnelDataType[] {fromDbzColumn(splitColumn)});
}
-
- default Column getSplitColumn(
- JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId
tableId)
- throws SQLException {
- Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
- if (primaryKey.isPresent()) {
- List<String> pkColumns = primaryKey.get().getColumnNames();
-
- Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
- for (String pkColumn : pkColumns) {
- Column column = table.columnWithName(pkColumn);
- if (isEvenlySplitColumn(column)) {
- return column;
- }
- }
- }
-
- List<ConstraintKey> uniqueKeys = dialect.getUniqueKeys(jdbc, tableId);
- if (!uniqueKeys.isEmpty()) {
- Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
- for (ConstraintKey uniqueKey : uniqueKeys) {
- List<ConstraintKey.ConstraintKeyColumn> uniqueKeyColumns =
- uniqueKey.getColumnNames();
- for (ConstraintKey.ConstraintKeyColumn uniqueKeyColumn :
uniqueKeyColumns) {
- Column column =
table.columnWithName(uniqueKeyColumn.getColumnName());
- if (isEvenlySplitColumn(column)) {
- return column;
- }
- }
- }
- }
-
- throw new UnsupportedOperationException(
- String.format(
- "Incremental snapshot for tables requires primary
key/unique key,"
- + " but table %s doesn't have primary key.",
- tableId));
- }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/ObjectUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/ObjectUtils.java
index 3c5b669a25..0f703f02c1 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/ObjectUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/ObjectUtils.java
@@ -63,6 +63,8 @@ public class ObjectUtils {
((BigInteger) minuend).subtract((BigInteger)
subtrahend).toString());
} else if (minuend instanceof BigDecimal) {
return ((BigDecimal) minuend).subtract((BigDecimal) subtrahend);
+ } else if (minuend instanceof String) {
+ return BigDecimal.valueOf(Long.MAX_VALUE);
} else {
throw new UnsupportedOperationException(
String.format(
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
index 04671d28f5..0249889b23 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
@@ -21,86 +21,21 @@ import
org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
-import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkRange;
-import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.JdbcSourceChunkSplitter;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
-import org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils;
+import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlTypeUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.TableId;
-import java.math.BigDecimal;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-
-import static java.math.BigDecimal.ROUND_CEILING;
-import static
org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils.doubleCompare;
/** The {@code ChunkSplitter} used to split table into a set of chunks for
JDBC data source. */
-public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
-
- private static final Logger LOG =
LoggerFactory.getLogger(MySqlChunkSplitter.class);
-
- private final JdbcSourceConfig sourceConfig;
- private final JdbcDataSourceDialect dialect;
+public class MySqlChunkSplitter extends AbstractJdbcSourceChunkSplitter {
public MySqlChunkSplitter(JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dialect) {
- this.sourceConfig = sourceConfig;
- this.dialect = dialect;
- }
-
- @Override
- public Collection<SnapshotSplit> generateSplits(TableId tableId) {
- try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
- LOG.info("Start splitting table {} into chunks...", tableId);
- long start = System.currentTimeMillis();
-
- Column splitColumn = getSplitColumn(jdbc, dialect, tableId);
- final List<ChunkRange> chunks;
- try {
- chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
- } catch (SQLException e) {
- throw new RuntimeException("Failed to split chunks for table "
+ tableId, e);
- }
-
- // convert chunks into splits
- List<SnapshotSplit> splits = new ArrayList<>();
- SeaTunnelRowType splitType = getSplitType(splitColumn);
- for (int i = 0; i < chunks.size(); i++) {
- ChunkRange chunk = chunks.get(i);
- SnapshotSplit split =
- createSnapshotSplit(
- jdbc,
- tableId,
- i,
- splitType,
- chunk.getChunkStart(),
- chunk.getChunkEnd());
- splits.add(split);
- }
-
- long end = System.currentTimeMillis();
- LOG.info(
- "Split table {} into {} chunks, time cost: {}ms.",
- tableId,
- splits.size(),
- end - start);
- return splits;
- } catch (Exception e) {
- throw new RuntimeException(
- String.format("Generate Splits for table %s error",
tableId), e);
- }
+ super(sourceConfig, dialect);
}
@Override
@@ -153,242 +88,4 @@ public class MySqlChunkSplitter implements
JdbcSourceChunkSplitter {
public SeaTunnelDataType<?> fromDbzColumn(Column splitColumn) {
return MySqlTypeUtils.convertFromColumn(splitColumn);
}
-
- //
--------------------------------------------------------------------------------------------
- // Utilities
- //
--------------------------------------------------------------------------------------------
-
- /**
- * We can use evenly-sized chunks or unevenly-sized chunks when split
table into chunks, using
- * evenly-sized chunks which is much efficient, using unevenly-sized
chunks which will request
- * many queries and is not efficient.
- */
- private List<ChunkRange> splitTableIntoChunks(
- JdbcConnection jdbc, TableId tableId, Column splitColumn) throws
SQLException {
- final String splitColumnName = splitColumn.name();
- final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
- final Object min = minMax[0];
- final Object max = minMax[1];
- if (min == null || max == null || min.equals(max)) {
- // empty table, or only one row, return full table scan as a chunk
- return Collections.singletonList(ChunkRange.all());
- }
-
- final int chunkSize = sourceConfig.getSplitSize();
- final double distributionFactorUpper =
sourceConfig.getDistributionFactorUpper();
- final double distributionFactorLower =
sourceConfig.getDistributionFactorLower();
-
- if (isEvenlySplitColumn(splitColumn)) {
- long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
- double distributionFactor =
- calculateDistributionFactor(tableId, min, max,
approximateRowCnt);
-
- boolean dataIsEvenlyDistributed =
- doubleCompare(distributionFactor, distributionFactorLower)
>= 0
- && doubleCompare(distributionFactor,
distributionFactorUpper) <= 0;
-
- if (dataIsEvenlyDistributed) {
- // the minimum dynamic chunk size is at least 1
- final int dynamicChunkSize = Math.max((int)
(distributionFactor * chunkSize), 1);
- return splitEvenlySizedChunks(
- tableId, min, max, approximateRowCnt, chunkSize,
dynamicChunkSize);
- } else {
- int shardCount = (int) (approximateRowCnt / chunkSize);
- if (sourceConfig.getSampleShardingThreshold() < shardCount) {
- Object[] sample =
- sampleDataFromColumn(
- jdbc,
- tableId,
- splitColumnName,
- sourceConfig.getInverseSamplingRate());
- // In order to prevent data loss due to the absence of the
minimum value in the
- // sampled data, the minimum value is directly added here.
- Object[] newSample = new Object[sample.length + 1];
- newSample[0] = min;
- System.arraycopy(sample, 0, newSample, 1, sample.length);
- return efficientShardingThroughSampling(
- tableId, newSample, approximateRowCnt, shardCount);
- }
- return splitUnevenlySizedChunks(
- jdbc, tableId, splitColumnName, min, max, chunkSize);
- }
- } else {
- return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName,
min, max, chunkSize);
- }
- }
-
- private List<ChunkRange> efficientShardingThroughSampling(
- TableId tableId, Object[] sampleData, long approximateRowCnt, int
shardCount) {
- LOG.info(
- "Use efficient sharding through sampling optimization for
table {}, the approximate row count is {}, the shardCount is {}",
- tableId,
- approximateRowCnt,
- shardCount);
-
- final List<ChunkRange> splits = new ArrayList<>();
-
- // Calculate the shard boundaries
- for (int i = 0; i < shardCount; i++) {
- Object chunkStart = sampleData[(int) ((long) i * sampleData.length
/ shardCount)];
- Object chunkEnd =
- i < shardCount - 1
- ? sampleData[(int) (((long) i + 1) *
sampleData.length / shardCount)]
- : null;
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
- }
-
- return splits;
- }
-
- /**
- * Split table into evenly sized chunks based on the numeric min and max
value of split column,
- * and tumble chunks in step size.
- */
- private List<ChunkRange> splitEvenlySizedChunks(
- TableId tableId,
- Object min,
- Object max,
- long approximateRowCnt,
- int chunkSize,
- int dynamicChunkSize) {
- LOG.info(
- "Use evenly-sized chunk optimization for table {}, the
approximate row count is {}, the chunk size is {}, the dynamic chunk size is
{}",
- tableId,
- approximateRowCnt,
- chunkSize,
- dynamicChunkSize);
- if (approximateRowCnt <= chunkSize) {
- // there is no more than one chunk, return full table as a chunk
- return Collections.singletonList(ChunkRange.all());
- }
-
- final List<ChunkRange> splits = new ArrayList<>();
- Object chunkStart = null;
- Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
- while (ObjectUtils.compare(chunkEnd, max) <= 0) {
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
- chunkStart = chunkEnd;
- try {
- chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
- } catch (ArithmeticException e) {
- // Stop chunk split to avoid dead loop when number overflows.
- break;
- }
- }
- // add the ending split
- splits.add(ChunkRange.of(chunkStart, null));
- return splits;
- }
-
- /** Split table into unevenly sized chunks by continuously calculating
next chunk max value. */
- private List<ChunkRange> splitUnevenlySizedChunks(
- JdbcConnection jdbc,
- TableId tableId,
- String splitColumnName,
- Object min,
- Object max,
- int chunkSize)
- throws SQLException {
- LOG.info(
- "Use unevenly-sized chunks for table {}, the chunk size is
{}", tableId, chunkSize);
- final List<ChunkRange> splits = new ArrayList<>();
- Object chunkStart = null;
- Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName,
max, chunkSize);
- int count = 0;
- while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
- // we start from [null, min + chunk_size) and avoid [null, min)
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
- // may sleep a while to avoid DDOS on MySQL server
- maySleep(count++, tableId);
- chunkStart = chunkEnd;
- chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName,
max, chunkSize);
- }
- // add the ending split
- splits.add(ChunkRange.of(chunkStart, null));
- return splits;
- }
-
- private Object nextChunkEnd(
- JdbcConnection jdbc,
- Object previousChunkEnd,
- TableId tableId,
- String splitColumnName,
- Object max,
- int chunkSize)
- throws SQLException {
- // chunk end might be null when max values are removed
- Object chunkEnd =
- queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize,
previousChunkEnd);
- if (Objects.equals(previousChunkEnd, chunkEnd)) {
- // we don't allow equal chunk start and end,
- // should query the next one larger than chunkEnd
- chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
- }
- if (ObjectUtils.compare(chunkEnd, max) >= 0) {
- return null;
- } else {
- return chunkEnd;
- }
- }
-
- private SnapshotSplit createSnapshotSplit(
- JdbcConnection jdbc,
- TableId tableId,
- int chunkId,
- SeaTunnelRowType splitKeyType,
- Object chunkStart,
- Object chunkEnd) {
- // currently, we only support single split column
- Object[] splitStart = chunkStart == null ? null : new Object[]
{chunkStart};
- Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
- return new SnapshotSplit(
- splitId(tableId, chunkId), tableId, splitKeyType, splitStart,
splitEnd);
- }
-
- //
------------------------------------------------------------------------------------------
- /** Returns the distribution factor of the given table. */
- @SuppressWarnings("MagicNumber")
- private double calculateDistributionFactor(
- TableId tableId, Object min, Object max, long approximateRowCnt) {
-
- if (!min.getClass().equals(max.getClass())) {
- throw new IllegalStateException(
- String.format(
- "Unsupported operation type, the MIN value type %s
is different with MAX value type %s.",
- min.getClass().getSimpleName(),
max.getClass().getSimpleName()));
- }
- if (approximateRowCnt == 0) {
- return Double.MAX_VALUE;
- }
- BigDecimal difference = ObjectUtils.minus(max, min);
- // factor = (max - min + 1) / rowCount
- final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
- double distributionFactor =
- subRowCnt.divide(new BigDecimal(approximateRowCnt), 4,
ROUND_CEILING).doubleValue();
- LOG.info(
- "The distribution factor of table {} is {} according to the
min split key {}, max split key {} and approximate row count {}",
- tableId,
- distributionFactor,
- min,
- max,
- approximateRowCnt);
- return distributionFactor;
- }
-
- private static String splitId(TableId tableId, int chunkId) {
- return tableId.toString() + ":" + chunkId;
- }
-
- @SuppressWarnings("MagicNumber")
- private static void maySleep(int count, TableId tableId) {
- // every 100 queries to sleep 1s
- if (count % 10 == 0) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // nothing to do
- }
- LOG.info("JdbcSourceChunkSplitter has split {} chunks for table
{}", count, tableId);
- }
- }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
index ac0b8165db..7efd53dc3f 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
@@ -21,10 +21,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
-import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkRange;
-import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.JdbcSourceChunkSplitter;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
-import org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils;
+import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter;
import
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerTypeUtils;
import
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerUtils;
@@ -33,71 +30,14 @@ import io.debezium.relational.Column;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;
-import java.math.BigDecimal;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-
-import static java.math.BigDecimal.ROUND_CEILING;
-import static
org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils.doubleCompare;
/** The {@code ChunkSplitter} used to split table into a set of chunks for
JDBC data source. */
@Slf4j
-public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
-
- private final JdbcSourceConfig sourceConfig;
- private final JdbcDataSourceDialect dialect;
+public class SqlServerChunkSplitter extends AbstractJdbcSourceChunkSplitter {
public SqlServerChunkSplitter(JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dialect) {
- this.sourceConfig = sourceConfig;
- this.dialect = dialect;
- }
-
- @Override
- public Collection<SnapshotSplit> generateSplits(TableId tableId) {
- try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
- log.info("Start splitting table {} into chunks...", tableId);
- long start = System.currentTimeMillis();
-
- Column splitColumn = getSplitColumn(jdbc, dialect, tableId);
- final List<ChunkRange> chunks;
- try {
- chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
- } catch (SQLException e) {
- throw new RuntimeException("Failed to split chunks for table "
+ tableId, e);
- }
-
- // convert chunks into splits
- List<SnapshotSplit> splits = new ArrayList<>();
- SeaTunnelRowType splitType = getSplitType(splitColumn);
- for (int i = 0; i < chunks.size(); i++) {
- ChunkRange chunk = chunks.get(i);
- SnapshotSplit split =
- createSnapshotSplit(
- jdbc,
- tableId,
- i,
- splitType,
- chunk.getChunkStart(),
- chunk.getChunkEnd());
- splits.add(split);
- }
-
- long end = System.currentTimeMillis();
- log.info(
- "Split table {} into {} chunks, time cost: {}ms.",
- tableId,
- splits.size(),
- end - start);
- return splits;
- } catch (Exception e) {
- throw new RuntimeException(
- String.format("Generate Splits for table %s error",
tableId), e);
- }
+ super(sourceConfig, dialect);
}
@Override
@@ -150,242 +90,4 @@ public class SqlServerChunkSplitter implements
JdbcSourceChunkSplitter {
public SeaTunnelDataType<?> fromDbzColumn(Column splitColumn) {
return SqlServerTypeUtils.convertFromColumn(splitColumn);
}
-
- //
--------------------------------------------------------------------------------------------
- // Utilities
- //
--------------------------------------------------------------------------------------------
-
- /**
- * We can use evenly-sized chunks or unevenly-sized chunks when split
table into chunks, using
- * evenly-sized chunks which is much efficient, using unevenly-sized
chunks which will request
- * many queries and is not efficient.
- */
- private List<ChunkRange> splitTableIntoChunks(
- JdbcConnection jdbc, TableId tableId, Column splitColumn) throws
SQLException {
- final String splitColumnName = splitColumn.name();
- final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
- final Object min = minMax[0];
- final Object max = minMax[1];
- if (min == null || max == null || min.equals(max)) {
- // empty table, or only one row, return full table scan as a chunk
- return Collections.singletonList(ChunkRange.all());
- }
-
- final int chunkSize = sourceConfig.getSplitSize();
- final double distributionFactorUpper =
sourceConfig.getDistributionFactorUpper();
- final double distributionFactorLower =
sourceConfig.getDistributionFactorLower();
-
- if (isEvenlySplitColumn(splitColumn)) {
- long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
- double distributionFactor =
- calculateDistributionFactor(tableId, min, max,
approximateRowCnt);
-
- boolean dataIsEvenlyDistributed =
- doubleCompare(distributionFactor, distributionFactorLower)
>= 0
- && doubleCompare(distributionFactor,
distributionFactorUpper) <= 0;
-
- if (dataIsEvenlyDistributed) {
- // the minimum dynamic chunk size is at least 1
- final int dynamicChunkSize = Math.max((int)
(distributionFactor * chunkSize), 1);
- return splitEvenlySizedChunks(
- tableId, min, max, approximateRowCnt, chunkSize,
dynamicChunkSize);
- } else {
- int shardCount = (int) (approximateRowCnt / chunkSize);
- if (sourceConfig.getSampleShardingThreshold() < shardCount) {
- Object[] sample =
- sampleDataFromColumn(
- jdbc,
- tableId,
- splitColumnName,
- sourceConfig.getInverseSamplingRate());
- // In order to prevent data loss due to the absence of the
minimum value in the
- // sampled data, the minimum value is directly added here.
- Object[] newSample = new Object[sample.length + 1];
- newSample[0] = min;
- System.arraycopy(sample, 0, newSample, 1, sample.length);
- return efficientShardingThroughSampling(
- tableId, newSample, approximateRowCnt, shardCount);
- }
- return splitUnevenlySizedChunks(
- jdbc, tableId, splitColumnName, min, max, chunkSize);
- }
- } else {
- return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName,
min, max, chunkSize);
- }
- }
-
- private List<ChunkRange> efficientShardingThroughSampling(
- TableId tableId, Object[] sampleData, long approximateRowCnt, int
shardCount) {
- log.info(
- "Use efficient sharding through sampling optimization for
table {}, the approximate row count is {}, the shardCount is {}",
- tableId,
- approximateRowCnt,
- shardCount);
-
- final List<ChunkRange> splits = new ArrayList<>();
-
- // Calculate the shard boundaries
- for (int i = 0; i < shardCount; i++) {
- Object chunkStart = sampleData[(int) ((long) i * sampleData.length
/ shardCount)];
- Object chunkEnd =
- i < shardCount - 1
- ? sampleData[(int) (((long) i + 1) *
sampleData.length / shardCount)]
- : null;
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
- }
-
- return splits;
- }
-
- /**
- * Split table into evenly sized chunks based on the numeric min and max
value of split column,
- * and tumble chunks in step size.
- */
- private List<ChunkRange> splitEvenlySizedChunks(
- TableId tableId,
- Object min,
- Object max,
- long approximateRowCnt,
- int chunkSize,
- int dynamicChunkSize) {
- log.info(
- "Use evenly-sized chunk optimization for table {}, the
approximate row count is {}, the chunk size is {}, the dynamic chunk size is
{}",
- tableId,
- approximateRowCnt,
- chunkSize,
- dynamicChunkSize);
- if (approximateRowCnt <= chunkSize) {
- // there is no more than one chunk, return full table as a chunk
- return Collections.singletonList(ChunkRange.all());
- }
-
- final List<ChunkRange> splits = new ArrayList<>();
- Object chunkStart = null;
- Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
- while (ObjectUtils.compare(chunkEnd, max) <= 0) {
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
- chunkStart = chunkEnd;
- try {
- chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
- } catch (ArithmeticException e) {
- // Stop chunk split to avoid dead loop when number overflows.
- break;
- }
- }
- // add the ending split
- splits.add(ChunkRange.of(chunkStart, null));
- return splits;
- }
-
- /** Split table into unevenly sized chunks by continuously calculating
next chunk max value. */
- private List<ChunkRange> splitUnevenlySizedChunks(
- JdbcConnection jdbc,
- TableId tableId,
- String splitColumnName,
- Object min,
- Object max,
- int chunkSize)
- throws SQLException {
- log.info(
- "Use unevenly-sized chunks for table {}, the chunk size is
{}", tableId, chunkSize);
- final List<ChunkRange> splits = new ArrayList<>();
- Object chunkStart = null;
- Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName,
max, chunkSize);
- int count = 0;
- while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
- // we start from [null, min + chunk_size) and avoid [null, min)
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
- // may sleep a while to avoid DDOS on MySQL server
- maySleep(count++, tableId);
- chunkStart = chunkEnd;
- chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName,
max, chunkSize);
- }
- // add the ending split
- splits.add(ChunkRange.of(chunkStart, null));
- return splits;
- }
-
- private Object nextChunkEnd(
- JdbcConnection jdbc,
- Object previousChunkEnd,
- TableId tableId,
- String splitColumnName,
- Object max,
- int chunkSize)
- throws SQLException {
- // chunk end might be null when max values are removed
- Object chunkEnd =
- queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize,
previousChunkEnd);
- if (Objects.equals(previousChunkEnd, chunkEnd)) {
- // we don't allow equal chunk start and end,
- // should query the next one larger than chunkEnd
- chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
- }
- if (ObjectUtils.compare(chunkEnd, max) >= 0) {
- return null;
- } else {
- return chunkEnd;
- }
- }
-
- private SnapshotSplit createSnapshotSplit(
- JdbcConnection jdbc,
- TableId tableId,
- int chunkId,
- SeaTunnelRowType splitKeyType,
- Object chunkStart,
- Object chunkEnd) {
- // currently, we only support single split column
- Object[] splitStart = chunkStart == null ? null : new Object[]
{chunkStart};
- Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
- return new SnapshotSplit(
- splitId(tableId, chunkId), tableId, splitKeyType, splitStart,
splitEnd);
- }
-
- //
------------------------------------------------------------------------------------------
- /** Returns the distribution factor of the given table. */
- @SuppressWarnings("MagicNumber")
- private double calculateDistributionFactor(
- TableId tableId, Object min, Object max, long approximateRowCnt) {
-
- if (!min.getClass().equals(max.getClass())) {
- throw new IllegalStateException(
- String.format(
- "Unsupported operation type, the MIN value type %s
is different with MAX value type %s.",
- min.getClass().getSimpleName(),
max.getClass().getSimpleName()));
- }
- if (approximateRowCnt == 0) {
- return Double.MAX_VALUE;
- }
- BigDecimal difference = ObjectUtils.minus(max, min);
- // factor = (max - min + 1) / rowCount
- final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
- double distributionFactor =
- subRowCnt.divide(new BigDecimal(approximateRowCnt), 4,
ROUND_CEILING).doubleValue();
- log.info(
- "The distribution factor of table {} is {} according to the
min split key {}, max split key {} and approximate row count {}",
- tableId,
- distributionFactor,
- min,
- max,
- approximateRowCnt);
- return distributionFactor;
- }
-
- private static String splitId(TableId tableId, int chunkId) {
- return tableId.toString() + ":" + chunkId;
- }
-
- @SuppressWarnings("MagicNumber")
- private static void maySleep(int count, TableId tableId) {
- // every 100 queries to sleep 1s
- if (count % 10 == 0) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // nothing to do
- }
- log.info("JdbcSourceChunkSplitter has split {} chunks for table
{}", count, tableId);
- }
- }
}