This is an automated email from the ASF dual-hosted git repository. ic4y pushed a commit to branch dev-addsampskip in repository https://gitbox.apache.org/repos/asf/seatunnel.git
commit 4c70f76333ae070f43a77783b600536b906a9ae0 Author: liuli <[email protected]> AuthorDate: Sat Jul 29 11:55:19 2023 +0800 [Imporve] [CDC Base] Add a fast sampling method that supports character types --- .../splitter/AbstractJdbcSourceChunkSplitter.java | 23 ++++++++- .../source/MySqlIncrementalSourceFactory.java | 3 +- .../mysql/source/eumerator/MySqlChunkSplitter.java | 4 +- .../seatunnel/cdc/mysql/utils/MySqlUtils.java | 53 +++++++++++++++++++++ .../source/eumerator/SqlServerChunkSplitter.java | 3 +- .../cdc/sqlserver/source/utils/SqlServerUtils.java | 54 ++++++++++++++++++++++ 6 files changed, 136 insertions(+), 4 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java index e956b11170..e99e7dab4b 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java @@ -112,6 +112,19 @@ public abstract class AbstractJdbcSourceChunkSplitter implements JdbcSourceChunk final int chunkSize = sourceConfig.getSplitSize(); final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper(); final double distributionFactorLower = sourceConfig.getDistributionFactorLower(); + final int sampleShardingThreshold = sourceConfig.getSampleShardingThreshold(); + + log.info( + "Splitting table {} into chunks, split column: {}, min: {}, max: {}, chunk size: {}, " + + "distribution factor upper: {}, distribution factor lower: {}, sample sharding threshold: {}", + tableId, + splitColumnName, + min, + max, + chunkSize, + distributionFactorUpper, + distributionFactorLower, + sampleShardingThreshold); if (isEvenlySplitColumn(splitColumn)) { long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId); @@ -130,7 +143,7 @@ public abstract class AbstractJdbcSourceChunkSplitter implements JdbcSourceChunk } else { int shardCount = (int) (approximateRowCnt / chunkSize); int inverseSamplingRate = sourceConfig.getInverseSamplingRate(); - if (sourceConfig.getSampleShardingThreshold() < shardCount) { + if (sampleShardingThreshold < shardCount) { // It is necessary to ensure that the number of data rows sampled by the // sampling rate is greater than the number of shards. // Otherwise, if the sampling rate is too low, it may result in an insufficient @@ -144,9 +157,17 @@ public abstract class AbstractJdbcSourceChunkSplitter implements JdbcSourceChunk chunkSize); inverseSamplingRate = chunkSize; } + log.info( + "Use sampling sharding for table {}, the sampling rate is {}", + tableId, + inverseSamplingRate); Object[] sample = sampleDataFromColumn( jdbc, tableId, splitColumnName, inverseSamplingRate); + log.info( + "Sample data from table {} end, the sample size is {}", + tableId, + sample.length); return efficientShardingThroughSampling( tableId, sample, approximateRowCnt, shardCount); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java index 396fd7bae9..a84eb79be3 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java @@ -68,7 +68,8 @@ public class MySqlIncrementalSourceFactory implements TableSourceFactory, Suppor JdbcSourceOptions.CONNECTION_POOL_SIZE, JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND, JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND, - JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD) + JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD, + JdbcSourceOptions.INVERSE_SAMPLING_RATE) .optional(MySqlSourceOptions.STARTUP_MODE, MySqlSourceOptions.STOP_MODE) .conditional( MySqlSourceOptions.STARTUP_MODE, diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java index 0249889b23..c078f7cf28 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java @@ -28,10 +28,12 @@ import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; import io.debezium.relational.TableId; +import lombok.extern.slf4j.Slf4j; import java.sql.SQLException; /** The {@code ChunkSplitter} used to split table into a set of chunks for JDBC data source. */ +@Slf4j public class MySqlChunkSplitter extends AbstractJdbcSourceChunkSplitter { public MySqlChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) { @@ -55,7 +57,7 @@ public class MySqlChunkSplitter extends AbstractJdbcSourceChunkSplitter { public Object[] sampleDataFromColumn( JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate) throws SQLException { - return MySqlUtils.sampleDataFromColumn(jdbc, tableId, columnName, inverseSamplingRate); + return MySqlUtils.skipReadAndSortSampleData(jdbc, tableId, columnName, inverseSamplingRate); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java index c9223c81ff..fb00020644 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java @@ -36,11 +36,13 @@ import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.schema.TopicSelector; import io.debezium.util.SchemaNameAdjuster; +import lombok.extern.slf4j.Slf4j; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -52,6 +54,7 @@ import java.util.Optional; import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.rowToArray; /** Utils to prepare MySQL SQL statement. */ +@Slf4j public class MySqlUtils { private MySqlUtils() {} @@ -142,6 +145,56 @@ public class MySqlUtils { }); } + public static Object[] skipReadAndSortSampleData( + JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate) + throws SQLException { + final String sampleQuery = + String.format("SELECT %s FROM %s", quote(columnName), quote(tableId)); + + Statement stmt = null; + ResultSet rs = null; + + List<Object> results = new ArrayList<>(); + try { + stmt = + jdbc.connection() + .createStatement( + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + + stmt.setFetchSize(Integer.MIN_VALUE); + rs = stmt.executeQuery(sampleQuery); + + int count = 0; + while (rs.next()) { + count++; + if (count % 100000 == 0) { + log.info("Processing row index: {}", count); + } + if (count % inverseSamplingRate == 0) { + results.add(rs.getObject(1)); + } + } + } finally { + if (rs != null) { + try { + rs.close(); + } catch (SQLException e) { + log.error("Failed to close ResultSet", e); + } + } + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + log.error("Failed to close Statement", e); + } + } + } + Object[] resultsArray = results.toArray(); + Arrays.sort(resultsArray); + return resultsArray; + } + public static Object queryNextChunkMax( JdbcConnection jdbc, TableId tableId, diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java index 7efd53dc3f..1dc97020be 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java @@ -57,7 +57,8 @@ public class SqlServerChunkSplitter extends AbstractJdbcSourceChunkSplitter { public Object[] sampleDataFromColumn( JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate) throws SQLException { - return SqlServerUtils.sampleDataFromColumn(jdbc, tableId, columnName, inverseSamplingRate); + return SqlServerUtils.skipReadAndSortSampleData( + jdbc, tableId, columnName, inverseSamplingRate); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java index a127184984..d6e58825da 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java @@ -39,10 +39,13 @@ import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.schema.TopicSelector; import io.debezium.util.SchemaNameAdjuster; +import lombok.extern.slf4j.Slf4j; import java.sql.Connection; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -52,6 +55,7 @@ import java.util.Map; import java.util.Optional; /** The utils for SqlServer data source. */ +@Slf4j public class SqlServerUtils { public SqlServerUtils() {} @@ -145,6 +149,56 @@ public class SqlServerUtils { }); } + public static Object[] skipReadAndSortSampleData( + JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate) + throws SQLException { + final String sampleQuery = + String.format("SELECT %s FROM %s", quote(columnName), quote(tableId)); + + Statement stmt = null; + ResultSet rs = null; + + List<Object> results = new ArrayList<>(); + try { + stmt = + jdbc.connection() + .createStatement( + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + + stmt.setFetchSize(Integer.MIN_VALUE); + rs = stmt.executeQuery(sampleQuery); + + int count = 0; + while (rs.next()) { + count++; + if (count % 100000 == 0) { + log.info("Processing row index: {}", count); + } + if (count % inverseSamplingRate == 0) { + results.add(rs.getObject(1)); + } + } + } finally { + if (rs != null) { + try { + rs.close(); + } catch (SQLException e) { + log.error("Failed to close ResultSet", e); + } + } + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + log.error("Failed to close Statement", e); + } + } + } + Object[] resultsArray = results.toArray(); + Arrays.sort(resultsArray); + return resultsArray; + } + /** * Returns the next LSN to be read from the database. This is the LSN of the last record that * was read from the database.
