This is an automated email from the ASF dual-hosted git repository.

ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 47f566005 [FLINK-35344][cdc-base] Move same code from multiple 
subclasses to JdbcSourceChunkSplitter (#3319)
47f566005 is described below

commit 47f5660055016460719bc8a9a72cee868e9aa6fe
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Aug 2 17:45:38 2024 +0800

    [FLINK-35344][cdc-base] Move same code from multiple subclasses to 
JdbcSourceChunkSplitter (#3319)
---
 .../assigner/splitter/JdbcSourceChunkSplitter.java | 387 ++++++++++++++++++---
 .../base/source/utils/JdbcChunkUtils.java          | 139 ++++++++
 .../base/experimental/MySqlChunkSplitter.java      | 323 +----------------
 .../base/experimental/utils/MySqlUtils.java        |  60 +---
 .../db2/source/dialect/Db2ChunkSplitter.java       | 309 +---------------
 .../cdc/connectors/db2/source/utils/Db2Utils.java  |  42 ---
 .../assigner/splitter/OracleChunkSplitter.java     | 310 ++---------------
 .../oracle/source/utils/OracleUtils.java           |  43 ---
 .../postgres/source/PostgresChunkSplitter.java     | 314 ++---------------
 .../postgres/source/utils/PostgresQueryUtils.java  |   9 +-
 .../source/dialect/SqlServerChunkSplitter.java     | 314 +----------------
 .../sqlserver/source/utils/SqlServerUtils.java     |  42 ---
 12 files changed, 564 insertions(+), 1728 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
index 4a24aed00..1d50164cc 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
@@ -18,70 +18,119 @@
 package org.apache.flink.cdc.connectors.base.source.assigner.splitter;
 
 import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils;
+import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
+import static java.math.BigDecimal.ROUND_CEILING;
+import static 
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
 import static org.apache.flink.table.api.DataTypes.FIELD;
 import static org.apache.flink.table.api.DataTypes.ROW;
 
 /** The {@code ChunkSplitter} used to split table into a set of chunks for 
JDBC data source. */
 @Experimental
-public interface JdbcSourceChunkSplitter extends ChunkSplitter {
+public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcSourceChunkSplitter.class);
+    protected final JdbcSourceConfig sourceConfig;
+    protected final JdbcDataSourceDialect dialect;
+
+    public JdbcSourceChunkSplitter(JdbcSourceConfig sourceConfig, 
JdbcDataSourceDialect dialect) {
+        this.sourceConfig = sourceConfig;
+        this.dialect = dialect;
+    }
 
     /** Generates all snapshot splits (chunks) for the give table path. */
     @Override
-    Collection<SnapshotSplit> generateSplits(TableId tableId);
+    public Collection<SnapshotSplit> generateSplits(TableId tableId) {
+        try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
 
-    /**
-     * Query the maximum and minimum value of the column in the table. e.g. 
query string <code>
-     * SELECT MIN(%s) FROM %s WHERE %s > ?</code>
-     *
-     * @param jdbc JDBC connection.
-     * @param tableId table identity.
-     * @param column column.
-     * @return maximum and minimum value.
-     */
-    Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column) 
throws SQLException;
+            LOG.info("Start splitting table {} into chunks...", tableId);
+            long start = System.currentTimeMillis();
 
-    /**
-     * Query the minimum value of the column in the table, and the minimum 
value must greater than
-     * the excludedLowerBound value. e.g. prepare query string <code>
-     * SELECT MIN(%s) FROM %s WHERE %s > ?</code>
-     *
-     * @param jdbc JDBC connection.
-     * @param tableId table identity.
-     * @param column column.
-     * @param excludedLowerBound the minimum value should be greater than this 
value.
-     * @return minimum value.
-     */
-    Object queryMin(JdbcConnection jdbc, TableId tableId, Column column, 
Object excludedLowerBound)
-            throws SQLException;
+            Table table =
+                    Objects.requireNonNull(dialect.queryTableSchema(jdbc, 
tableId)).getTable();
+            Column splitColumn = getSplitColumn(table, 
sourceConfig.getChunkKeyColumn());
+            final List<ChunkRange> chunks;
+            try {
+                chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
+            } catch (SQLException e) {
+                throw new FlinkRuntimeException("Failed to split chunks for 
table " + tableId, e);
+            }
+
+            // convert chunks into splits
+            List<SnapshotSplit> splits = new ArrayList<>();
+            RowType splitType = getSplitType(splitColumn);
+            for (int i = 0; i < chunks.size(); i++) {
+                ChunkRange chunk = chunks.get(i);
+                SnapshotSplit split =
+                        createSnapshotSplit(
+                                jdbc,
+                                tableId,
+                                i,
+                                splitType,
+                                chunk.getChunkStart(),
+                                chunk.getChunkEnd());
+                splits.add(split);
+            }
+
+            long end = System.currentTimeMillis();
+            LOG.info(
+                    "Split table {} into {} chunks, time cost: {}ms.",
+                    tableId,
+                    splits.size(),
+                    end - start);
+            return splits;
+        } catch (Exception e) {
+            throw new FlinkRuntimeException(
+                    String.format("Generate Splits for table %s error", 
tableId), e);
+        }
+    }
 
     /**
      * Query the maximum value of the next chunk, and the next chunk must be 
greater than or equal
      * to <code>includedLowerBound</code> value [min_1, max_1), [min_2, 
max_2),... [min_n, null).
      * Each time this method is called it will return max1, max2...
      *
+     * <p>Each database has different grammar to get limit number of data, for 
example, `limit N` in
+     * mysql or postgres, `top(N)` in sqlserver , `FETCH FIRST %S ROWS ONLY` 
in DB2.
+     *
      * @param jdbc JDBC connection.
      * @param tableId table identity.
-     * @param column column.
+     * @param splitColumn column.
      * @param chunkSize chunk size.
      * @param includedLowerBound the previous chunk end value.
      * @return next chunk end value.
      */
-    Object queryNextChunkMax(
+    protected abstract Object queryNextChunkMax(
             JdbcConnection jdbc,
             TableId tableId,
-            Column column,
+            Column splitColumn,
             int chunkSize,
             Object includedLowerBound)
             throws SQLException;
@@ -89,23 +138,16 @@ public interface JdbcSourceChunkSplitter extends 
ChunkSplitter {
     /**
      * Approximate total number of entries in the lookup table.
      *
+     * <p>Each database has different system table to lookup up approximate 
total number. For
+     * example, `pg_class` in postgres, `sys.dm_db_partition_stats` in 
sqlserver, `SYSCAT.TABLE` in
+     * db2.
+     *
      * @param jdbc JDBC connection.
      * @param tableId table identity.
      * @return approximate row count.
      */
-    Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws 
SQLException;
-
-    /**
-     * Build the scan query sql of the {@link SnapshotSplit}.
-     *
-     * @param tableId table identity.
-     * @param splitKeyType primary key type.
-     * @param isFirstSplit whether the first split.
-     * @param isLastSplit whether the last split.
-     * @return query sql.
-     */
-    String buildSplitScanQuery(
-            TableId tableId, RowType splitKeyType, boolean isFirstSplit, 
boolean isLastSplit);
+    protected abstract Long queryApproximateRowCnt(JdbcConnection jdbc, 
TableId tableId)
+            throws SQLException;
 
     /**
      * Checks whether split column is evenly distributed across its range.
@@ -113,7 +155,7 @@ public interface JdbcSourceChunkSplitter extends 
ChunkSplitter {
      * @param splitColumn split column.
      * @return true that means split column with type BIGINT, INT, DECIMAL.
      */
-    default boolean isEvenlySplitColumn(Column splitColumn) {
+    protected boolean isEvenlySplitColumn(Column splitColumn) {
         DataType flinkType = fromDbzColumn(splitColumn);
         LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot();
 
@@ -130,7 +172,94 @@ public interface JdbcSourceChunkSplitter extends 
ChunkSplitter {
      * @param splitColumn dbz split column.
      * @return flink data type
      */
-    DataType fromDbzColumn(Column splitColumn);
+    protected abstract DataType fromDbzColumn(Column splitColumn);
+
+    /** Returns the distribution factor of the given table. */
+    protected double calculateDistributionFactor(
+            TableId tableId, Object min, Object max, long approximateRowCnt) {
+
+        if (!min.getClass().equals(max.getClass())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unsupported operation type, the MIN value type %s 
is different with MAX value type %s.",
+                            min.getClass().getSimpleName(), 
max.getClass().getSimpleName()));
+        }
+        if (approximateRowCnt == 0) {
+            return Double.MAX_VALUE;
+        }
+        BigDecimal difference = ObjectUtils.minus(max, min);
+        // factor = (max - min + 1) / rowCount
+        final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
+        double distributionFactor =
+                subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, 
ROUND_CEILING).doubleValue();
+        LOG.info(
+                "The distribution factor of table {} is {} according to the 
min split key {}, max split key {} and approximate row count {}",
+                tableId,
+                distributionFactor,
+                min,
+                max,
+                approximateRowCnt);
+        return distributionFactor;
+    }
+
+    /**
+     * Get the column which is seen as chunk key.
+     *
+     * @param table table identity.
+     * @param chunkKeyColumn column name which is seen as chunk key, if 
chunkKeyColumn is null, use
+     *     primary key instead. @Column the column which is seen as chunk key.
+     */
+    protected Column getSplitColumn(Table table, @Nullable String 
chunkKeyColumn) {
+        return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumn);
+    }
+
+    /** ChunkEnd less than or equal to max. */
+    protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
+        return ObjectUtils.compare(chunkEnd, max) <= 0;
+    }
+
+    /** ChunkEnd greater than or equal to max. */
+    protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
+        return ObjectUtils.compare(chunkEnd, max) >= 0;
+    }
+
+    /**
+     * Query the maximum and minimum value of the column in the table. e.g. 
query string <code>
+     * SELECT MIN(%s) FROM %s WHERE %s > ?</code>
+     *
+     * @param jdbc JDBC connection.
+     * @param tableId table identity.
+     * @param splitColumn column.
+     * @return maximum and minimum value.
+     */
+    protected Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, 
Column splitColumn)
+            throws SQLException {
+        return JdbcChunkUtils.queryMinMax(
+                jdbc,
+                jdbc.quotedTableIdString(tableId),
+                jdbc.quotedColumnIdString(splitColumn.name()));
+    }
+
+    /**
+     * Query the minimum value of the column in the table, and the minimum 
value must greater than
+     * the excludedLowerBound value. e.g. prepare query string <code>
+     * SELECT MIN(%s) FROM %s WHERE %s > ?</code>
+     *
+     * @param jdbc JDBC connection.
+     * @param tableId table identity.
+     * @param splitColumn column.
+     * @param excludedLowerBound the minimum value should be greater than this 
value.
+     * @return minimum value.
+     */
+    protected Object queryMin(
+            JdbcConnection jdbc, TableId tableId, Column splitColumn, Object 
excludedLowerBound)
+            throws SQLException {
+        return JdbcChunkUtils.queryMin(
+                jdbc,
+                jdbc.quotedColumnIdString(splitColumn.name()),
+                jdbc.quotedTableIdString(tableId),
+                excludedLowerBound);
+    }
 
     /**
      * convert dbz column to Flink row type.
@@ -138,8 +267,178 @@ public interface JdbcSourceChunkSplitter extends 
ChunkSplitter {
      * @param splitColumn split column.
      * @return flink row type.
      */
-    default RowType getSplitType(Column splitColumn) {
+    private RowType getSplitType(Column splitColumn) {
         return (RowType)
                 ROW(FIELD(splitColumn.name(), 
fromDbzColumn(splitColumn))).getLogicalType();
     }
+
+    /**
+     * We can use evenly-sized chunks or unevenly-sized chunks when split 
table into chunks, using
+     * evenly-sized chunks which is much efficient, using unevenly-sized 
chunks which will request
+     * many queries and is not efficient.
+     */
+    private List<ChunkRange> splitTableIntoChunks(
+            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws 
SQLException {
+        final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
+        final Object min = minMax[0];
+        final Object max = minMax[1];
+        if (min == null || max == null || min.equals(max)) {
+            // empty table, or only one row, return full table scan as a chunk
+            return Collections.singletonList(ChunkRange.all());
+        }
+
+        final int chunkSize = sourceConfig.getSplitSize();
+        final double distributionFactorUpper = 
sourceConfig.getDistributionFactorUpper();
+        final double distributionFactorLower = 
sourceConfig.getDistributionFactorLower();
+
+        if (isEvenlySplitColumn(splitColumn)) {
+            long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
+            double distributionFactor =
+                    calculateDistributionFactor(tableId, min, max, 
approximateRowCnt);
+
+            boolean dataIsEvenlyDistributed =
+                    doubleCompare(distributionFactor, distributionFactorLower) 
>= 0
+                            && doubleCompare(distributionFactor, 
distributionFactorUpper) <= 0;
+
+            if (dataIsEvenlyDistributed) {
+                // the minimum dynamic chunk size is at least 1
+                final int dynamicChunkSize = Math.max((int) 
(distributionFactor * chunkSize), 1);
+                return splitEvenlySizedChunks(
+                        tableId, min, max, approximateRowCnt, chunkSize, 
dynamicChunkSize);
+            } else {
+                return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, 
min, max, chunkSize);
+            }
+        } else {
+            return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, 
max, chunkSize);
+        }
+    }
+
+    /**
+     * Split table into evenly sized chunks based on the numeric min and max 
value of split column,
+     * and tumble chunks in step size.
+     */
+    private List<ChunkRange> splitEvenlySizedChunks(
+            TableId tableId,
+            Object min,
+            Object max,
+            long approximateRowCnt,
+            int chunkSize,
+            int dynamicChunkSize) {
+        LOG.info(
+                "Use evenly-sized chunk optimization for table {}, the 
approximate row count is {}, the chunk size is {}, the dynamic chunk size is 
{}",
+                tableId,
+                approximateRowCnt,
+                chunkSize,
+                dynamicChunkSize);
+        if (approximateRowCnt <= chunkSize) {
+            // there is no more than one chunk, return full table as a chunk
+            return Collections.singletonList(ChunkRange.all());
+        }
+
+        final List<ChunkRange> splits = new ArrayList<>();
+        Object chunkStart = null;
+        Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
+        while (ObjectUtils.compare(chunkEnd, max) <= 0) {
+            splits.add(ChunkRange.of(chunkStart, chunkEnd));
+            chunkStart = chunkEnd;
+            try {
+                chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
+            } catch (ArithmeticException e) {
+                // Stop chunk split to avoid dead loop when number overflows.
+                break;
+            }
+        }
+        // add the ending split
+        splits.add(ChunkRange.of(chunkStart, null));
+        return splits;
+    }
+
+    /** Split table into unevenly sized chunks by continuously calculating 
next chunk max value. */
+    private List<ChunkRange> splitUnevenlySizedChunks(
+            JdbcConnection jdbc,
+            TableId tableId,
+            Column splitColumn,
+            Object min,
+            Object max,
+            int chunkSize)
+            throws SQLException {
+        LOG.info(
+                "Use unevenly-sized chunks for table {}, the chunk size is 
{}", tableId, chunkSize);
+        final List<ChunkRange> splits = new ArrayList<>();
+        Object chunkStart = null;
+        Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, 
chunkSize);
+        int count = 0;
+        while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) {
+            // we start from [null, min + chunk_size) and avoid [null, min)
+            splits.add(ChunkRange.of(chunkStart, chunkEnd));
+            // may sleep a while to avoid DDOS on PostgreSQL server
+            maySleep(count++, tableId);
+            chunkStart = chunkEnd;
+            chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, 
chunkSize);
+        }
+        // add the ending split
+        splits.add(ChunkRange.of(chunkStart, null));
+        return splits;
+    }
+
+    private Object nextChunkEnd(
+            JdbcConnection jdbc,
+            Object previousChunkEnd,
+            TableId tableId,
+            Column splitColumn,
+            Object max,
+            int chunkSize)
+            throws SQLException {
+        // chunk end might be null when max values are removed
+        Object chunkEnd =
+                queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, 
previousChunkEnd);
+        if (Objects.equals(previousChunkEnd, chunkEnd)) {
+            // we don't allow equal chunk start and end,
+            // should query the next one larger than chunkEnd
+            chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
+        }
+        if (isChunkEndGeMax(chunkEnd, max)) {
+            return null;
+        } else {
+            return chunkEnd;
+        }
+    }
+
+    private SnapshotSplit createSnapshotSplit(
+            JdbcConnection jdbc,
+            TableId tableId,
+            int chunkId,
+            RowType splitKeyType,
+            Object chunkStart,
+            Object chunkEnd) {
+        // currently, we only support single split column
+        Object[] splitStart = chunkStart == null ? null : new Object[] 
{chunkStart};
+        Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
+        Map<TableId, TableChanges.TableChange> schema = new HashMap<>();
+        schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
+        return new SnapshotSplit(
+                tableId,
+                splitId(tableId, chunkId),
+                splitKeyType,
+                splitStart,
+                splitEnd,
+                null,
+                schema);
+    }
+
+    private String splitId(TableId tableId, int chunkId) {
+        return tableId.toString() + ":" + chunkId;
+    }
+
+    private void maySleep(int count, TableId tableId) {
+        // every 10 queries to sleep 0.1s
+        if (count % 10 == 0) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                // nothing to do
+            }
+            LOG.info("JdbcSourceChunkSplitter has split {} chunks for table 
{}", count, tableId);
+        }
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java
new file mode 100644
index 000000000..46b30310c
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.base.source.utils;
+
+import org.apache.flink.table.api.ValidationException;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+
+import javax.annotation.Nullable;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
+
+/** Utilities to split chunks of table. */
+public class JdbcChunkUtils {
+
+    /**
+     * Query the maximum and minimum value of the column in the table. e.g. 
query string <code>
+     * SELECT MIN(%s) FROM %s WHERE %s > ?</code>
+     *
+     * @param jdbc JDBC connection.
+     * @param quotedTableName table identity.
+     * @param quotedColumnName column name.
+     * @return maximum and minimum value.
+     */
+    public static Object[] queryMinMax(
+            JdbcConnection jdbc, String quotedTableName, String 
quotedColumnName)
+            throws SQLException {
+        final String minMaxQuery =
+                String.format(
+                        "SELECT MIN(%s), MAX(%s) FROM %s",
+                        quotedColumnName, quotedColumnName, quotedTableName);
+        return jdbc.queryAndMap(
+                minMaxQuery,
+                rs -> {
+                    if (!rs.next()) {
+                        // this should never happen
+                        throw new SQLException(
+                                String.format(
+                                        "No result returned after running 
query [%s]",
+                                        minMaxQuery));
+                    }
+                    return rowToArray(rs, 2);
+                });
+    }
+
+    /**
+     * Query the minimum value of the column in the table, and the minimum 
value must greater than
+     * the excludedLowerBound value. e.g. prepare query string <code>
+     * SELECT MIN(%s) FROM %s WHERE %s > ?</code>
+     *
+     * @param jdbc JDBC connection.
+     * @param quotedTableName table identity.
+     * @param quotedColumnName column name.
+     * @param excludedLowerBound the minimum value should be greater than this 
value.
+     * @return minimum value.
+     */
+    public static Object queryMin(
+            JdbcConnection jdbc,
+            String quotedTableName,
+            String quotedColumnName,
+            Object excludedLowerBound)
+            throws SQLException {
+        final String minQuery =
+                String.format(
+                        "SELECT MIN(%s) FROM %s WHERE %s > ?",
+                        quotedColumnName, quotedTableName, quotedColumnName);
+        return jdbc.prepareQueryAndMap(
+                minQuery,
+                ps -> ps.setObject(1, excludedLowerBound),
+                rs -> {
+                    if (!rs.next()) {
+                        // this should never happen
+                        throw new SQLException(
+                                String.format(
+                                        "No result returned after running 
query [%s]", minQuery));
+                    }
+                    return rs.getObject(1);
+                });
+    }
+
+    /**
+     * Get the column which is seen as chunk key.
+     *
+     * @param table table identity.
+     * @param chunkKeyColumn column name which is seen as chunk key, if 
chunkKeyColumn is null, use
+     *     primary key instead. @Column the column which is seen as chunk key.
+     */
+    public static Column getSplitColumn(Table table, @Nullable String 
chunkKeyColumn) {
+        List<Column> primaryKeys = table.primaryKeyColumns();
+        if (primaryKeys.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Incremental snapshot for tables requires primary 
key,"
+                                    + " but table %s doesn't have primary 
key.",
+                            table.id()));
+        }
+
+        if (chunkKeyColumn != null) {
+            Optional<Column> targetPkColumn =
+                    primaryKeys.stream()
+                            .filter(col -> chunkKeyColumn.equals(col.name()))
+                            .findFirst();
+            if (targetPkColumn.isPresent()) {
+                return targetPkColumn.get();
+            }
+            throw new ValidationException(
+                    String.format(
+                            "Chunk key column '%s' doesn't exist in the 
primary key [%s] of the table %s.",
+                            chunkKeyColumn,
+                            
primaryKeys.stream().map(Column::name).collect(Collectors.joining(",")),
+                            table.id()));
+        }
+
+        // use first field in primary key as the split key
+        return primaryKeys.get(0);
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/MySqlChunkSplitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/MySqlChunkSplitter.java
index 29db714eb..497e8e657 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/MySqlChunkSplitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/MySqlChunkSplitter.java
@@ -17,353 +17,48 @@
 
 package org.apache.flink.cdc.connectors.base.experimental;
 
+import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
 import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
 import org.apache.flink.cdc.connectors.base.experimental.utils.MySqlTypeUtils;
 import org.apache.flink.cdc.connectors.base.experimental.utils.MySqlUtils;
-import 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange;
 import 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
-import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
-import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.FlinkRuntimeException;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
-import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
-import io.debezium.relational.history.TableChanges.TableChange;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.math.BigDecimal;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import static java.math.BigDecimal.ROUND_CEILING;
-import static 
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
 
 /** The {@code ChunkSplitter} used to split table into a set of chunks for 
JDBC data source. */
-public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlChunkSplitter.class);
-
-    private final JdbcSourceConfig sourceConfig;
-    private final JdbcDataSourceDialect dialect;
+@Internal
+public class MySqlChunkSplitter extends JdbcSourceChunkSplitter {
 
     public MySqlChunkSplitter(JdbcSourceConfig sourceConfig, 
JdbcDataSourceDialect dialect) {
-        this.sourceConfig = sourceConfig;
-        this.dialect = dialect;
-    }
-
-    @Override
-    public Collection<SnapshotSplit> generateSplits(TableId tableId) {
-        try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
-            LOG.info("Start splitting table {} into chunks...", tableId);
-            long start = System.currentTimeMillis();
-
-            Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
-            Column splitColumn = getSplitColumn(table);
-            final List<ChunkRange> chunks;
-            try {
-                chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
-            } catch (SQLException e) {
-                throw new FlinkRuntimeException("Failed to split chunks for 
table " + tableId, e);
-            }
-
-            // convert chunks into splits
-            List<SnapshotSplit> splits = new ArrayList<>();
-            RowType splitType = getSplitType(splitColumn);
-            for (int i = 0; i < chunks.size(); i++) {
-                ChunkRange chunk = chunks.get(i);
-                SnapshotSplit split =
-                        createSnapshotSplit(
-                                jdbc,
-                                tableId,
-                                i,
-                                splitType,
-                                chunk.getChunkStart(),
-                                chunk.getChunkEnd());
-                splits.add(split);
-            }
-
-            long end = System.currentTimeMillis();
-            LOG.info(
-                    "Split table {} into {} chunks, time cost: {}ms.",
-                    tableId,
-                    splits.size(),
-                    end - start);
-            return splits;
-        } catch (Exception e) {
-            throw new FlinkRuntimeException(
-                    String.format("Generate Splits for table %s error", 
tableId), e);
-        }
-    }
-
-    @Override
-    public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column 
column)
-            throws SQLException {
-        return MySqlUtils.queryMinMax(jdbc, tableId, column.name());
-    }
-
-    @Override
-    public Object queryMin(
-            JdbcConnection jdbc, TableId tableId, Column column, Object 
excludedLowerBound)
-            throws SQLException {
-        return MySqlUtils.queryMin(jdbc, tableId, column.name(), 
excludedLowerBound);
+        super(sourceConfig, dialect);
     }
 
     @Override
     public Object queryNextChunkMax(
             JdbcConnection jdbc,
             TableId tableId,
-            Column column,
+            Column splitColumn,
             int chunkSize,
             Object includedLowerBound)
             throws SQLException {
         return MySqlUtils.queryNextChunkMax(
-                jdbc, tableId, column.name(), chunkSize, includedLowerBound);
+                jdbc, tableId, splitColumn.name(), chunkSize, 
includedLowerBound);
     }
 
     @Override
-    public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) 
throws SQLException {
+    protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
+            throws SQLException {
         return MySqlUtils.queryApproximateRowCnt(jdbc, tableId);
     }
 
     @Override
-    public String buildSplitScanQuery(
-            TableId tableId, RowType splitKeyType, boolean isFirstSplit, 
boolean isLastSplit) {
-        return MySqlUtils.buildSplitScanQuery(tableId, splitKeyType, 
isFirstSplit, isLastSplit);
-    }
-
-    @Override
-    public DataType fromDbzColumn(Column splitColumn) {
+    protected DataType fromDbzColumn(Column splitColumn) {
         return MySqlTypeUtils.fromDbzColumn(splitColumn);
     }
-
-    // 
--------------------------------------------------------------------------------------------
-    // Utilities
-    // 
--------------------------------------------------------------------------------------------
-
-    /**
-     * We can use evenly-sized chunks or unevenly-sized chunks when split 
table into chunks, using
-     * evenly-sized chunks which is much efficient, using unevenly-sized 
chunks which will request
-     * many queries and is not efficient.
-     */
-    private List<ChunkRange> splitTableIntoChunks(
-            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws 
SQLException {
-        final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
-        final Object min = minMax[0];
-        final Object max = minMax[1];
-        if (min == null || max == null || min.equals(max)) {
-            // empty table, or only one row, return full table scan as a chunk
-            return Collections.singletonList(ChunkRange.all());
-        }
-
-        final int chunkSize = sourceConfig.getSplitSize();
-        final double distributionFactorUpper = 
sourceConfig.getDistributionFactorUpper();
-        final double distributionFactorLower = 
sourceConfig.getDistributionFactorLower();
-
-        if (isEvenlySplitColumn(splitColumn)) {
-            long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
-            double distributionFactor =
-                    calculateDistributionFactor(tableId, min, max, 
approximateRowCnt);
-
-            boolean dataIsEvenlyDistributed =
-                    doubleCompare(distributionFactor, distributionFactorLower) 
>= 0
-                            && doubleCompare(distributionFactor, 
distributionFactorUpper) <= 0;
-
-            if (dataIsEvenlyDistributed) {
-                // the minimum dynamic chunk size is at least 1
-                final int dynamicChunkSize = Math.max((int) 
(distributionFactor * chunkSize), 1);
-                return splitEvenlySizedChunks(
-                        tableId, min, max, approximateRowCnt, chunkSize, 
dynamicChunkSize);
-            } else {
-                return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, 
min, max, chunkSize);
-            }
-        } else {
-            return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, 
max, chunkSize);
-        }
-    }
-
-    /**
-     * Split table into evenly sized chunks based on the numeric min and max 
value of split column,
-     * and tumble chunks in step size.
-     */
-    private List<ChunkRange> splitEvenlySizedChunks(
-            TableId tableId,
-            Object min,
-            Object max,
-            long approximateRowCnt,
-            int chunkSize,
-            int dynamicChunkSize) {
-        LOG.info(
-                "Use evenly-sized chunk optimization for table {}, the 
approximate row count is {}, the chunk size is {}, the dynamic chunk size is 
{}",
-                tableId,
-                approximateRowCnt,
-                chunkSize,
-                dynamicChunkSize);
-        if (approximateRowCnt <= chunkSize) {
-            // there is no more than one chunk, return full table as a chunk
-            return Collections.singletonList(ChunkRange.all());
-        }
-
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
-        while (ObjectUtils.compare(chunkEnd, max) <= 0) {
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            chunkStart = chunkEnd;
-            try {
-                chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
-            } catch (ArithmeticException e) {
-                // Stop chunk split to avoid dead loop when number overflows.
-                break;
-            }
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
-    }
-
-    /** Split table into unevenly sized chunks by continuously calculating 
next chunk max value. */
-    private List<ChunkRange> splitUnevenlySizedChunks(
-            JdbcConnection jdbc,
-            TableId tableId,
-            Column splitColumn,
-            Object min,
-            Object max,
-            int chunkSize)
-            throws SQLException {
-        LOG.info(
-                "Use unevenly-sized chunks for table {}, the chunk size is 
{}", tableId, chunkSize);
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, 
chunkSize);
-        int count = 0;
-        while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
-            // we start from [null, min + chunk_size) and avoid [null, min)
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            // may sleep a while to avoid DDOS on MySQL server
-            maySleep(count++, tableId);
-            chunkStart = chunkEnd;
-            chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, 
chunkSize);
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
-    }
-
-    private Object nextChunkEnd(
-            JdbcConnection jdbc,
-            Object previousChunkEnd,
-            TableId tableId,
-            Column splitColumn,
-            Object max,
-            int chunkSize)
-            throws SQLException {
-        // chunk end might be null when max values are removed
-        Object chunkEnd =
-                queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, 
previousChunkEnd);
-        if (Objects.equals(previousChunkEnd, chunkEnd)) {
-            // we don't allow equal chunk start and end,
-            // should query the next one larger than chunkEnd
-            chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
-        }
-        if (ObjectUtils.compare(chunkEnd, max) >= 0) {
-            return null;
-        } else {
-            return chunkEnd;
-        }
-    }
-
-    private SnapshotSplit createSnapshotSplit(
-            JdbcConnection jdbc,
-            TableId tableId,
-            int chunkId,
-            RowType splitKeyType,
-            Object chunkStart,
-            Object chunkEnd) {
-        // currently, we only support single split column
-        Object[] splitStart = chunkStart == null ? null : new Object[] 
{chunkStart};
-        Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
-        Map<TableId, TableChange> schema = new HashMap<>();
-        schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
-        return new SnapshotSplit(
-                tableId,
-                splitId(tableId, chunkId),
-                splitKeyType,
-                splitStart,
-                splitEnd,
-                null,
-                schema);
-    }
-
-    // 
------------------------------------------------------------------------------------------
-    /** Returns the distribution factor of the given table. */
-    private double calculateDistributionFactor(
-            TableId tableId, Object min, Object max, long approximateRowCnt) {
-
-        if (!min.getClass().equals(max.getClass())) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Unsupported operation type, the MIN value type %s 
is different with MAX value type %s.",
-                            min.getClass().getSimpleName(), 
max.getClass().getSimpleName()));
-        }
-        if (approximateRowCnt == 0) {
-            return Double.MAX_VALUE;
-        }
-        BigDecimal difference = ObjectUtils.minus(max, min);
-        // factor = (max - min + 1) / rowCount
-        final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
-        double distributionFactor =
-                subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, 
ROUND_CEILING).doubleValue();
-        LOG.info(
-                "The distribution factor of table {} is {} according to the 
min split key {}, max split key {} and approximate row count {}",
-                tableId,
-                distributionFactor,
-                min,
-                max,
-                approximateRowCnt);
-        return distributionFactor;
-    }
-
-    private static String splitId(TableId tableId, int chunkId) {
-        return tableId.toString() + ":" + chunkId;
-    }
-
-    private static void maySleep(int count, TableId tableId) {
-        // every 10 queries to sleep 100ms
-        if (count % 10 == 0) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // nothing to do
-            }
-            LOG.info("JdbcSourceChunkSplitter has split {} chunks for table 
{}", count, tableId);
-        }
-    }
-
-    public static Column getSplitColumn(Table table) {
-        List<Column> primaryKeys = table.primaryKeyColumns();
-        if (primaryKeys.isEmpty()) {
-            throw new ValidationException(
-                    String.format(
-                            "Incremental snapshot for tables requires primary 
key,"
-                                    + " but table %s doesn't have primary 
key.",
-                            table.id()));
-        }
-
-        // use first field in primary key as the split key
-        return primaryKeys.get(0);
-    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/utils/MySqlUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/utils/MySqlUtils.java
index 0d6ef1f88..f85a283a7 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/utils/MySqlUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/utils/MySqlUtils.java
@@ -45,7 +45,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
 import static org.apache.flink.table.api.DataTypes.FIELD;
 import static org.apache.flink.table.api.DataTypes.ROW;
 
@@ -54,26 +53,6 @@ public class MySqlUtils {
 
     private MySqlUtils() {}
 
-    public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, 
String columnName)
-            throws SQLException {
-        final String minMaxQuery =
-                String.format(
-                        "SELECT MIN(%s), MAX(%s) FROM %s",
-                        quote(columnName), quote(columnName), quote(tableId));
-        return jdbc.queryAndMap(
-                minMaxQuery,
-                rs -> {
-                    if (!rs.next()) {
-                        // this should never happen
-                        throw new SQLException(
-                                String.format(
-                                        "No result returned after running 
query [%s]",
-                                        minMaxQuery));
-                    }
-                    return rowToArray(rs, 2);
-                });
-    }
-
     public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId 
tableId)
             throws SQLException {
         // The statement used to get approximate row count which is less
@@ -94,27 +73,6 @@ public class MySqlUtils {
                 });
     }
 
-    public static Object queryMin(
-            JdbcConnection jdbc, TableId tableId, String columnName, Object 
excludedLowerBound)
-            throws SQLException {
-        final String minQuery =
-                String.format(
-                        "SELECT MIN(%s) FROM %s WHERE %s > ?",
-                        quote(columnName), quote(tableId), quote(columnName));
-        return jdbc.prepareQueryAndMap(
-                minQuery,
-                ps -> ps.setObject(1, excludedLowerBound),
-                rs -> {
-                    if (!rs.next()) {
-                        // this should never happen
-                        throw new SQLException(
-                                String.format(
-                                        "No result returned after running 
query [%s]", minQuery));
-                    }
-                    return rs.getObject(1);
-                });
-    }
-
     public static Object queryNextChunkMax(
             JdbcConnection jdbc,
             TableId tableId,
@@ -122,7 +80,7 @@ public class MySqlUtils {
             int chunkSize,
             Object includedLowerBound)
             throws SQLException {
-        String quotedColumn = quote(splitColumnName);
+        String quotedColumn = jdbc.quotedColumnIdString(splitColumnName);
         String query =
                 String.format(
                         "SELECT MAX(%s) FROM ("
@@ -130,7 +88,7 @@ public class MySqlUtils {
                                 + ") AS T",
                         quotedColumn,
                         quotedColumn,
-                        quote(tableId),
+                        jdbc.quotedTableIdString(tableId),
                         quotedColumn,
                         quotedColumn,
                         chunkSize);
@@ -313,20 +271,6 @@ public class MySqlUtils {
                         .getLogicalType();
     }
 
-    public static Column getSplitColumn(Table table) {
-        List<Column> primaryKeys = table.primaryKeyColumns();
-        if (primaryKeys.isEmpty()) {
-            throw new ValidationException(
-                    String.format(
-                            "Incremental snapshot for tables requires primary 
key,"
-                                    + " but table %s doesn't have primary 
key.",
-                            table.id()));
-        }
-
-        // use first field in primary key as the split key
-        return primaryKeys.get(0);
-    }
-
     public static String quote(String dbOrTableName) {
         return "`" + dbOrTableName + "`";
     }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2ChunkSplitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2ChunkSplitter.java
index 6381cd74a..6e77de798 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2ChunkSplitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2ChunkSplitter.java
@@ -17,139 +17,37 @@
 
 package org.apache.flink.cdc.connectors.db2.source.dialect;
 
+import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
 import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
-import 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange;
 import 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
-import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
-import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
 import org.apache.flink.cdc.connectors.db2.source.utils.Db2TypeUtils;
 import org.apache.flink.cdc.connectors.db2.source.utils.Db2Utils;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.FlinkRuntimeException;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
-import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
-import io.debezium.relational.history.TableChanges.TableChange;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.math.BigDecimal;
-import java.math.RoundingMode;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import static 
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
 
 /**
  * The splitter to split the table into chunks using primary-key (by default) 
or a given split key.
  */
-public class Db2ChunkSplitter implements JdbcSourceChunkSplitter {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(Db2ChunkSplitter.class);
-
-    private final JdbcSourceConfig sourceConfig;
-    private final JdbcDataSourceDialect dialect;
+@Internal
+public class Db2ChunkSplitter extends JdbcSourceChunkSplitter {
 
     public Db2ChunkSplitter(JdbcSourceConfig sourceConfig, 
JdbcDataSourceDialect dialect) {
-        this.sourceConfig = sourceConfig;
-        this.dialect = dialect;
-    }
-
-    private static String splitId(TableId tableId, int chunkId) {
-        return tableId.toString() + ":" + chunkId;
-    }
-
-    private static void maySleep(int count, TableId tableId) {
-        // every 10 queries to sleep 0.1s
-        if (count % 10 == 0) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // nothing to do
-            }
-            LOG.info("JdbcSourceChunkSplitter has split {} chunks for table 
{}", count, tableId);
-        }
-    }
-
-    @Override
-    public Collection<SnapshotSplit> generateSplits(TableId tableId) {
-        try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
-            LOG.info("Start splitting table {} into chunks...", tableId);
-            long start = System.currentTimeMillis();
-
-            Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
-            Column splitColumn = Db2Utils.getSplitColumn(table, 
sourceConfig.getChunkKeyColumn());
-            final List<ChunkRange> chunks;
-            try {
-                chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
-            } catch (SQLException e) {
-                throw new FlinkRuntimeException("Failed to split chunks for 
table " + tableId, e);
-            }
-
-            // convert chunks into splits
-            List<SnapshotSplit> splits = new ArrayList<>();
-            RowType splitType = getSplitType(splitColumn);
-            for (int i = 0; i < chunks.size(); i++) {
-                ChunkRange chunk = chunks.get(i);
-                SnapshotSplit split =
-                        createSnapshotSplit(
-                                jdbc,
-                                tableId,
-                                i,
-                                splitType,
-                                chunk.getChunkStart(),
-                                chunk.getChunkEnd());
-                splits.add(split);
-            }
-
-            long end = System.currentTimeMillis();
-            LOG.info(
-                    "Split table {} into {} chunks, time cost: {}ms.",
-                    tableId,
-                    splits.size(),
-                    end - start);
-            return splits;
-        } catch (Exception e) {
-            throw new FlinkRuntimeException(
-                    String.format("Generate Splits for table %s error", 
tableId), e);
-        }
+        super(sourceConfig, dialect);
     }
 
     @Override
-    public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column 
column)
-            throws SQLException {
-        return Db2Utils.queryMinMax(jdbc, tableId, column.name());
-    }
-
-    @Override
-    public Object queryMin(
-            JdbcConnection jdbc, TableId tableId, Column column, Object 
excludedLowerBound)
-            throws SQLException {
-        return Db2Utils.queryMin(jdbc, tableId, column.name(), 
excludedLowerBound);
-    }
-
-    @Override
-    public DataType fromDbzColumn(Column splitColumn) {
+    protected DataType fromDbzColumn(Column splitColumn) {
         return Db2TypeUtils.fromDbzColumn(splitColumn);
     }
 
-    // 
--------------------------------------------------------------------------------------------
-    // Utilities
-    // 
--------------------------------------------------------------------------------------------
-
     @Override
-    public Object queryNextChunkMax(
+    protected Object queryNextChunkMax(
             JdbcConnection jdbc,
             TableId tableId,
             Column column,
@@ -161,199 +59,8 @@ public class Db2ChunkSplitter implements 
JdbcSourceChunkSplitter {
     }
 
     @Override
-    public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) 
throws SQLException {
-        return Db2Utils.queryApproximateRowCnt(jdbc, tableId);
-    }
-
-    @Override
-    public String buildSplitScanQuery(
-            TableId tableId, RowType splitKeyType, boolean isFirstSplit, 
boolean isLastSplit) {
-        return Db2Utils.buildSplitScanQuery(tableId, splitKeyType, 
isFirstSplit, isLastSplit);
-    }
-
-    /**
-     * We can use evenly-sized chunks or unevenly-sized chunks when split 
table into chunks, using
-     * evenly-sized chunks which is much efficient, using unevenly-sized 
chunks which will request
-     * many queries and is not efficient.
-     */
-    private List<ChunkRange> splitTableIntoChunks(
-            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws 
SQLException {
-        final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
-        final Object min = minMax[0];
-        final Object max = minMax[1];
-        if (min == null || max == null || min.equals(max)) {
-            // empty table, or only one row, return full table scan as a chunk
-            return Collections.singletonList(ChunkRange.all());
-        }
-
-        final int chunkSize = sourceConfig.getSplitSize();
-        final double distributionFactorUpper = 
sourceConfig.getDistributionFactorUpper();
-        final double distributionFactorLower = 
sourceConfig.getDistributionFactorLower();
-
-        if (isEvenlySplitColumn(splitColumn)) {
-            long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
-            double distributionFactor =
-                    calculateDistributionFactor(tableId, min, max, 
approximateRowCnt);
-
-            boolean dataIsEvenlyDistributed =
-                    doubleCompare(distributionFactor, distributionFactorLower) 
>= 0
-                            && doubleCompare(distributionFactor, 
distributionFactorUpper) <= 0;
-
-            if (dataIsEvenlyDistributed) {
-                // the minimum dynamic chunk size is at least 1
-                final int dynamicChunkSize = Math.max((int) 
(distributionFactor * chunkSize), 1);
-                return splitEvenlySizedChunks(
-                        tableId, min, max, approximateRowCnt, chunkSize, 
dynamicChunkSize);
-            } else {
-                return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, 
min, max, chunkSize);
-            }
-        } else {
-            return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, 
max, chunkSize);
-        }
-    }
-
-    /**
-     * Split table into evenly sized chunks based on the numeric min and max 
value of split column,
-     * and tumble chunks in step size.
-     */
-    private List<ChunkRange> splitEvenlySizedChunks(
-            TableId tableId,
-            Object min,
-            Object max,
-            long approximateRowCnt,
-            int chunkSize,
-            int dynamicChunkSize) {
-        LOG.info(
-                "Use evenly-sized chunk optimization for table {}, the 
approximate row count is {}, the chunk size is {}, the dynamic chunk size is 
{}",
-                tableId,
-                approximateRowCnt,
-                chunkSize,
-                dynamicChunkSize);
-        if (approximateRowCnt <= chunkSize) {
-            // there is no more than one chunk, return full table as a chunk
-            return Collections.singletonList(ChunkRange.all());
-        }
-
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
-        while (ObjectUtils.compare(chunkEnd, max) <= 0) {
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            chunkStart = chunkEnd;
-            try {
-                chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
-            } catch (ArithmeticException e) {
-                // Stop chunk split to avoid dead loop when number overflows.
-                break;
-            }
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
-    }
-
-    // 
------------------------------------------------------------------------------------------
-
-    /** Split table into unevenly sized chunks by continuously calculating 
next chunk max value. */
-    private List<ChunkRange> splitUnevenlySizedChunks(
-            JdbcConnection jdbc,
-            TableId tableId,
-            Column splitColumn,
-            Object min,
-            Object max,
-            int chunkSize)
+    protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
             throws SQLException {
-        LOG.info(
-                "Use unevenly-sized chunks for table {}, the chunk size is 
{}", tableId, chunkSize);
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, 
chunkSize);
-        int count = 0;
-        while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
-            // we start from [null, min + chunk_size) and avoid [null, min)
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            // may sleep awhile to avoid DDOS on Db2 server
-            maySleep(count++, tableId);
-            chunkStart = chunkEnd;
-            chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, 
chunkSize);
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
-    }
-
-    private Object nextChunkEnd(
-            JdbcConnection jdbc,
-            Object previousChunkEnd,
-            TableId tableId,
-            Column splitColumn,
-            Object max,
-            int chunkSize)
-            throws SQLException {
-        // chunk end might be null when max values are removed
-        Object chunkEnd =
-                queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, 
previousChunkEnd);
-        if (Objects.equals(previousChunkEnd, chunkEnd)) {
-            // we don't allow equal chunk start and end,
-            // should query the next one larger than chunkEnd
-            chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
-        }
-        if (ObjectUtils.compare(chunkEnd, max) >= 0) {
-            return null;
-        } else {
-            return chunkEnd;
-        }
-    }
-
-    private SnapshotSplit createSnapshotSplit(
-            JdbcConnection jdbc,
-            TableId tableId,
-            int chunkId,
-            RowType splitKeyType,
-            Object chunkStart,
-            Object chunkEnd) {
-        // currently, we only support single split column
-        Object[] splitStart = chunkStart == null ? null : new Object[] 
{chunkStart};
-        Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
-        Map<TableId, TableChange> schema = new HashMap<>();
-        schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
-        return new SnapshotSplit(
-                tableId,
-                splitId(tableId, chunkId),
-                splitKeyType,
-                splitStart,
-                splitEnd,
-                null,
-                schema);
-    }
-
-    /** Returns the distribution factor of the given table. */
-    private double calculateDistributionFactor(
-            TableId tableId, Object min, Object max, long approximateRowCnt) {
-
-        if (!min.getClass().equals(max.getClass())) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Unsupported operation type, the MIN value type %s 
is different with MAX value type %s.",
-                            min.getClass().getSimpleName(), 
max.getClass().getSimpleName()));
-        }
-        if (approximateRowCnt == 0) {
-            return Double.MAX_VALUE;
-        }
-        BigDecimal difference = ObjectUtils.minus(max, min);
-        // factor = (max - min + 1) / rowCount
-        final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
-        double distributionFactor =
-                subRowCnt
-                        .divide(new BigDecimal(approximateRowCnt), 4, 
RoundingMode.CEILING)
-                        .doubleValue();
-        LOG.info(
-                "The distribution factor of table {} is {} according to the 
min split key {}, max split key {} and approximate row count {}",
-                tableId,
-                distributionFactor,
-                min,
-                max,
-                approximateRowCnt);
-        return distributionFactor;
+        return Db2Utils.queryApproximateRowCnt(jdbc, tableId);
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2Utils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2Utils.java
index 06c4e741f..2d08d7bff 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2Utils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2Utils.java
@@ -49,7 +49,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
 import static org.apache.flink.table.api.DataTypes.FIELD;
 import static org.apache.flink.table.api.DataTypes.ROW;
 
@@ -60,26 +59,6 @@ public class Db2Utils {
 
     public Db2Utils() {}
 
-    public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, 
String columnName)
-            throws SQLException {
-        final String minMaxQuery =
-                String.format(
-                        "SELECT MIN(%s), MAX(%s) FROM %s",
-                        quote(columnName), quote(columnName), 
quoteSchemaAndTable(tableId));
-        return jdbc.queryAndMap(
-                minMaxQuery,
-                rs -> {
-                    if (!rs.next()) {
-                        // this should never happen
-                        throw new SQLException(
-                                String.format(
-                                        "No result returned after running 
query [%s]",
-                                        minMaxQuery));
-                    }
-                    return rowToArray(rs, 2);
-                });
-    }
-
     public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId 
tableId)
             throws SQLException {
         // The statement used to get approximate row count which is less
@@ -103,27 +82,6 @@ public class Db2Utils {
                 });
     }
 
-    public static Object queryMin(
-            JdbcConnection jdbc, TableId tableId, String columnName, Object 
excludedLowerBound)
-            throws SQLException {
-        final String minQuery =
-                String.format(
-                        "SELECT MIN(%s) FROM %s WHERE %s > ?",
-                        columnName, quote(tableId), columnName);
-        return jdbc.prepareQueryAndMap(
-                minQuery,
-                ps -> ps.setObject(1, excludedLowerBound),
-                rs -> {
-                    if (!rs.next()) {
-                        // this should never happen
-                        throw new SQLException(
-                                String.format(
-                                        "No result returned after running 
query [%s]", minQuery));
-                    }
-                    return rs.getObject(1);
-                });
-    }
-
     /**
      * Returns the next LSN to be read from the database. This is the LSN of 
the last record that
      * was read from the database.
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
index d68f1cd99..adf4c98a9 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
@@ -17,253 +17,92 @@
 
 package org.apache.flink.cdc.connectors.oracle.source.assigner.splitter;
 
+import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
 import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
-import 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange;
 import 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
-import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils;
 import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
 import org.apache.flink.cdc.connectors.oracle.source.utils.OracleTypeUtils;
 import org.apache.flink.cdc.connectors.oracle.source.utils.OracleUtils;
 import org.apache.flink.cdc.connectors.oracle.util.ChunkUtils;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.FlinkRuntimeException;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
 import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
-import io.debezium.relational.history.TableChanges.TableChange;
 import oracle.sql.ROWID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import javax.annotation.Nullable;
 
-import static 
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
+import java.sql.SQLException;
 
 /**
  * The {@code ChunkSplitter} used to split Oracle table into a set of chunks 
for JDBC data source.
  */
-public class OracleChunkSplitter implements JdbcSourceChunkSplitter {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(OracleChunkSplitter.class);
-
-    private final JdbcSourceConfig sourceConfig;
-    private final JdbcDataSourceDialect dialect;
+@Internal
+public class OracleChunkSplitter extends JdbcSourceChunkSplitter {
 
     public OracleChunkSplitter(JdbcSourceConfig sourceConfig, 
JdbcDataSourceDialect dialect) {
-        this.sourceConfig = sourceConfig;
-        this.dialect = dialect;
-    }
-
-    @Override
-    public Collection<SnapshotSplit> generateSplits(TableId tableId) {
-        try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
-            LOG.info("Start splitting table {} into chunks...", tableId);
-            long start = System.currentTimeMillis();
-
-            Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
-            Column splitColumn =
-                    ChunkUtils.getChunkKeyColumn(table, 
sourceConfig.getChunkKeyColumn());
-            final List<ChunkRange> chunks;
-            try {
-                chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
-            } catch (SQLException e) {
-                throw new FlinkRuntimeException("Failed to split chunks for 
table " + tableId, e);
-            }
-
-            // convert chunks into splits
-            List<SnapshotSplit> splits = new ArrayList<>();
-            RowType splitType = getSplitType(splitColumn);
-            for (int i = 0; i < chunks.size(); i++) {
-                ChunkRange chunk = chunks.get(i);
-                SnapshotSplit split =
-                        createSnapshotSplit(
-                                jdbc,
-                                tableId,
-                                i,
-                                splitType,
-                                chunk.getChunkStart(),
-                                chunk.getChunkEnd());
-                splits.add(split);
-            }
-
-            long end = System.currentTimeMillis();
-            LOG.info(
-                    "Split table {} into {} chunks, time cost: {}ms.",
-                    tableId,
-                    splits.size(),
-                    end - start);
-            return splits;
-        } catch (Exception e) {
-            throw new FlinkRuntimeException(
-                    String.format("Generate Splits for table %s error", 
tableId), e);
-        }
+        super(sourceConfig, dialect);
     }
 
     @Override
-    public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column 
column)
+    public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column 
splitColumn)
             throws SQLException {
-        return OracleUtils.queryMinMax(jdbc, tableId, column.name());
+        // oracle query only use schema and table.
+        String quoteSchemaAndTable = OracleUtils.quoteSchemaAndTable(tableId);
+        return JdbcChunkUtils.queryMinMax(
+                jdbc, quoteSchemaAndTable, 
jdbc.quotedColumnIdString(splitColumn.name()));
     }
 
     @Override
     public Object queryMin(
             JdbcConnection jdbc, TableId tableId, Column column, Object 
excludedLowerBound)
             throws SQLException {
-        return OracleUtils.queryMin(jdbc, tableId, column.name(), 
excludedLowerBound);
+        // oracle query only use schema and table.
+        String quoteSchemaAndTable = OracleUtils.quoteSchemaAndTable(tableId);
+        return JdbcChunkUtils.queryMin(
+                jdbc, quoteSchemaAndTable, jdbc.quotedTableIdString(tableId), 
excludedLowerBound);
     }
 
     @Override
     public Object queryNextChunkMax(
             JdbcConnection jdbc,
             TableId tableId,
-            Column column,
+            Column splitColumn,
             int chunkSize,
             Object includedLowerBound)
             throws SQLException {
         return OracleUtils.queryNextChunkMax(
-                jdbc, tableId, column.name(), chunkSize, includedLowerBound);
+                jdbc, tableId, splitColumn.name(), chunkSize, 
includedLowerBound);
     }
 
     @Override
-    public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) 
throws SQLException {
+    protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
+            throws SQLException {
         return OracleUtils.queryApproximateRowCnt(jdbc, tableId);
     }
 
-    @Override
-    public String buildSplitScanQuery(
-            TableId tableId, RowType splitKeyType, boolean isFirstSplit, 
boolean isLastSplit) {
-        return OracleUtils.buildSplitScanQuery(tableId, splitKeyType, 
isFirstSplit, isLastSplit);
-    }
-
     @Override
     public DataType fromDbzColumn(Column splitColumn) {
         return OracleTypeUtils.fromDbzColumn(splitColumn);
     }
 
-    // 
--------------------------------------------------------------------------------------------
-    // Utilities
-    // 
--------------------------------------------------------------------------------------------
-
-    /**
-     * We can use evenly-sized chunks or unevenly-sized chunks when split 
table into chunks, using
-     * evenly-sized chunks which is much efficient, using unevenly-sized 
chunks which will request
-     * many queries and is not efficient.
-     */
-    private List<ChunkRange> splitTableIntoChunks(
-            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws 
SQLException {
-        final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
-        final Object min = minMax[0];
-        final Object max = minMax[1];
-        if (min == null || max == null || min.equals(max)) {
-            // empty table, or only one row, return full table scan as a chunk
-            return Collections.singletonList(ChunkRange.all());
-        }
-
-        final int chunkSize = sourceConfig.getSplitSize();
-        final double distributionFactorUpper = 
sourceConfig.getDistributionFactorUpper();
-        final double distributionFactorLower = 
sourceConfig.getDistributionFactorLower();
-
+    @Override
+    protected boolean isEvenlySplitColumn(Column splitColumn) {
         // use ROWID get splitUnevenlySizedChunks by default
         if (splitColumn.name().equals(ROWID.class.getSimpleName())) {
-            return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, 
max, chunkSize);
-        }
-
-        if (isEvenlySplitColumn(splitColumn)) {
-            long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
-            double distributionFactor =
-                    calculateDistributionFactor(tableId, min, max, 
approximateRowCnt);
-
-            boolean dataIsEvenlyDistributed =
-                    doubleCompare(distributionFactor, distributionFactorLower) 
>= 0
-                            && doubleCompare(distributionFactor, 
distributionFactorUpper) <= 0;
-
-            if (dataIsEvenlyDistributed) {
-                // the minimum dynamic chunk size is at least 1
-                final int dynamicChunkSize = Math.max((int) 
(distributionFactor * chunkSize), 1);
-                return splitEvenlySizedChunks(
-                        tableId, min, max, approximateRowCnt, 
dynamicChunkSize);
-            } else {
-                return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, 
min, max, chunkSize);
-            }
-        } else {
-            return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, 
max, chunkSize);
-        }
-    }
-
-    /**
-     * Split table into evenly sized chunks based on the numeric min and max 
value of split column,
-     * and tumble chunks in step size.
-     */
-    private List<ChunkRange> splitEvenlySizedChunks(
-            TableId tableId, Object min, Object max, long approximateRowCnt, 
int chunkSize) {
-        LOG.info(
-                "Use evenly-sized chunk optimization for table {}, the 
approximate row count is {}, the chunk size is {}",
-                tableId,
-                approximateRowCnt,
-                chunkSize);
-        if (approximateRowCnt <= chunkSize) {
-            // there is no more than one chunk, return full table as a chunk
-            return Collections.singletonList(ChunkRange.all());
+            return false;
         }
 
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = ObjectUtils.plus(min, chunkSize);
-        while (ObjectUtils.compare(chunkEnd, max) <= 0) {
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            chunkStart = chunkEnd;
-            chunkEnd = ObjectUtils.plus(chunkEnd, chunkSize);
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
-    }
-
-    /** Split table into unevenly sized chunks by continuously calculating 
next chunk max value. */
-    private List<ChunkRange> splitUnevenlySizedChunks(
-            JdbcConnection jdbc,
-            TableId tableId,
-            Column splitColumn,
-            Object min,
-            Object max,
-            int chunkSize)
-            throws SQLException {
-        LOG.info(
-                "Use unevenly-sized chunks for table {}, the chunk size is 
{}", tableId, chunkSize);
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, 
chunkSize);
-        int count = 0;
-
-        while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) {
-            // we start from [null, min + chunk_size) and avoid [null, min)
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            // may sleep a while to avoid DDOS on MySQL server
-            maySleep(count++, tableId);
-            chunkStart = chunkEnd;
-            chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, 
chunkSize);
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
+        return super.isEvenlySplitColumn(splitColumn);
     }
 
     /** ChunkEnd less than or equal to max. */
-    private boolean isChunkEndLeMax(Object chunkEnd, Object max) {
+    @Override
+    protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
         boolean chunkEndMaxCompare;
         if (chunkEnd instanceof ROWID && max instanceof ROWID) {
             chunkEndMaxCompare =
@@ -276,7 +115,8 @@ public class OracleChunkSplitter implements 
JdbcSourceChunkSplitter {
     }
 
     /** ChunkEnd greater than or equal to max. */
-    private boolean isChunkEndGeMax(Object chunkEnd, Object max) {
+    @Override
+    protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
         boolean chunkEndMaxCompare;
         if (chunkEnd instanceof ROWID && max instanceof ROWID) {
             chunkEndMaxCompare =
@@ -288,95 +128,9 @@ public class OracleChunkSplitter implements 
JdbcSourceChunkSplitter {
         return chunkEndMaxCompare;
     }
 
-    private Object nextChunkEnd(
-            JdbcConnection jdbc,
-            Object previousChunkEnd,
-            TableId tableId,
-            Column splitColumn,
-            Object max,
-            int chunkSize)
-            throws SQLException {
-        // chunk end might be null when max values are removed
-        Object chunkEnd =
-                queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, 
previousChunkEnd);
-        if (Objects.equals(previousChunkEnd, chunkEnd)) {
-            // we don't allow equal chunk start and end,
-            // should query the next one larger than chunkEnd
-            chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
-        }
-        if (isChunkEndGeMax(chunkEnd, max)) {
-            return null;
-        } else {
-            return chunkEnd;
-        }
-    }
-
-    private SnapshotSplit createSnapshotSplit(
-            JdbcConnection jdbc,
-            TableId tableId,
-            int chunkId,
-            RowType splitKeyType,
-            Object chunkStart,
-            Object chunkEnd) {
-        // currently, we only support single split column
-        Object[] splitStart = chunkStart == null ? null : new Object[] 
{chunkStart};
-        Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
-        Map<TableId, TableChange> schema = new HashMap<>();
-        schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
-        return new SnapshotSplit(
-                tableId,
-                splitId(tableId, chunkId),
-                splitKeyType,
-                splitStart,
-                splitEnd,
-                null,
-                schema);
-    }
-
-    // 
------------------------------------------------------------------------------------------
-    /** Returns the distribution factor of the given table. */
-    private double calculateDistributionFactor(
-            TableId tableId, Object min, Object max, long approximateRowCnt) {
-
-        if (!min.getClass().equals(max.getClass())) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Unsupported operation type, the MIN value type %s 
is different with MAX value type %s.",
-                            min.getClass().getSimpleName(), 
max.getClass().getSimpleName()));
-        }
-        if (approximateRowCnt == 0) {
-            return Double.MAX_VALUE;
-        }
-        BigDecimal difference = ObjectUtils.minus(max, min);
-        // factor = (max - min + 1) / rowCount
-        final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
-        double distributionFactor =
-                subRowCnt
-                        .divide(new BigDecimal(approximateRowCnt), 4, 
RoundingMode.CEILING)
-                        .doubleValue();
-        LOG.info(
-                "The distribution factor of table {} is {} according to the 
min split key {}, max split key {} and approximate row count {}",
-                tableId,
-                distributionFactor,
-                min,
-                max,
-                approximateRowCnt);
-        return distributionFactor;
-    }
-
-    private static String splitId(TableId tableId, int chunkId) {
-        return tableId.toString() + ":" + chunkId;
-    }
-
-    private static void maySleep(int count, TableId tableId) {
-        // every 10 queries to sleep 0.1s
-        if (count % 10 == 0) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // nothing to do
-            }
-            LOG.info("JdbcSourceChunkSplitter has split {} chunks for table 
{}", count, tableId);
-        }
+    @Override
+    protected Column getSplitColumn(Table table, @Nullable String 
chunkKeyColumn) {
+        // Use the ROWID column as the chunk key column by default for oracle 
cdc connector
+        return ChunkUtils.getChunkKeyColumn(table, chunkKeyColumn);
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleUtils.java
index b762a974b..4f3508744 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleUtils.java
@@ -42,33 +42,11 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
-
 /** Utils to prepare Oracle SQL statement. */
 public class OracleUtils {
 
     private OracleUtils() {}
 
-    public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, 
String columnName)
-            throws SQLException {
-        final String minMaxQuery =
-                String.format(
-                        "SELECT MIN(%s), MAX(%s) FROM %s",
-                        quote(columnName), quote(columnName), 
quoteSchemaAndTable(tableId));
-        return jdbc.queryAndMap(
-                minMaxQuery,
-                rs -> {
-                    if (!rs.next()) {
-                        // this should never happen
-                        throw new SQLException(
-                                String.format(
-                                        "No result returned after running 
query [%s]",
-                                        minMaxQuery));
-                    }
-                    return rowToArray(rs, 2);
-                });
-    }
-
     public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId 
tableId)
             throws SQLException {
         final String analyzeTable =
@@ -92,27 +70,6 @@ public class OracleUtils {
                         });
     }
 
-    public static Object queryMin(
-            JdbcConnection jdbc, TableId tableId, String columnName, Object 
excludedLowerBound)
-            throws SQLException {
-        final String minQuery =
-                String.format(
-                        "SELECT MIN(%s) FROM %s WHERE %s > ?",
-                        quote(columnName), quoteSchemaAndTable(tableId), 
quote(columnName));
-        return jdbc.prepareQueryAndMap(
-                minQuery,
-                ps -> ps.setObject(1, excludedLowerBound),
-                rs -> {
-                    if (!rs.next()) {
-                        // this should never happen
-                        throw new SQLException(
-                                String.format(
-                                        "No result returned after running 
query [%s]", minQuery));
-                    }
-                    return rs.getObject(1);
-                });
-    }
-
     public static Object queryNextChunkMax(
             JdbcConnection jdbc,
             TableId tableId,
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkSplitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkSplitter.java
index 96a4cc18d..9639b28f8 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkSplitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkSplitter.java
@@ -17,138 +17,54 @@
 
 package org.apache.flink.cdc.connectors.postgres.source;
 
+import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
-import 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange;
 import 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
-import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
-import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
-import org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils;
 import 
org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
 import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.FlinkRuntimeException;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
-import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
-import io.debezium.relational.history.TableChanges.TableChange;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.math.BigDecimal;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import static java.math.BigDecimal.ROUND_CEILING;
-import static 
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
 
 /**
  * The splitter to split the table into chunks using primary-key (by default) 
or a given split key.
  */
-public class PostgresChunkSplitter implements JdbcSourceChunkSplitter {
-    private static final Logger LOG = 
LoggerFactory.getLogger(PostgresChunkSplitter.class);
-
-    private final JdbcSourceConfig sourceConfig;
-    private final PostgresDialect dialect;
+@Internal
+public class PostgresChunkSplitter extends JdbcSourceChunkSplitter {
 
     public PostgresChunkSplitter(JdbcSourceConfig sourceConfig, 
PostgresDialect postgresDialect) {
-        this.sourceConfig = sourceConfig;
-        this.dialect = postgresDialect;
-    }
-
-    private static String splitId(TableId tableId, int chunkId) {
-        return tableId.toString() + ":" + chunkId;
-    }
-
-    private static void maySleep(int count, TableId tableId) {
-        // every 10 queries to sleep 100ms
-        if (count % 10 == 0) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // nothing to do
-            }
-            LOG.info("JdbcSourceChunkSplitter has split {} chunks for table 
{}", count, tableId);
-        }
+        super(sourceConfig, postgresDialect);
     }
 
     @Override
-    public Collection<SnapshotSplit> generateSplits(TableId tableId) {
-        try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
-            LOG.info("Start splitting table {} into chunks...", tableId);
-            long start = System.currentTimeMillis();
-
-            Table table =
-                    Objects.requireNonNull(dialect.queryTableSchema(jdbc, 
tableId)).getTable();
-            Column splitColumn = ChunkUtils.getSplitColumn(table, 
sourceConfig.getChunkKeyColumn());
-            final List<ChunkRange> chunks;
-            try {
-                chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
-            } catch (SQLException e) {
-                throw new FlinkRuntimeException("Failed to split chunks for 
table " + tableId, e);
-            }
-
-            // convert chunks into splits
-            List<SnapshotSplit> splits = new ArrayList<>();
-            RowType splitType = getSplitType(splitColumn);
-            for (int i = 0; i < chunks.size(); i++) {
-                ChunkRange chunk = chunks.get(i);
-                SnapshotSplit split =
-                        createSnapshotSplit(
-                                jdbc,
-                                tableId,
-                                i,
-                                splitType,
-                                chunk.getChunkStart(),
-                                chunk.getChunkEnd());
-                splits.add(split);
-            }
-
-            long end = System.currentTimeMillis();
-            LOG.info(
-                    "Split table {} into {} chunks, time cost: {}ms.",
-                    tableId,
-                    splits.size(),
-                    end - start);
-            return splits;
-        } catch (Exception e) {
-            throw new FlinkRuntimeException(
-                    String.format("Generate Splits for table %s error", 
tableId), e);
-        }
+    public Object queryNextChunkMax(
+            JdbcConnection jdbc,
+            TableId tableId,
+            Column splitColumn,
+            int chunkSize,
+            Object includedLowerBound)
+            throws SQLException {
+        return PostgresQueryUtils.queryNextChunkMax(
+                jdbc, tableId, splitColumn, chunkSize, includedLowerBound);
     }
 
+    /** Postgres chunk split overrides queryMin method to query based on uuid. 
*/
     @Override
     public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column 
splitColumn)
             throws SQLException {
         return PostgresQueryUtils.queryMinMax(jdbc, tableId, splitColumn);
     }
 
+    /** Postgres chunk split overrides queryMin method to query based on uuid. 
*/
     @Override
     public Object queryMin(
-            JdbcConnection jdbc, TableId tableId, Column column, Object 
excludedLowerBound)
-            throws SQLException {
-        return PostgresQueryUtils.queryMin(jdbc, tableId, column, 
excludedLowerBound);
-    }
-
-    @Override
-    public Object queryNextChunkMax(
-            JdbcConnection jdbc,
-            TableId tableId,
-            Column column,
-            int chunkSize,
-            Object includedLowerBound)
+            JdbcConnection jdbc, TableId tableId, Column splitColumn, Object 
excludedLowerBound)
             throws SQLException {
-        return PostgresQueryUtils.queryNextChunkMax(
-                jdbc, tableId, column, chunkSize, includedLowerBound);
+        return PostgresQueryUtils.queryMin(jdbc, tableId, splitColumn, 
excludedLowerBound);
     }
 
     // 
--------------------------------------------------------------------------------------------
@@ -156,203 +72,13 @@ public class PostgresChunkSplitter implements 
JdbcSourceChunkSplitter {
     // 
--------------------------------------------------------------------------------------------
 
     @Override
-    public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) 
throws SQLException {
+    protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
+            throws SQLException {
         return PostgresQueryUtils.queryApproximateRowCnt(jdbc, tableId);
     }
 
     @Override
-    public String buildSplitScanQuery(
-            TableId tableId, RowType splitKeyType, boolean isFirstSplit, 
boolean isLastSplit) {
-        return PostgresQueryUtils.buildSplitScanQuery(
-                tableId, splitKeyType, isFirstSplit, isLastSplit);
-    }
-
-    @Override
-    public DataType fromDbzColumn(Column splitColumn) {
+    protected DataType fromDbzColumn(Column splitColumn) {
         return PostgresTypeUtils.fromDbzColumn(splitColumn);
     }
-
-    /**
-     * We can use evenly-sized chunks or unevenly-sized chunks when split 
table into chunks, using
-     * evenly-sized chunks which is much efficient, using unevenly-sized 
chunks which will request
-     * many queries and is not efficient.
-     */
-    private List<ChunkRange> splitTableIntoChunks(
-            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws 
SQLException {
-        final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
-        final Object min = minMax[0];
-        final Object max = minMax[1];
-        if (min == null || max == null || min.equals(max)) {
-            // empty table, or only one row, return full table scan as a chunk
-            return Collections.singletonList(ChunkRange.all());
-        }
-
-        final int chunkSize = sourceConfig.getSplitSize();
-        final double distributionFactorUpper = 
sourceConfig.getDistributionFactorUpper();
-        final double distributionFactorLower = 
sourceConfig.getDistributionFactorLower();
-
-        if (isEvenlySplitColumn(splitColumn)) {
-            long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
-            double distributionFactor =
-                    calculateDistributionFactor(tableId, min, max, 
approximateRowCnt);
-
-            boolean dataIsEvenlyDistributed =
-                    doubleCompare(distributionFactor, distributionFactorLower) 
>= 0
-                            && doubleCompare(distributionFactor, 
distributionFactorUpper) <= 0;
-
-            if (dataIsEvenlyDistributed) {
-                // the minimum dynamic chunk size is at least 1
-                final int dynamicChunkSize = Math.max((int) 
(distributionFactor * chunkSize), 1);
-                return splitEvenlySizedChunks(
-                        tableId, min, max, approximateRowCnt, chunkSize, 
dynamicChunkSize);
-            } else {
-                return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, 
min, max, chunkSize);
-            }
-        } else {
-            return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, 
max, chunkSize);
-        }
-    }
-
-    /**
-     * Split table into evenly sized chunks based on the numeric min and max 
value of split column,
-     * and tumble chunks in step size.
-     */
-    private List<ChunkRange> splitEvenlySizedChunks(
-            TableId tableId,
-            Object min,
-            Object max,
-            long approximateRowCnt,
-            int chunkSize,
-            int dynamicChunkSize) {
-        LOG.info(
-                "Use evenly-sized chunk optimization for table {}, the 
approximate row count is {}, the chunk size is {}, the dynamic chunk size is 
{}",
-                tableId,
-                approximateRowCnt,
-                chunkSize,
-                dynamicChunkSize);
-        if (approximateRowCnt <= chunkSize) {
-            // there is no more than one chunk, return full table as a chunk
-            return Collections.singletonList(ChunkRange.all());
-        }
-
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
-        while (ObjectUtils.compare(chunkEnd, max) <= 0) {
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            chunkStart = chunkEnd;
-            try {
-                chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
-            } catch (ArithmeticException e) {
-                // Stop chunk split to avoid dead loop when number overflows.
-                break;
-            }
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
-    }
-
-    // 
------------------------------------------------------------------------------------------
-
-    /** Split table into unevenly sized chunks by continuously calculating 
next chunk max value. */
-    private List<ChunkRange> splitUnevenlySizedChunks(
-            JdbcConnection jdbc,
-            TableId tableId,
-            Column splitColumn,
-            Object min,
-            Object max,
-            int chunkSize)
-            throws SQLException {
-        LOG.info(
-                "Use unevenly-sized chunks for table {}, the chunk size is 
{}", tableId, chunkSize);
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, 
chunkSize);
-        int count = 0;
-        while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
-            // we start from [null, min + chunk_size) and avoid [null, min)
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            // may sleep a while to avoid DDOS on PostgreSQL server
-            maySleep(count++, tableId);
-            chunkStart = chunkEnd;
-            chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, 
chunkSize);
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
-    }
-
-    private Object nextChunkEnd(
-            JdbcConnection jdbc,
-            Object previousChunkEnd,
-            TableId tableId,
-            Column splitColumn,
-            Object max,
-            int chunkSize)
-            throws SQLException {
-        // chunk end might be null when max values are removed
-        Object chunkEnd =
-                queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, 
previousChunkEnd);
-        if (Objects.equals(previousChunkEnd, chunkEnd)) {
-            // we don't allow equal chunk start and end,
-            // should query the next one larger than chunkEnd
-            chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
-        }
-        if (ObjectUtils.compare(chunkEnd, max) >= 0) {
-            return null;
-        } else {
-            return chunkEnd;
-        }
-    }
-
-    private SnapshotSplit createSnapshotSplit(
-            JdbcConnection jdbc,
-            TableId tableId,
-            int chunkId,
-            RowType splitKeyType,
-            Object chunkStart,
-            Object chunkEnd) {
-        // currently, we only support single split column
-        Object[] splitStart = chunkStart == null ? null : new Object[] 
{chunkStart};
-        Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
-        Map<TableId, TableChange> schema = new HashMap<>();
-        schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
-        return new SnapshotSplit(
-                tableId,
-                splitId(tableId, chunkId),
-                splitKeyType,
-                splitStart,
-                splitEnd,
-                null,
-                schema);
-    }
-
-    /** Returns the distribution factor of the given table. */
-    private double calculateDistributionFactor(
-            TableId tableId, Object min, Object max, long approximateRowCnt) {
-
-        if (!min.getClass().equals(max.getClass())) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Unsupported operation type, the MIN value type %s 
is different with MAX value type %s.",
-                            min.getClass().getSimpleName(), 
max.getClass().getSimpleName()));
-        }
-        if (approximateRowCnt == 0) {
-            return Double.MAX_VALUE;
-        }
-        BigDecimal difference = ObjectUtils.minus(max, min);
-        // factor = (max - min + 1) / rowCount
-        final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
-        double distributionFactor =
-                subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, 
ROUND_CEILING).doubleValue();
-        LOG.info(
-                "The distribution factor of table {} is {} according to the 
min split key {}, max split key {} and approximate row count {}",
-                tableId,
-                distributionFactor,
-                min,
-                max,
-                approximateRowCnt);
-        return distributionFactor;
-    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
index 3e8a1feb6..e7408be34 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
@@ -146,12 +145,6 @@ public class PostgresQueryUtils {
                 });
     }
 
-    public static String buildSplitScanQuery(
-            TableId tableId, RowType pkRowType, boolean isFirstSplit, boolean 
isLastSplit) {
-        return buildSplitScanQuery(
-                tableId, pkRowType, isFirstSplit, isLastSplit, new 
ArrayList<>());
-    }
-
     public static String buildSplitScanQuery(
             TableId tableId,
             RowType pkRowType,
@@ -276,7 +269,7 @@ public class PostgresQueryUtils {
         return String.format("(%s)::text", value);
     }
 
-    private static boolean isUUID(Column column) {
+    public static boolean isUUID(Column column) {
         return column.typeName().equals("uuid");
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
index 395467175..fb338a0ea 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
@@ -17,345 +17,51 @@
 
 package org.apache.flink.cdc.connectors.sqlserver.source.dialect;
 
+import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
 import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
-import 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange;
 import 
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
-import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
-import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
 import 
org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerTypeUtils;
 import org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerUtils;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.FlinkRuntimeException;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
-import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
-import io.debezium.relational.history.TableChanges.TableChange;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.math.BigDecimal;
-import java.math.RoundingMode;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import static 
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
 
 /**
  * The {@code ChunkSplitter} used to split SqlServer table into a set of 
chunks for JDBC data
  * source.
  */
-public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(SqlServerChunkSplitter.class);
-
-    private final JdbcSourceConfig sourceConfig;
-    private final JdbcDataSourceDialect dialect;
+@Internal
+public class SqlServerChunkSplitter extends JdbcSourceChunkSplitter {
 
     public SqlServerChunkSplitter(JdbcSourceConfig sourceConfig, 
JdbcDataSourceDialect dialect) {
-        this.sourceConfig = sourceConfig;
-        this.dialect = dialect;
-    }
-
-    private static String splitId(TableId tableId, int chunkId) {
-        return tableId.toString() + ":" + chunkId;
-    }
-
-    private static void maySleep(int count, TableId tableId) {
-        // every 100 queries to sleep 1s
-        if (count % 10 == 0) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // nothing to do
-            }
-            LOG.info("JdbcSourceChunkSplitter has split {} chunks for table 
{}", count, tableId);
-        }
-    }
-
-    @Override
-    public Collection<SnapshotSplit> generateSplits(TableId tableId) {
-        try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
-            LOG.info("Start splitting table {} into chunks...", tableId);
-            long start = System.currentTimeMillis();
-
-            Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
-            Column splitColumn =
-                    SqlServerUtils.getSplitColumn(table, 
sourceConfig.getChunkKeyColumn());
-            final List<ChunkRange> chunks;
-            try {
-                chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
-            } catch (SQLException e) {
-                throw new FlinkRuntimeException("Failed to split chunks for 
table " + tableId, e);
-            }
-
-            // convert chunks into splits
-            List<SnapshotSplit> splits = new ArrayList<>();
-            RowType splitType = getSplitType(splitColumn);
-            for (int i = 0; i < chunks.size(); i++) {
-                ChunkRange chunk = chunks.get(i);
-                SnapshotSplit split =
-                        createSnapshotSplit(
-                                jdbc,
-                                tableId,
-                                i,
-                                splitType,
-                                chunk.getChunkStart(),
-                                chunk.getChunkEnd());
-                splits.add(split);
-            }
-
-            long end = System.currentTimeMillis();
-            LOG.info(
-                    "Split table {} into {} chunks, time cost: {}ms.",
-                    tableId,
-                    splits.size(),
-                    end - start);
-            return splits;
-        } catch (Exception e) {
-            throw new FlinkRuntimeException(
-                    String.format("Generate Splits for table %s error", 
tableId), e);
-        }
-    }
-
-    @Override
-    public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column 
column)
-            throws SQLException {
-        return SqlServerUtils.queryMinMax(jdbc, tableId, column.name());
-    }
-
-    @Override
-    public Object queryMin(
-            JdbcConnection jdbc, TableId tableId, Column column, Object 
excludedLowerBound)
-            throws SQLException {
-        return SqlServerUtils.queryMin(jdbc, tableId, column.name(), 
excludedLowerBound);
+        super(sourceConfig, dialect);
     }
 
     @Override
-    public DataType fromDbzColumn(Column splitColumn) {
+    protected DataType fromDbzColumn(Column splitColumn) {
         return SqlServerTypeUtils.fromDbzColumn(splitColumn);
     }
 
-    // 
--------------------------------------------------------------------------------------------
-    // Utilities
-    // 
--------------------------------------------------------------------------------------------
-
     @Override
-    public Object queryNextChunkMax(
+    protected Object queryNextChunkMax(
             JdbcConnection jdbc,
             TableId tableId,
-            Column column,
+            Column splitColumn,
             int chunkSize,
             Object includedLowerBound)
             throws SQLException {
         return SqlServerUtils.queryNextChunkMax(
-                jdbc, tableId, column.name(), chunkSize, includedLowerBound);
+                jdbc, tableId, splitColumn.name(), chunkSize, 
includedLowerBound);
     }
 
     @Override
-    public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) 
throws SQLException {
-        return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId);
-    }
-
-    @Override
-    public String buildSplitScanQuery(
-            TableId tableId, RowType splitKeyType, boolean isFirstSplit, 
boolean isLastSplit) {
-        return SqlServerUtils.buildSplitScanQuery(tableId, splitKeyType, 
isFirstSplit, isLastSplit);
-    }
-
-    /**
-     * We can use evenly-sized chunks or unevenly-sized chunks when split 
table into chunks, using
-     * evenly-sized chunks which is much efficient, using unevenly-sized 
chunks which will request
-     * many queries and is not efficient.
-     */
-    private List<ChunkRange> splitTableIntoChunks(
-            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws 
SQLException {
-        final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
-        final Object min = minMax[0];
-        final Object max = minMax[1];
-        if (min == null || max == null || min.equals(max)) {
-            // empty table, or only one row, return full table scan as a chunk
-            return Collections.singletonList(ChunkRange.all());
-        }
-
-        final int chunkSize = sourceConfig.getSplitSize();
-        final double distributionFactorUpper = 
sourceConfig.getDistributionFactorUpper();
-        final double distributionFactorLower = 
sourceConfig.getDistributionFactorLower();
-
-        if (isEvenlySplitColumn(splitColumn)) {
-            long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
-            double distributionFactor =
-                    calculateDistributionFactor(tableId, min, max, 
approximateRowCnt);
-
-            boolean dataIsEvenlyDistributed =
-                    doubleCompare(distributionFactor, distributionFactorLower) 
>= 0
-                            && doubleCompare(distributionFactor, 
distributionFactorUpper) <= 0;
-
-            if (dataIsEvenlyDistributed) {
-                // the minimum dynamic chunk size is at least 1
-                final int dynamicChunkSize = Math.max((int) 
(distributionFactor * chunkSize), 1);
-                return splitEvenlySizedChunks(
-                        tableId, min, max, approximateRowCnt, chunkSize, 
dynamicChunkSize);
-            } else {
-                return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, 
min, max, chunkSize);
-            }
-        } else {
-            return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, 
max, chunkSize);
-        }
-    }
-
-    /**
-     * Split table into evenly sized chunks based on the numeric min and max 
value of split column,
-     * and tumble chunks in step size.
-     */
-    private List<ChunkRange> splitEvenlySizedChunks(
-            TableId tableId,
-            Object min,
-            Object max,
-            long approximateRowCnt,
-            int chunkSize,
-            int dynamicChunkSize) {
-        LOG.info(
-                "Use evenly-sized chunk optimization for table {}, the 
approximate row count is {}, the chunk size is {}, the dynamic chunk size is 
{}",
-                tableId,
-                approximateRowCnt,
-                chunkSize,
-                dynamicChunkSize);
-        if (approximateRowCnt <= chunkSize) {
-            // there is no more than one chunk, return full table as a chunk
-            return Collections.singletonList(ChunkRange.all());
-        }
-
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
-        while (ObjectUtils.compare(chunkEnd, max) <= 0) {
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            chunkStart = chunkEnd;
-            try {
-                chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
-            } catch (ArithmeticException e) {
-                // Stop chunk split to avoid dead loop when number overflows.
-                break;
-            }
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
-    }
-
-    // 
------------------------------------------------------------------------------------------
-
-    /** Split table into unevenly sized chunks by continuously calculating 
next chunk max value. */
-    private List<ChunkRange> splitUnevenlySizedChunks(
-            JdbcConnection jdbc,
-            TableId tableId,
-            Column splitColumn,
-            Object min,
-            Object max,
-            int chunkSize)
+    protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
             throws SQLException {
-        LOG.info(
-                "Use unevenly-sized chunks for table {}, the chunk size is 
{}", tableId, chunkSize);
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, 
chunkSize);
-        int count = 0;
-        while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
-            // we start from [null, min + chunk_size) and avoid [null, min)
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            // may sleep awhile to avoid DDOS on SqlServer server
-            maySleep(count++, tableId);
-            chunkStart = chunkEnd;
-            chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, 
chunkSize);
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
-    }
-
-    private Object nextChunkEnd(
-            JdbcConnection jdbc,
-            Object previousChunkEnd,
-            TableId tableId,
-            Column splitColumn,
-            Object max,
-            int chunkSize)
-            throws SQLException {
-        // chunk end might be null when max values are removed
-        Object chunkEnd =
-                queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, 
previousChunkEnd);
-        if (Objects.equals(previousChunkEnd, chunkEnd)) {
-            // we don't allow equal chunk start and end,
-            // should query the next one larger than chunkEnd
-            chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
-        }
-        if (ObjectUtils.compare(chunkEnd, max) >= 0) {
-            return null;
-        } else {
-            return chunkEnd;
-        }
-    }
-
-    private SnapshotSplit createSnapshotSplit(
-            JdbcConnection jdbc,
-            TableId tableId,
-            int chunkId,
-            RowType splitKeyType,
-            Object chunkStart,
-            Object chunkEnd) {
-        // currently, we only support single split column
-        Object[] splitStart = chunkStart == null ? null : new Object[] 
{chunkStart};
-        Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
-        Map<TableId, TableChange> schema = new HashMap<>();
-        schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
-        return new SnapshotSplit(
-                tableId,
-                splitId(tableId, chunkId),
-                splitKeyType,
-                splitStart,
-                splitEnd,
-                null,
-                schema);
-    }
-
-    /** Returns the distribution factor of the given table. */
-    private double calculateDistributionFactor(
-            TableId tableId, Object min, Object max, long approximateRowCnt) {
-
-        if (!min.getClass().equals(max.getClass())) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Unsupported operation type, the MIN value type %s 
is different with MAX value type %s.",
-                            min.getClass().getSimpleName(), 
max.getClass().getSimpleName()));
-        }
-        if (approximateRowCnt == 0) {
-            return Double.MAX_VALUE;
-        }
-        BigDecimal difference = ObjectUtils.minus(max, min);
-        // factor = (max - min + 1) / rowCount
-        final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
-        double distributionFactor =
-                subRowCnt
-                        .divide(new BigDecimal(approximateRowCnt), 4, 
RoundingMode.CEILING)
-                        .doubleValue();
-        LOG.info(
-                "The distribution factor of table {} is {} according to the 
min split key {}, max split key {} and approximate row count {}",
-                tableId,
-                distributionFactor,
-                min,
-                max,
-                approximateRowCnt);
-        return distributionFactor;
+        return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId);
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
index 711f42f8a..b2292a825 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
@@ -50,7 +50,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
 import static org.apache.flink.table.api.DataTypes.FIELD;
 import static org.apache.flink.table.api.DataTypes.ROW;
 
@@ -59,26 +58,6 @@ public class SqlServerUtils {
 
     public SqlServerUtils() {}
 
-    public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, 
String columnName)
-            throws SQLException {
-        final String minMaxQuery =
-                String.format(
-                        "SELECT MIN(%s), MAX(%s) FROM %s",
-                        quote(columnName), quote(columnName), quote(tableId));
-        return jdbc.queryAndMap(
-                minMaxQuery,
-                rs -> {
-                    if (!rs.next()) {
-                        // this should never happen
-                        throw new SQLException(
-                                String.format(
-                                        "No result returned after running 
query [%s]",
-                                        minMaxQuery));
-                    }
-                    return rowToArray(rs, 2);
-                });
-    }
-
     public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId 
tableId)
             throws SQLException {
         // The statement used to get approximate row count which is less
@@ -103,27 +82,6 @@ public class SqlServerUtils {
                 });
     }
 
-    public static Object queryMin(
-            JdbcConnection jdbc, TableId tableId, String columnName, Object 
excludedLowerBound)
-            throws SQLException {
-        final String minQuery =
-                String.format(
-                        "SELECT MIN(%s) FROM %s WHERE %s > ?",
-                        quote(columnName), quote(tableId), quote(columnName));
-        return jdbc.prepareQueryAndMap(
-                minQuery,
-                ps -> ps.setObject(1, excludedLowerBound),
-                rs -> {
-                    if (!rs.next()) {
-                        // this should never happen
-                        throw new SQLException(
-                                String.format(
-                                        "No result returned after running 
query [%s]", minQuery));
-                    }
-                    return rs.getObject(1);
-                });
-    }
-
     /**
      * Returns the next LSN to be read from the database. This is the LSN of 
the last record that
      * was read from the database.

Reply via email to