This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c0422dbfeb [Improve] [CDC Base] Add a fast sampling method that
supports character types (#5179)
c0422dbfeb is described below
commit c0422dbfebceec95606bdc67afcba4de35fb0e9e
Author: ic4y <[email protected]>
AuthorDate: Thu Aug 10 11:48:31 2023 +0800
[Improve] [CDC Base] Add a fast sampling method that supports character
types (#5179)
---
.../splitter/AbstractJdbcSourceChunkSplitter.java | 23 ++++++++-
.../source/MySqlIncrementalSourceFactory.java | 3 +-
.../mysql/source/eumerator/MySqlChunkSplitter.java | 4 +-
.../seatunnel/cdc/mysql/utils/MySqlUtils.java | 53 +++++++++++++++++++++
.../source/eumerator/SqlServerChunkSplitter.java | 3 +-
.../cdc/sqlserver/source/utils/SqlServerUtils.java | 54 ++++++++++++++++++++++
6 files changed, 136 insertions(+), 4 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
index e956b11170..e99e7dab4b 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
@@ -112,6 +112,19 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
final int chunkSize = sourceConfig.getSplitSize();
final double distributionFactorUpper =
sourceConfig.getDistributionFactorUpper();
final double distributionFactorLower =
sourceConfig.getDistributionFactorLower();
+ final int sampleShardingThreshold =
sourceConfig.getSampleShardingThreshold();
+
+ log.info(
+ "Splitting table {} into chunks, split column: {}, min: {},
max: {}, chunk size: {}, "
+ + "distribution factor upper: {}, distribution factor
lower: {}, sample sharding threshold: {}",
+ tableId,
+ splitColumnName,
+ min,
+ max,
+ chunkSize,
+ distributionFactorUpper,
+ distributionFactorLower,
+ sampleShardingThreshold);
if (isEvenlySplitColumn(splitColumn)) {
long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
@@ -130,7 +143,7 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
} else {
int shardCount = (int) (approximateRowCnt / chunkSize);
int inverseSamplingRate =
sourceConfig.getInverseSamplingRate();
- if (sourceConfig.getSampleShardingThreshold() < shardCount) {
+ if (sampleShardingThreshold < shardCount) {
// It is necessary to ensure that the number of data rows
sampled by the
// sampling rate is greater than the number of shards.
// Otherwise, if the sampling rate is too low, it may
result in an insufficient
@@ -144,9 +157,17 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
chunkSize);
inverseSamplingRate = chunkSize;
}
+ log.info(
+ "Use sampling sharding for table {}, the sampling
rate is {}",
+ tableId,
+ inverseSamplingRate);
Object[] sample =
sampleDataFromColumn(
jdbc, tableId, splitColumnName,
inverseSamplingRate);
+ log.info(
+ "Sample data from table {} end, the sample size is
{}",
+ tableId,
+ sample.length);
return efficientShardingThroughSampling(
tableId, sample, approximateRowCnt, shardCount);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index 396fd7bae9..a84eb79be3 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -68,7 +68,8 @@ public class MySqlIncrementalSourceFactory implements
TableSourceFactory, Suppor
JdbcSourceOptions.CONNECTION_POOL_SIZE,
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND,
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
- JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD)
+ JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
+ JdbcSourceOptions.INVERSE_SAMPLING_RATE)
.optional(MySqlSourceOptions.STARTUP_MODE,
MySqlSourceOptions.STOP_MODE)
.conditional(
MySqlSourceOptions.STARTUP_MODE,
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
index 0249889b23..c078f7cf28 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
@@ -28,10 +28,12 @@ import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.TableId;
+import lombok.extern.slf4j.Slf4j;
import java.sql.SQLException;
/** The {@code ChunkSplitter} used to split table into a set of chunks for
JDBC data source. */
+@Slf4j
public class MySqlChunkSplitter extends AbstractJdbcSourceChunkSplitter {
public MySqlChunkSplitter(JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dialect) {
@@ -55,7 +57,7 @@ public class MySqlChunkSplitter extends
AbstractJdbcSourceChunkSplitter {
 public Object[] sampleDataFromColumn(
 JdbcConnection jdbc, TableId tableId, String columnName, int
inverseSamplingRate)
 throws SQLException {
- return MySqlUtils.sampleDataFromColumn(jdbc, tableId, columnName,
inverseSamplingRate);
+ // Delegate to the full-scan sampler, which samples and sorts in Java so that
+ // character-typed split columns can also be sampled (intent per commit title).
+ return MySqlUtils.skipReadAndSortSampleData(jdbc, tableId, columnName,
inverseSamplingRate);
 }
@Override
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
index c9223c81ff..fb00020644 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
@@ -36,11 +36,13 @@ import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
+import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -52,6 +54,7 @@ import java.util.Optional;
import static
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.rowToArray;
/** Utils to prepare MySQL SQL statement. */
+@Slf4j
public class MySqlUtils {
private MySqlUtils() {}
@@ -142,6 +145,56 @@ public class MySqlUtils {
});
}
+    /**
+     * Samples the split column by streaming a full scan of the table and keeping every
+     * {@code inverseSamplingRate}-th row, then sorts the sample in memory. Because sampling and
+     * sorting happen in Java rather than in SQL, any column whose JDBC value is {@link Comparable}
+     * (including character types) can be sampled.
+     *
+     * <p>NOTE(review): values are ordered by Java natural ordering, which for strings may differ
+     * from the database collation used when chunk bounds are later compared in SQL — confirm.
+     *
+     * @param jdbc connection used to run the scan
+     * @param tableId table to sample
+     * @param columnName split column to sample
+     * @param inverseSamplingRate keep one row out of every {@code inverseSamplingRate}; must be
+     *     positive
+     * @return the non-null sampled values, sorted in ascending natural order
+     * @throws SQLException if the scan query fails or its resources cannot be closed
+     * @throws IllegalArgumentException if {@code inverseSamplingRate} is not positive
+     */
+    public static Object[] skipReadAndSortSampleData(
+            JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
+            throws SQLException {
+        if (inverseSamplingRate <= 0) {
+            throw new IllegalArgumentException(
+                    "inverseSamplingRate must be positive, but was " + inverseSamplingRate);
+        }
+        final String sampleQuery =
+                String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));
+
+        List<Object> results = new ArrayList<>();
+        try (Statement stmt =
+                jdbc.connection()
+                        .createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+            // Forward-only + read-only + Integer.MIN_VALUE is the MySQL Connector/J idiom for
+            // streaming rows one at a time instead of buffering the whole table in memory.
+            stmt.setFetchSize(Integer.MIN_VALUE);
+            try (ResultSet rs = stmt.executeQuery(sampleQuery)) {
+                // long, not int: large tables can exceed Integer.MAX_VALUE rows.
+                long count = 0;
+                while (rs.next()) {
+                    count++;
+                    if (count % 100000 == 0) {
+                        log.info("Processing row index: {}", count);
+                    }
+                    if (count % inverseSamplingRate == 0) {
+                        Object value = rs.getObject(1);
+                        // Arrays.sort below would throw NullPointerException on a null element.
+                        if (value != null) {
+                            results.add(value);
+                        }
+                    }
+                }
+            }
+        }
+        Object[] resultsArray = results.toArray();
+        Arrays.sort(resultsArray);
+        return resultsArray;
+    }
+
public static Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
index 7efd53dc3f..1dc97020be 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
@@ -57,7 +57,8 @@ public class SqlServerChunkSplitter extends
AbstractJdbcSourceChunkSplitter {
 public Object[] sampleDataFromColumn(
 JdbcConnection jdbc, TableId tableId, String columnName, int
inverseSamplingRate)
 throws SQLException {
- return SqlServerUtils.sampleDataFromColumn(jdbc, tableId, columnName,
inverseSamplingRate);
+ // Delegate to the full-scan sampler, which samples and sorts in Java so that
+ // character-typed split columns can also be sampled (intent per commit title).
+ return SqlServerUtils.skipReadAndSortSampleData(
+ jdbc, tableId, columnName, inverseSamplingRate);
 }
@Override
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
index a127184984..d6e58825da 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
@@ -39,10 +39,13 @@ import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
+import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -52,6 +55,7 @@ import java.util.Map;
import java.util.Optional;
/** The utils for SqlServer data source. */
+@Slf4j
public class SqlServerUtils {
public SqlServerUtils() {}
@@ -145,6 +149,56 @@ public class SqlServerUtils {
});
}
+    /**
+     * Samples the split column by streaming a full scan of the table and keeping every
+     * {@code inverseSamplingRate}-th row, then sorts the sample in memory. Because sampling and
+     * sorting happen in Java rather than in SQL, any column whose JDBC value is {@link Comparable}
+     * (including character types) can be sampled.
+     *
+     * <p>NOTE(review): values are ordered by Java natural ordering, which for strings may differ
+     * from the database collation used when chunk bounds are later compared in SQL — confirm.
+     *
+     * @param jdbc connection used to run the scan
+     * @param tableId table to sample
+     * @param columnName split column to sample
+     * @param inverseSamplingRate keep one row out of every {@code inverseSamplingRate}; must be
+     *     positive
+     * @return the non-null sampled values, sorted in ascending natural order
+     * @throws SQLException if the scan query fails or its resources cannot be closed
+     * @throws IllegalArgumentException if {@code inverseSamplingRate} is not positive
+     */
+    public static Object[] skipReadAndSortSampleData(
+            JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
+            throws SQLException {
+        if (inverseSamplingRate <= 0) {
+            throw new IllegalArgumentException(
+                    "inverseSamplingRate must be positive, but was " + inverseSamplingRate);
+        }
+        final String sampleQuery =
+                String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));
+
+        List<Object> results = new ArrayList<>();
+        try (Statement stmt =
+                jdbc.connection()
+                        .createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+            // Integer.MIN_VALUE is a MySQL Connector/J-only streaming hint. Statement#setFetchSize
+            // must reject negative values with an SQLException (the SQL Server driver does), so a
+            // bounded positive fetch size is used to page through the scan instead.
+            stmt.setFetchSize(1024);
+            try (ResultSet rs = stmt.executeQuery(sampleQuery)) {
+                // long, not int: large tables can exceed Integer.MAX_VALUE rows.
+                long count = 0;
+                while (rs.next()) {
+                    count++;
+                    if (count % 100000 == 0) {
+                        log.info("Processing row index: {}", count);
+                    }
+                    if (count % inverseSamplingRate == 0) {
+                        Object value = rs.getObject(1);
+                        // Arrays.sort below would throw NullPointerException on a null element.
+                        if (value != null) {
+                            results.add(value);
+                        }
+                    }
+                }
+            }
+        }
+        Object[] resultsArray = results.toArray();
+        Arrays.sort(resultsArray);
+        return resultsArray;
+    }
+
/**
* Returns the next LSN to be read from the database. This is the LSN of
the last record that
* was read from the database.