This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev-addsampskip
in repository https://gitbox.apache.org/repos/asf/seatunnel.git

commit 4c70f76333ae070f43a77783b600536b906a9ae0
Author: liuli <[email protected]>
AuthorDate: Sat Jul 29 11:55:19 2023 +0800

    [Improve] [CDC Base] Add a fast sampling method that supports character 
types
---
 .../splitter/AbstractJdbcSourceChunkSplitter.java  | 23 ++++++++-
 .../source/MySqlIncrementalSourceFactory.java      |  3 +-
 .../mysql/source/eumerator/MySqlChunkSplitter.java |  4 +-
 .../seatunnel/cdc/mysql/utils/MySqlUtils.java      | 53 +++++++++++++++++++++
 .../source/eumerator/SqlServerChunkSplitter.java   |  3 +-
 .../cdc/sqlserver/source/utils/SqlServerUtils.java | 54 ++++++++++++++++++++++
 6 files changed, 136 insertions(+), 4 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
index e956b11170..e99e7dab4b 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
@@ -112,6 +112,19 @@ public abstract class AbstractJdbcSourceChunkSplitter 
implements JdbcSourceChunk
         final int chunkSize = sourceConfig.getSplitSize();
         final double distributionFactorUpper = 
sourceConfig.getDistributionFactorUpper();
         final double distributionFactorLower = 
sourceConfig.getDistributionFactorLower();
+        final int sampleShardingThreshold = 
sourceConfig.getSampleShardingThreshold();
+
+        log.info(
+                "Splitting table {} into chunks, split column: {}, min: {}, 
max: {}, chunk size: {}, "
+                        + "distribution factor upper: {}, distribution factor 
lower: {}, sample sharding threshold: {}",
+                tableId,
+                splitColumnName,
+                min,
+                max,
+                chunkSize,
+                distributionFactorUpper,
+                distributionFactorLower,
+                sampleShardingThreshold);
 
         if (isEvenlySplitColumn(splitColumn)) {
             long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
@@ -130,7 +143,7 @@ public abstract class AbstractJdbcSourceChunkSplitter 
implements JdbcSourceChunk
             } else {
                 int shardCount = (int) (approximateRowCnt / chunkSize);
                 int inverseSamplingRate = 
sourceConfig.getInverseSamplingRate();
-                if (sourceConfig.getSampleShardingThreshold() < shardCount) {
+                if (sampleShardingThreshold < shardCount) {
                     // It is necessary to ensure that the number of data rows 
sampled by the
                     // sampling rate is greater than the number of shards.
                     // Otherwise, if the sampling rate is too low, it may 
result in an insufficient
@@ -144,9 +157,17 @@ public abstract class AbstractJdbcSourceChunkSplitter 
implements JdbcSourceChunk
                                 chunkSize);
                         inverseSamplingRate = chunkSize;
                     }
+                    log.info(
+                            "Use sampling sharding for table {}, the sampling 
rate is {}",
+                            tableId,
+                            inverseSamplingRate);
                     Object[] sample =
                             sampleDataFromColumn(
                                     jdbc, tableId, splitColumnName, 
inverseSamplingRate);
+                    log.info(
+                            "Sample data from table {} end, the sample size is 
{}",
+                            tableId,
+                            sample.length);
                     return efficientShardingThroughSampling(
                             tableId, sample, approximateRowCnt, shardCount);
                 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index 396fd7bae9..a84eb79be3 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -68,7 +68,8 @@ public class MySqlIncrementalSourceFactory implements 
TableSourceFactory, Suppor
                         JdbcSourceOptions.CONNECTION_POOL_SIZE,
                         
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND,
                         
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
-                        JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD)
+                        JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
+                        JdbcSourceOptions.INVERSE_SAMPLING_RATE)
                 .optional(MySqlSourceOptions.STARTUP_MODE, 
MySqlSourceOptions.STOP_MODE)
                 .conditional(
                         MySqlSourceOptions.STARTUP_MODE,
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
index 0249889b23..c078f7cf28 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
@@ -28,10 +28,12 @@ import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
 import io.debezium.relational.TableId;
+import lombok.extern.slf4j.Slf4j;
 
 import java.sql.SQLException;
 
 /** The {@code ChunkSplitter} used to split table into a set of chunks for 
JDBC data source. */
+@Slf4j
 public class MySqlChunkSplitter extends AbstractJdbcSourceChunkSplitter {
 
     public MySqlChunkSplitter(JdbcSourceConfig sourceConfig, 
JdbcDataSourceDialect dialect) {
@@ -55,7 +57,7 @@ public class MySqlChunkSplitter extends 
AbstractJdbcSourceChunkSplitter {
     public Object[] sampleDataFromColumn(
             JdbcConnection jdbc, TableId tableId, String columnName, int 
inverseSamplingRate)
             throws SQLException {
-        return MySqlUtils.sampleDataFromColumn(jdbc, tableId, columnName, 
inverseSamplingRate);
+        return MySqlUtils.skipReadAndSortSampleData(jdbc, tableId, columnName, 
inverseSamplingRate);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
index c9223c81ff..fb00020644 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
@@ -36,11 +36,13 @@ import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import io.debezium.schema.TopicSelector;
 import io.debezium.util.SchemaNameAdjuster;
+import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -52,6 +54,7 @@ import java.util.Optional;
 import static 
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.rowToArray;
 
 /** Utils to prepare MySQL SQL statement. */
+@Slf4j
 public class MySqlUtils {
 
     private MySqlUtils() {}
@@ -142,6 +145,56 @@ public class MySqlUtils {
                 });
     }
 
+    public static Object[] skipReadAndSortSampleData(
+            JdbcConnection jdbc, TableId tableId, String columnName, int 
inverseSamplingRate)
+            throws SQLException {
+        final String sampleQuery =
+                String.format("SELECT %s FROM %s", quote(columnName), 
quote(tableId));
+
+        Statement stmt = null;
+        ResultSet rs = null;
+
+        List<Object> results = new ArrayList<>();
+        try {
+            stmt =
+                    jdbc.connection()
+                            .createStatement(
+                                    ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY);
+
+            stmt.setFetchSize(Integer.MIN_VALUE);
+            rs = stmt.executeQuery(sampleQuery);
+
+            int count = 0;
+            while (rs.next()) {
+                count++;
+                if (count % 100000 == 0) {
+                    log.info("Processing row index: {}", count);
+                }
+                if (count % inverseSamplingRate == 0) {
+                    results.add(rs.getObject(1));
+                }
+            }
+        } finally {
+            if (rs != null) {
+                try {
+                    rs.close();
+                } catch (SQLException e) {
+                    log.error("Failed to close ResultSet", e);
+                }
+            }
+            if (stmt != null) {
+                try {
+                    stmt.close();
+                } catch (SQLException e) {
+                    log.error("Failed to close Statement", e);
+                }
+            }
+        }
+        Object[] resultsArray = results.toArray();
+        Arrays.sort(resultsArray);
+        return resultsArray;
+    }
+
     public static Object queryNextChunkMax(
             JdbcConnection jdbc,
             TableId tableId,
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
index 7efd53dc3f..1dc97020be 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
@@ -57,7 +57,8 @@ public class SqlServerChunkSplitter extends 
AbstractJdbcSourceChunkSplitter {
     public Object[] sampleDataFromColumn(
             JdbcConnection jdbc, TableId tableId, String columnName, int 
inverseSamplingRate)
             throws SQLException {
-        return SqlServerUtils.sampleDataFromColumn(jdbc, tableId, columnName, 
inverseSamplingRate);
+        return SqlServerUtils.skipReadAndSortSampleData(
+                jdbc, tableId, columnName, inverseSamplingRate);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
index a127184984..d6e58825da 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
@@ -39,10 +39,13 @@ import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import io.debezium.schema.TopicSelector;
 import io.debezium.util.SchemaNameAdjuster;
+import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -52,6 +55,7 @@ import java.util.Map;
 import java.util.Optional;
 
 /** The utils for SqlServer data source. */
+@Slf4j
 public class SqlServerUtils {
 
     public SqlServerUtils() {}
@@ -145,6 +149,56 @@ public class SqlServerUtils {
                 });
     }
 
+    public static Object[] skipReadAndSortSampleData(
+            JdbcConnection jdbc, TableId tableId, String columnName, int 
inverseSamplingRate)
+            throws SQLException {
+        final String sampleQuery =
+                String.format("SELECT %s FROM %s", quote(columnName), 
quote(tableId));
+
+        Statement stmt = null;
+        ResultSet rs = null;
+
+        List<Object> results = new ArrayList<>();
+        try {
+            stmt =
+                    jdbc.connection()
+                            .createStatement(
+                                    ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY);
+
+            stmt.setFetchSize(Integer.MIN_VALUE);
+            rs = stmt.executeQuery(sampleQuery);
+
+            int count = 0;
+            while (rs.next()) {
+                count++;
+                if (count % 100000 == 0) {
+                    log.info("Processing row index: {}", count);
+                }
+                if (count % inverseSamplingRate == 0) {
+                    results.add(rs.getObject(1));
+                }
+            }
+        } finally {
+            if (rs != null) {
+                try {
+                    rs.close();
+                } catch (SQLException e) {
+                    log.error("Failed to close ResultSet", e);
+                }
+            }
+            if (stmt != null) {
+                try {
+                    stmt.close();
+                } catch (SQLException e) {
+                    log.error("Failed to close Statement", e);
+                }
+            }
+        }
+        Object[] resultsArray = results.toArray();
+        Arrays.sort(resultsArray);
+        return resultsArray;
+    }
+
     /**
      * Returns the next LSN to be read from the database. This is the LSN of 
the last record that
      * was read from the database.

Reply via email to