This is an automated email from the ASF dual-hosted git repository.
ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 47f566005 [FLINK-35344][cdc-base] Move same code from multiple
subclasses to JdbcSourceChunkSplitter (#3319)
47f566005 is described below
commit 47f5660055016460719bc8a9a72cee868e9aa6fe
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Aug 2 17:45:38 2024 +0800
[FLINK-35344][cdc-base] Move same code from multiple subclasses to
JdbcSourceChunkSplitter (#3319)
---
.../assigner/splitter/JdbcSourceChunkSplitter.java | 387 ++++++++++++++++++---
.../base/source/utils/JdbcChunkUtils.java | 139 ++++++++
.../base/experimental/MySqlChunkSplitter.java | 323 +----------------
.../base/experimental/utils/MySqlUtils.java | 60 +---
.../db2/source/dialect/Db2ChunkSplitter.java | 309 +---------------
.../cdc/connectors/db2/source/utils/Db2Utils.java | 42 ---
.../assigner/splitter/OracleChunkSplitter.java | 310 ++---------------
.../oracle/source/utils/OracleUtils.java | 43 ---
.../postgres/source/PostgresChunkSplitter.java | 314 ++---------------
.../postgres/source/utils/PostgresQueryUtils.java | 9 +-
.../source/dialect/SqlServerChunkSplitter.java | 314 +----------------
.../sqlserver/source/utils/SqlServerUtils.java | 42 ---
12 files changed, 564 insertions(+), 1728 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
index 4a24aed00..1d50164cc 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
@@ -18,70 +18,119 @@
package org.apache.flink.cdc.connectors.base.source.assigner.splitter;
import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils;
+import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
+import io.debezium.relational.Table;
import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import static java.math.BigDecimal.ROUND_CEILING;
+import static
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;
/** The {@code ChunkSplitter} used to split table into a set of chunks for
JDBC data source. */
@Experimental
-public interface JdbcSourceChunkSplitter extends ChunkSplitter {
+public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
+ private static final Logger LOG =
LoggerFactory.getLogger(JdbcSourceChunkSplitter.class);
+ protected final JdbcSourceConfig sourceConfig;
+ protected final JdbcDataSourceDialect dialect;
+
+ public JdbcSourceChunkSplitter(JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dialect) {
+ this.sourceConfig = sourceConfig;
+ this.dialect = dialect;
+ }
/** Generates all snapshot splits (chunks) for the give table path. */
@Override
- Collection<SnapshotSplit> generateSplits(TableId tableId);
+ public Collection<SnapshotSplit> generateSplits(TableId tableId) {
+ try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
- /**
- * Query the maximum and minimum value of the column in the table. e.g.
query string <code>
- * SELECT MIN(%s) FROM %s WHERE %s > ?</code>
- *
- * @param jdbc JDBC connection.
- * @param tableId table identity.
- * @param column column.
- * @return maximum and minimum value.
- */
- Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column)
throws SQLException;
+ LOG.info("Start splitting table {} into chunks...", tableId);
+ long start = System.currentTimeMillis();
- /**
- * Query the minimum value of the column in the table, and the minimum
value must greater than
- * the excludedLowerBound value. e.g. prepare query string <code>
- * SELECT MIN(%s) FROM %s WHERE %s > ?</code>
- *
- * @param jdbc JDBC connection.
- * @param tableId table identity.
- * @param column column.
- * @param excludedLowerBound the minimum value should be greater than this
value.
- * @return minimum value.
- */
- Object queryMin(JdbcConnection jdbc, TableId tableId, Column column,
Object excludedLowerBound)
- throws SQLException;
+ Table table =
+ Objects.requireNonNull(dialect.queryTableSchema(jdbc,
tableId)).getTable();
+ Column splitColumn = getSplitColumn(table,
sourceConfig.getChunkKeyColumn());
+ final List<ChunkRange> chunks;
+ try {
+ chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
+ } catch (SQLException e) {
+ throw new FlinkRuntimeException("Failed to split chunks for
table " + tableId, e);
+ }
+
+ // convert chunks into splits
+ List<SnapshotSplit> splits = new ArrayList<>();
+ RowType splitType = getSplitType(splitColumn);
+ for (int i = 0; i < chunks.size(); i++) {
+ ChunkRange chunk = chunks.get(i);
+ SnapshotSplit split =
+ createSnapshotSplit(
+ jdbc,
+ tableId,
+ i,
+ splitType,
+ chunk.getChunkStart(),
+ chunk.getChunkEnd());
+ splits.add(split);
+ }
+
+ long end = System.currentTimeMillis();
+ LOG.info(
+ "Split table {} into {} chunks, time cost: {}ms.",
+ tableId,
+ splits.size(),
+ end - start);
+ return splits;
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(
+ String.format("Generate Splits for table %s error",
tableId), e);
+ }
+ }
/**
* Query the maximum value of the next chunk, and the next chunk must be
greater than or equal
* to <code>includedLowerBound</code> value [min_1, max_1), [min_2,
max_2),... [min_n, null).
* Each time this method is called it will return max1, max2...
*
+ * <p>Each database has a different syntax to limit the number of rows returned, for
example, `LIMIT N` in
+ * mysql or postgres, `TOP(N)` in sqlserver, `FETCH FIRST N ROWS ONLY`
in DB2.
+ *
* @param jdbc JDBC connection.
* @param tableId table identity.
- * @param column column.
+ * @param splitColumn column.
* @param chunkSize chunk size.
* @param includedLowerBound the previous chunk end value.
* @return next chunk end value.
*/
- Object queryNextChunkMax(
+ protected abstract Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
- Column column,
+ Column splitColumn,
int chunkSize,
Object includedLowerBound)
throws SQLException;
@@ -89,23 +138,16 @@ public interface JdbcSourceChunkSplitter extends
ChunkSplitter {
/**
* Approximate total number of entries in the lookup table.
*
+ * <p>Each database has a different system table to look up the approximate
total row count. For
+ * example, `pg_class` in postgres, `sys.dm_db_partition_stats` in
sqlserver, `SYSCAT.TABLES` in
+ * db2.
+ *
* @param jdbc JDBC connection.
* @param tableId table identity.
* @return approximate row count.
*/
- Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws
SQLException;
-
- /**
- * Build the scan query sql of the {@link SnapshotSplit}.
- *
- * @param tableId table identity.
- * @param splitKeyType primary key type.
- * @param isFirstSplit whether the first split.
- * @param isLastSplit whether the last split.
- * @return query sql.
- */
- String buildSplitScanQuery(
- TableId tableId, RowType splitKeyType, boolean isFirstSplit,
boolean isLastSplit);
+ protected abstract Long queryApproximateRowCnt(JdbcConnection jdbc,
TableId tableId)
+ throws SQLException;
/**
* Checks whether split column is evenly distributed across its range.
@@ -113,7 +155,7 @@ public interface JdbcSourceChunkSplitter extends
ChunkSplitter {
* @param splitColumn split column.
* @return true that means split column with type BIGINT, INT, DECIMAL.
*/
- default boolean isEvenlySplitColumn(Column splitColumn) {
+ protected boolean isEvenlySplitColumn(Column splitColumn) {
DataType flinkType = fromDbzColumn(splitColumn);
LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot();
@@ -130,7 +172,94 @@ public interface JdbcSourceChunkSplitter extends
ChunkSplitter {
* @param splitColumn dbz split column.
* @return flink data type
*/
- DataType fromDbzColumn(Column splitColumn);
+ protected abstract DataType fromDbzColumn(Column splitColumn);
+
+ /** Returns the distribution factor of the given table. */
+ protected double calculateDistributionFactor(
+ TableId tableId, Object min, Object max, long approximateRowCnt) {
+
+ if (!min.getClass().equals(max.getClass())) {
+ throw new IllegalStateException(
+ String.format(
+ "Unsupported operation type, the MIN value type %s
is different with MAX value type %s.",
+ min.getClass().getSimpleName(),
max.getClass().getSimpleName()));
+ }
+ if (approximateRowCnt == 0) {
+ return Double.MAX_VALUE;
+ }
+ BigDecimal difference = ObjectUtils.minus(max, min);
+ // factor = (max - min + 1) / rowCount
+ final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
+ double distributionFactor =
+ subRowCnt.divide(new BigDecimal(approximateRowCnt), 4,
ROUND_CEILING).doubleValue();
+ LOG.info(
+ "The distribution factor of table {} is {} according to the
min split key {}, max split key {} and approximate row count {}",
+ tableId,
+ distributionFactor,
+ min,
+ max,
+ approximateRowCnt);
+ return distributionFactor;
+ }
+
+ /**
+ * Get the column which is seen as chunk key.
+ *
+ * @param table table identity.
+ * @param chunkKeyColumn column name which is seen as chunk key, if
chunkKeyColumn is null, use primary key instead.
+ * @return the column which is seen as chunk key.
+ */
+ protected Column getSplitColumn(Table table, @Nullable String
chunkKeyColumn) {
+ return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumn);
+ }
+
+ /** ChunkEnd less than or equal to max. */
+ protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
+ return ObjectUtils.compare(chunkEnd, max) <= 0;
+ }
+
+ /** ChunkEnd greater than or equal to max. */
+ protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
+ return ObjectUtils.compare(chunkEnd, max) >= 0;
+ }
+
+ /**
+ * Query the minimum and maximum value of the column in the table. e.g.
query string <code>
+ * SELECT MIN(%s), MAX(%s) FROM %s</code>
+ *
+ * @param jdbc JDBC connection.
+ * @param tableId table identity.
+ * @param splitColumn column.
+ * @return maximum and minimum value.
+ */
+ protected Object[] queryMinMax(JdbcConnection jdbc, TableId tableId,
Column splitColumn)
+ throws SQLException {
+ return JdbcChunkUtils.queryMinMax(
+ jdbc,
+ jdbc.quotedTableIdString(tableId),
+ jdbc.quotedColumnIdString(splitColumn.name()));
+ }
+
+ /**
+ * Query the minimum value of the column in the table, and the minimum
value must be greater than
+ * the excludedLowerBound value. e.g. prepare query string <code>
+ * SELECT MIN(%s) FROM %s WHERE %s > ?</code>
+ *
+ * @param jdbc JDBC connection.
+ * @param tableId table identity.
+ * @param splitColumn split column whose minimum is queried.
+ * @param excludedLowerBound the minimum value should be greater than this
value.
+ * @return minimum value.
+ */
+ protected Object queryMin(
+ JdbcConnection jdbc, TableId tableId, Column splitColumn, Object
excludedLowerBound)
+ throws SQLException {
+ return JdbcChunkUtils.queryMin(
+ jdbc,
+ jdbc.quotedTableIdString(tableId), // table first, then column: matches JdbcChunkUtils.queryMin
+ jdbc.quotedColumnIdString(splitColumn.name()),
+ excludedLowerBound);
+ }
/**
* convert dbz column to Flink row type.
@@ -138,8 +267,178 @@ public interface JdbcSourceChunkSplitter extends
ChunkSplitter {
* @param splitColumn split column.
* @return flink row type.
*/
- default RowType getSplitType(Column splitColumn) {
+ private RowType getSplitType(Column splitColumn) {
return (RowType)
ROW(FIELD(splitColumn.name(),
fromDbzColumn(splitColumn))).getLogicalType();
}
+
+ /**
+ * We can use evenly-sized chunks or unevenly-sized chunks when splitting a
table into chunks. Using
+ * evenly-sized chunks is much more efficient, while using unevenly-sized
chunks requires
+ * many queries and is less efficient.
+ */
+ private List<ChunkRange> splitTableIntoChunks(
+ JdbcConnection jdbc, TableId tableId, Column splitColumn) throws
SQLException {
+ final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
+ final Object min = minMax[0];
+ final Object max = minMax[1];
+ if (min == null || max == null || min.equals(max)) {
+ // empty table, or only one row, return full table scan as a chunk
+ return Collections.singletonList(ChunkRange.all());
+ }
+
+ final int chunkSize = sourceConfig.getSplitSize();
+ final double distributionFactorUpper =
sourceConfig.getDistributionFactorUpper();
+ final double distributionFactorLower =
sourceConfig.getDistributionFactorLower();
+
+ if (isEvenlySplitColumn(splitColumn)) {
+ long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
+ double distributionFactor =
+ calculateDistributionFactor(tableId, min, max,
approximateRowCnt);
+
+ boolean dataIsEvenlyDistributed =
+ doubleCompare(distributionFactor, distributionFactorLower)
>= 0
+ && doubleCompare(distributionFactor,
distributionFactorUpper) <= 0;
+
+ if (dataIsEvenlyDistributed) {
+ // the minimum dynamic chunk size is at least 1
+ final int dynamicChunkSize = Math.max((int)
(distributionFactor * chunkSize), 1);
+ return splitEvenlySizedChunks(
+ tableId, min, max, approximateRowCnt, chunkSize,
dynamicChunkSize);
+ } else {
+ return splitUnevenlySizedChunks(jdbc, tableId, splitColumn,
min, max, chunkSize);
+ }
+ } else {
+ return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min,
max, chunkSize);
+ }
+ }
+
+ /**
+ * Split table into evenly sized chunks based on the numeric min and max
value of split column,
+ * and tumble chunks in step size.
+ */
+ private List<ChunkRange> splitEvenlySizedChunks(
+ TableId tableId,
+ Object min,
+ Object max,
+ long approximateRowCnt,
+ int chunkSize,
+ int dynamicChunkSize) {
+ LOG.info(
+ "Use evenly-sized chunk optimization for table {}, the
approximate row count is {}, the chunk size is {}, the dynamic chunk size is
{}",
+ tableId,
+ approximateRowCnt,
+ chunkSize,
+ dynamicChunkSize);
+ if (approximateRowCnt <= chunkSize) {
+ // there is no more than one chunk, return full table as a chunk
+ return Collections.singletonList(ChunkRange.all());
+ }
+
+ final List<ChunkRange> splits = new ArrayList<>();
+ Object chunkStart = null;
+ Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
+ while (ObjectUtils.compare(chunkEnd, max) <= 0) {
+ splits.add(ChunkRange.of(chunkStart, chunkEnd));
+ chunkStart = chunkEnd;
+ try {
+ chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
+ } catch (ArithmeticException e) {
+ // Stop chunk split to avoid dead loop when number overflows.
+ break;
+ }
+ }
+ // add the ending split
+ splits.add(ChunkRange.of(chunkStart, null));
+ return splits;
+ }
+
+ /** Split table into unevenly sized chunks by continuously calculating
next chunk max value. */
+ private List<ChunkRange> splitUnevenlySizedChunks(
+ JdbcConnection jdbc,
+ TableId tableId,
+ Column splitColumn,
+ Object min,
+ Object max,
+ int chunkSize)
+ throws SQLException {
+ LOG.info(
+ "Use unevenly-sized chunks for table {}, the chunk size is
{}", tableId, chunkSize);
+ final List<ChunkRange> splits = new ArrayList<>();
+ Object chunkStart = null;
+ Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max,
chunkSize);
+ int count = 0;
+ while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) {
+ // we start from [null, min + chunk_size) and avoid [null, min)
+ splits.add(ChunkRange.of(chunkStart, chunkEnd));
+ // may sleep a while to avoid overloading (DDOS) the database server
+ maySleep(count++, tableId);
+ chunkStart = chunkEnd;
+ chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max,
chunkSize);
+ }
+ // add the ending split
+ splits.add(ChunkRange.of(chunkStart, null));
+ return splits;
+ }
+
+ private Object nextChunkEnd(
+ JdbcConnection jdbc,
+ Object previousChunkEnd,
+ TableId tableId,
+ Column splitColumn,
+ Object max,
+ int chunkSize)
+ throws SQLException {
+ // chunk end might be null when max values are removed
+ Object chunkEnd =
+ queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize,
previousChunkEnd);
+ if (Objects.equals(previousChunkEnd, chunkEnd)) {
+ // we don't allow equal chunk start and end,
+ // should query the next one larger than chunkEnd
+ chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
+ }
+ if (isChunkEndGeMax(chunkEnd, max)) {
+ return null;
+ } else {
+ return chunkEnd;
+ }
+ }
+
+ private SnapshotSplit createSnapshotSplit(
+ JdbcConnection jdbc,
+ TableId tableId,
+ int chunkId,
+ RowType splitKeyType,
+ Object chunkStart,
+ Object chunkEnd) {
+ // currently, we only support single split column
+ Object[] splitStart = chunkStart == null ? null : new Object[]
{chunkStart};
+ Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
+ Map<TableId, TableChanges.TableChange> schema = new HashMap<>();
+ schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
+ return new SnapshotSplit(
+ tableId,
+ splitId(tableId, chunkId),
+ splitKeyType,
+ splitStart,
+ splitEnd,
+ null,
+ schema);
+ }
+
+ private String splitId(TableId tableId, int chunkId) {
+ return tableId.toString() + ":" + chunkId;
+ }
+
+ private void maySleep(int count, TableId tableId) {
+ // throttle: on every 10th query (count 0, 10, 20, ...) sleep 0.1s and log progress
+ if (count % 10 == 0) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the flag so cancellation is not lost
+ }
+ LOG.info("JdbcSourceChunkSplitter has split {} chunks for table
{}", count, tableId);
+ }
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java
new file mode 100644
index 000000000..46b30310c
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.base.source.utils;
+
+import org.apache.flink.table.api.ValidationException;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+
+import javax.annotation.Nullable;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
+
+/** Utilities to split chunks of table. */
+public class JdbcChunkUtils {
+
+ /**
+ * Query the minimum and maximum value of the column in the table. e.g.
query string <code>
+ * SELECT MIN(%s), MAX(%s) FROM %s</code>
+ *
+ * @param jdbc JDBC connection.
+ * @param quotedTableName table identity.
+ * @param quotedColumnName column name.
+ * @return maximum and minimum value.
+ */
+ public static Object[] queryMinMax(
+ JdbcConnection jdbc, String quotedTableName, String
quotedColumnName)
+ throws SQLException {
+ final String minMaxQuery =
+ String.format(
+ "SELECT MIN(%s), MAX(%s) FROM %s",
+ quotedColumnName, quotedColumnName, quotedTableName);
+ return jdbc.queryAndMap(
+ minMaxQuery,
+ rs -> {
+ if (!rs.next()) {
+ // this should never happen
+ throw new SQLException(
+ String.format(
+ "No result returned after running
query [%s]",
+ minMaxQuery));
+ }
+ return rowToArray(rs, 2);
+ });
+ }
+
+ /**
+ * Query the minimum value of the column in the table, and the minimum
value must greater than
+ * the excludedLowerBound value. e.g. prepare query string <code>
+ * SELECT MIN(%s) FROM %s WHERE %s > ?</code>
+ *
+ * @param jdbc JDBC connection.
+ * @param quotedTableName table identity.
+ * @param quotedColumnName column name.
+ * @param excludedLowerBound the minimum value should be greater than this
value.
+ * @return minimum value.
+ */
+ public static Object queryMin(
+ JdbcConnection jdbc,
+ String quotedTableName,
+ String quotedColumnName,
+ Object excludedLowerBound)
+ throws SQLException {
+ final String minQuery =
+ String.format(
+ "SELECT MIN(%s) FROM %s WHERE %s > ?",
+ quotedColumnName, quotedTableName, quotedColumnName);
+ return jdbc.prepareQueryAndMap(
+ minQuery,
+ ps -> ps.setObject(1, excludedLowerBound),
+ rs -> {
+ if (!rs.next()) {
+ // this should never happen
+ throw new SQLException(
+ String.format(
+ "No result returned after running
query [%s]", minQuery));
+ }
+ return rs.getObject(1);
+ });
+ }
+
+ /**
+ * Get the column which is seen as chunk key.
+ *
+ * @param table table identity.
+ * @param chunkKeyColumn column name which is seen as chunk key, if
chunkKeyColumn is null, use primary key instead.
+ * @return the column which is seen as chunk key.
+ */
+ public static Column getSplitColumn(Table table, @Nullable String
chunkKeyColumn) {
+ List<Column> primaryKeys = table.primaryKeyColumns();
+ if (primaryKeys.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Incremental snapshot for tables requires primary
key,"
+ + " but table %s doesn't have primary
key.",
+ table.id()));
+ }
+
+ if (chunkKeyColumn != null) {
+ Optional<Column> targetPkColumn =
+ primaryKeys.stream()
+ .filter(col -> chunkKeyColumn.equals(col.name()))
+ .findFirst();
+ if (targetPkColumn.isPresent()) {
+ return targetPkColumn.get();
+ }
+ throw new ValidationException(
+ String.format(
+ "Chunk key column '%s' doesn't exist in the
primary key [%s] of the table %s.",
+ chunkKeyColumn,
+
primaryKeys.stream().map(Column::name).collect(Collectors.joining(",")),
+ table.id()));
+ }
+
+ // use first field in primary key as the split key
+ return primaryKeys.get(0);
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/MySqlChunkSplitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/MySqlChunkSplitter.java
index 29db714eb..497e8e657 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/MySqlChunkSplitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/MySqlChunkSplitter.java
@@ -17,353 +17,48 @@
package org.apache.flink.cdc.connectors.base.experimental;
+import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.experimental.utils.MySqlTypeUtils;
import org.apache.flink.cdc.connectors.base.experimental.utils.MySqlUtils;
-import
org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange;
import
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
-import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
-import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
-import io.debezium.relational.Table;
import io.debezium.relational.TableId;
-import io.debezium.relational.history.TableChanges.TableChange;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.math.BigDecimal;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import static java.math.BigDecimal.ROUND_CEILING;
-import static
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
/** The {@code ChunkSplitter} used to split table into a set of chunks for
JDBC data source. */
-public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
-
- private static final Logger LOG =
LoggerFactory.getLogger(MySqlChunkSplitter.class);
-
- private final JdbcSourceConfig sourceConfig;
- private final JdbcDataSourceDialect dialect;
+@Internal
+public class MySqlChunkSplitter extends JdbcSourceChunkSplitter {
public MySqlChunkSplitter(JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dialect) {
- this.sourceConfig = sourceConfig;
- this.dialect = dialect;
- }
-
- @Override
- public Collection<SnapshotSplit> generateSplits(TableId tableId) {
- try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
- LOG.info("Start splitting table {} into chunks...", tableId);
- long start = System.currentTimeMillis();
-
- Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
- Column splitColumn = getSplitColumn(table);
- final List<ChunkRange> chunks;
- try {
- chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
- } catch (SQLException e) {
- throw new FlinkRuntimeException("Failed to split chunks for
table " + tableId, e);
- }
-
- // convert chunks into splits
- List<SnapshotSplit> splits = new ArrayList<>();
- RowType splitType = getSplitType(splitColumn);
- for (int i = 0; i < chunks.size(); i++) {
- ChunkRange chunk = chunks.get(i);
- SnapshotSplit split =
- createSnapshotSplit(
- jdbc,
- tableId,
- i,
- splitType,
- chunk.getChunkStart(),
- chunk.getChunkEnd());
- splits.add(split);
- }
-
- long end = System.currentTimeMillis();
- LOG.info(
- "Split table {} into {} chunks, time cost: {}ms.",
- tableId,
- splits.size(),
- end - start);
- return splits;
- } catch (Exception e) {
- throw new FlinkRuntimeException(
- String.format("Generate Splits for table %s error",
tableId), e);
- }
- }
-
- @Override
- public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column
column)
- throws SQLException {
- return MySqlUtils.queryMinMax(jdbc, tableId, column.name());
- }
-
- @Override
- public Object queryMin(
- JdbcConnection jdbc, TableId tableId, Column column, Object
excludedLowerBound)
- throws SQLException {
- return MySqlUtils.queryMin(jdbc, tableId, column.name(),
excludedLowerBound);
+ super(sourceConfig, dialect);
}
@Override
public Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
- Column column,
+ Column splitColumn,
int chunkSize,
Object includedLowerBound)
throws SQLException {
return MySqlUtils.queryNextChunkMax(
- jdbc, tableId, column.name(), chunkSize, includedLowerBound);
+ jdbc, tableId, splitColumn.name(), chunkSize,
includedLowerBound);
}
@Override
- public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
+ protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
+ throws SQLException {
return MySqlUtils.queryApproximateRowCnt(jdbc, tableId);
}
@Override
- public String buildSplitScanQuery(
- TableId tableId, RowType splitKeyType, boolean isFirstSplit,
boolean isLastSplit) {
- return MySqlUtils.buildSplitScanQuery(tableId, splitKeyType,
isFirstSplit, isLastSplit);
- }
-
- @Override
- public DataType fromDbzColumn(Column splitColumn) {
+ protected DataType fromDbzColumn(Column splitColumn) {
return MySqlTypeUtils.fromDbzColumn(splitColumn);
}
-
- //
--------------------------------------------------------------------------------------------
- // Utilities
- //
--------------------------------------------------------------------------------------------
-
- /**
- * We can use evenly-sized chunks or unevenly-sized chunks when split
table into chunks, using
- * evenly-sized chunks which is much efficient, using unevenly-sized
chunks which will request
- * many queries and is not efficient.
- */
- private List<ChunkRange> splitTableIntoChunks(
- JdbcConnection jdbc, TableId tableId, Column splitColumn) throws
SQLException {
- final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
- final Object min = minMax[0];
- final Object max = minMax[1];
- if (min == null || max == null || min.equals(max)) {
- // empty table, or only one row, return full table scan as a chunk
- return Collections.singletonList(ChunkRange.all());
- }
-
- final int chunkSize = sourceConfig.getSplitSize();
- final double distributionFactorUpper =
sourceConfig.getDistributionFactorUpper();
- final double distributionFactorLower =
sourceConfig.getDistributionFactorLower();
-
- if (isEvenlySplitColumn(splitColumn)) {
- long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
- double distributionFactor =
- calculateDistributionFactor(tableId, min, max,
approximateRowCnt);
-
- boolean dataIsEvenlyDistributed =
- doubleCompare(distributionFactor, distributionFactorLower)
>= 0
- && doubleCompare(distributionFactor,
distributionFactorUpper) <= 0;
-
- if (dataIsEvenlyDistributed) {
- // the minimum dynamic chunk size is at least 1
- final int dynamicChunkSize = Math.max((int)
(distributionFactor * chunkSize), 1);
- return splitEvenlySizedChunks(
- tableId, min, max, approximateRowCnt, chunkSize,
dynamicChunkSize);
- } else {
- return splitUnevenlySizedChunks(jdbc, tableId, splitColumn,
min, max, chunkSize);
- }
- } else {
- return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min,
max, chunkSize);
- }
- }
-
- /**
- * Split table into evenly sized chunks based on the numeric min and max
value of split column,
- * and tumble chunks in step size.
- */
- private List<ChunkRange> splitEvenlySizedChunks(
- TableId tableId,
- Object min,
- Object max,
- long approximateRowCnt,
- int chunkSize,
- int dynamicChunkSize) {
- LOG.info(
- "Use evenly-sized chunk optimization for table {}, the
approximate row count is {}, the chunk size is {}, the dynamic chunk size is
{}",
- tableId,
- approximateRowCnt,
- chunkSize,
- dynamicChunkSize);
- if (approximateRowCnt <= chunkSize) {
- // there is no more than one chunk, return full table as a chunk
- return Collections.singletonList(ChunkRange.all());
- }
-
- final List<ChunkRange> splits = new ArrayList<>();
- Object chunkStart = null;
- Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
- while (ObjectUtils.compare(chunkEnd, max) <= 0) {
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
- chunkStart = chunkEnd;
- try {
- chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
- } catch (ArithmeticException e) {
- // Stop chunk split to avoid dead loop when number overflows.
- break;
- }
- }
- // add the ending split
- splits.add(ChunkRange.of(chunkStart, null));
- return splits;
- }
-
- /** Split table into unevenly sized chunks by continuously calculating
next chunk max value. */
- private List<ChunkRange> splitUnevenlySizedChunks(
- JdbcConnection jdbc,
- TableId tableId,
- Column splitColumn,
- Object min,
- Object max,
- int chunkSize)
- throws SQLException {
- LOG.info(
- "Use unevenly-sized chunks for table {}, the chunk size is
{}", tableId, chunkSize);
- final List<ChunkRange> splits = new ArrayList<>();
- Object chunkStart = null;
- Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max,
chunkSize);
- int count = 0;
- while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
- // we start from [null, min + chunk_size) and avoid [null, min)
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
- // may sleep a while to avoid DDOS on MySQL server
- maySleep(count++, tableId);
- chunkStart = chunkEnd;
- chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max,
chunkSize);
- }
- // add the ending split
- splits.add(ChunkRange.of(chunkStart, null));
- return splits;
- }
-
- private Object nextChunkEnd(
- JdbcConnection jdbc,
- Object previousChunkEnd,
- TableId tableId,
- Column splitColumn,
- Object max,
- int chunkSize)
- throws SQLException {
- // chunk end might be null when max values are removed
- Object chunkEnd =
- queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize,
previousChunkEnd);
- if (Objects.equals(previousChunkEnd, chunkEnd)) {
- // we don't allow equal chunk start and end,
- // should query the next one larger than chunkEnd
- chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
- }
- if (ObjectUtils.compare(chunkEnd, max) >= 0) {
- return null;
- } else {
- return chunkEnd;
- }
- }
-
- private SnapshotSplit createSnapshotSplit(
- JdbcConnection jdbc,
- TableId tableId,
- int chunkId,
- RowType splitKeyType,
- Object chunkStart,
- Object chunkEnd) {
- // currently, we only support single split column
- Object[] splitStart = chunkStart == null ? null : new Object[]
{chunkStart};
- Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
- Map<TableId, TableChange> schema = new HashMap<>();
- schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
- return new SnapshotSplit(
- tableId,
- splitId(tableId, chunkId),
- splitKeyType,
- splitStart,
- splitEnd,
- null,
- schema);
- }
-
- //
------------------------------------------------------------------------------------------
- /** Returns the distribution factor of the given table. */
- private double calculateDistributionFactor(
- TableId tableId, Object min, Object max, long approximateRowCnt) {
-
- if (!min.getClass().equals(max.getClass())) {
- throw new IllegalStateException(
- String.format(
- "Unsupported operation type, the MIN value type %s
is different with MAX value type %s.",
- min.getClass().getSimpleName(),
max.getClass().getSimpleName()));
- }
- if (approximateRowCnt == 0) {
- return Double.MAX_VALUE;
- }
- BigDecimal difference = ObjectUtils.minus(max, min);
- // factor = (max - min + 1) / rowCount
- final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
- double distributionFactor =
- subRowCnt.divide(new BigDecimal(approximateRowCnt), 4,
ROUND_CEILING).doubleValue();
- LOG.info(
- "The distribution factor of table {} is {} according to the
min split key {}, max split key {} and approximate row count {}",
- tableId,
- distributionFactor,
- min,
- max,
- approximateRowCnt);
- return distributionFactor;
- }
-
- private static String splitId(TableId tableId, int chunkId) {
- return tableId.toString() + ":" + chunkId;
- }
-
- private static void maySleep(int count, TableId tableId) {
- // every 10 queries to sleep 100ms
- if (count % 10 == 0) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // nothing to do
- }
- LOG.info("JdbcSourceChunkSplitter has split {} chunks for table
{}", count, tableId);
- }
- }
-
- public static Column getSplitColumn(Table table) {
- List<Column> primaryKeys = table.primaryKeyColumns();
- if (primaryKeys.isEmpty()) {
- throw new ValidationException(
- String.format(
- "Incremental snapshot for tables requires primary
key,"
- + " but table %s doesn't have primary
key.",
- table.id()));
- }
-
- // use first field in primary key as the split key
- return primaryKeys.get(0);
- }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/utils/MySqlUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/utils/MySqlUtils.java
index 0d6ef1f88..f85a283a7 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/utils/MySqlUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/utils/MySqlUtils.java
@@ -45,7 +45,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
-import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;
@@ -54,26 +53,6 @@ public class MySqlUtils {
private MySqlUtils() {}
- public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId,
String columnName)
- throws SQLException {
- final String minMaxQuery =
- String.format(
- "SELECT MIN(%s), MAX(%s) FROM %s",
- quote(columnName), quote(columnName), quote(tableId));
- return jdbc.queryAndMap(
- minMaxQuery,
- rs -> {
- if (!rs.next()) {
- // this should never happen
- throw new SQLException(
- String.format(
- "No result returned after running
query [%s]",
- minMaxQuery));
- }
- return rowToArray(rs, 2);
- });
- }
-
public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId
tableId)
throws SQLException {
// The statement used to get approximate row count which is less
@@ -94,27 +73,6 @@ public class MySqlUtils {
});
}
- public static Object queryMin(
- JdbcConnection jdbc, TableId tableId, String columnName, Object
excludedLowerBound)
- throws SQLException {
- final String minQuery =
- String.format(
- "SELECT MIN(%s) FROM %s WHERE %s > ?",
- quote(columnName), quote(tableId), quote(columnName));
- return jdbc.prepareQueryAndMap(
- minQuery,
- ps -> ps.setObject(1, excludedLowerBound),
- rs -> {
- if (!rs.next()) {
- // this should never happen
- throw new SQLException(
- String.format(
- "No result returned after running
query [%s]", minQuery));
- }
- return rs.getObject(1);
- });
- }
-
public static Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
@@ -122,7 +80,7 @@ public class MySqlUtils {
int chunkSize,
Object includedLowerBound)
throws SQLException {
- String quotedColumn = quote(splitColumnName);
+ String quotedColumn = jdbc.quotedColumnIdString(splitColumnName);
String query =
String.format(
"SELECT MAX(%s) FROM ("
@@ -130,7 +88,7 @@ public class MySqlUtils {
+ ") AS T",
quotedColumn,
quotedColumn,
- quote(tableId),
+ jdbc.quotedTableIdString(tableId),
quotedColumn,
quotedColumn,
chunkSize);
@@ -313,20 +271,6 @@ public class MySqlUtils {
.getLogicalType();
}
- public static Column getSplitColumn(Table table) {
- List<Column> primaryKeys = table.primaryKeyColumns();
- if (primaryKeys.isEmpty()) {
- throw new ValidationException(
- String.format(
- "Incremental snapshot for tables requires primary
key,"
- + " but table %s doesn't have primary
key.",
- table.id()));
- }
-
- // use first field in primary key as the split key
- return primaryKeys.get(0);
- }
-
public static String quote(String dbOrTableName) {
return "`" + dbOrTableName + "`";
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2ChunkSplitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2ChunkSplitter.java
index 6381cd74a..6e77de798 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2ChunkSplitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2ChunkSplitter.java
@@ -17,139 +17,37 @@
package org.apache.flink.cdc.connectors.db2.source.dialect;
+import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
-import
org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange;
import
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
-import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
-import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.cdc.connectors.db2.source.utils.Db2TypeUtils;
import org.apache.flink.cdc.connectors.db2.source.utils.Db2Utils;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
-import io.debezium.relational.Table;
import io.debezium.relational.TableId;
-import io.debezium.relational.history.TableChanges.TableChange;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.math.BigDecimal;
-import java.math.RoundingMode;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import static
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
/**
* The splitter to split the table into chunks using primary-key (by default)
or a given split key.
*/
-public class Db2ChunkSplitter implements JdbcSourceChunkSplitter {
-
- private static final Logger LOG =
LoggerFactory.getLogger(Db2ChunkSplitter.class);
-
- private final JdbcSourceConfig sourceConfig;
- private final JdbcDataSourceDialect dialect;
+@Internal
+public class Db2ChunkSplitter extends JdbcSourceChunkSplitter {
public Db2ChunkSplitter(JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dialect) {
- this.sourceConfig = sourceConfig;
- this.dialect = dialect;
- }
-
- private static String splitId(TableId tableId, int chunkId) {
- return tableId.toString() + ":" + chunkId;
- }
-
- private static void maySleep(int count, TableId tableId) {
- // every 10 queries to sleep 0.1s
- if (count % 10 == 0) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // nothing to do
- }
- LOG.info("JdbcSourceChunkSplitter has split {} chunks for table
{}", count, tableId);
- }
- }
-
- @Override
- public Collection<SnapshotSplit> generateSplits(TableId tableId) {
- try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
- LOG.info("Start splitting table {} into chunks...", tableId);
- long start = System.currentTimeMillis();
-
- Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
- Column splitColumn = Db2Utils.getSplitColumn(table,
sourceConfig.getChunkKeyColumn());
- final List<ChunkRange> chunks;
- try {
- chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
- } catch (SQLException e) {
- throw new FlinkRuntimeException("Failed to split chunks for
table " + tableId, e);
- }
-
- // convert chunks into splits
- List<SnapshotSplit> splits = new ArrayList<>();
- RowType splitType = getSplitType(splitColumn);
- for (int i = 0; i < chunks.size(); i++) {
- ChunkRange chunk = chunks.get(i);
- SnapshotSplit split =
- createSnapshotSplit(
- jdbc,
- tableId,
- i,
- splitType,
- chunk.getChunkStart(),
- chunk.getChunkEnd());
- splits.add(split);
- }
-
- long end = System.currentTimeMillis();
- LOG.info(
- "Split table {} into {} chunks, time cost: {}ms.",
- tableId,
- splits.size(),
- end - start);
- return splits;
- } catch (Exception e) {
- throw new FlinkRuntimeException(
- String.format("Generate Splits for table %s error",
tableId), e);
- }
+ super(sourceConfig, dialect);
}
@Override
- public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column
column)
- throws SQLException {
- return Db2Utils.queryMinMax(jdbc, tableId, column.name());
- }
-
- @Override
- public Object queryMin(
- JdbcConnection jdbc, TableId tableId, Column column, Object
excludedLowerBound)
- throws SQLException {
- return Db2Utils.queryMin(jdbc, tableId, column.name(),
excludedLowerBound);
- }
-
- @Override
- public DataType fromDbzColumn(Column splitColumn) {
+ protected DataType fromDbzColumn(Column splitColumn) {
return Db2TypeUtils.fromDbzColumn(splitColumn);
}
- //
--------------------------------------------------------------------------------------------
- // Utilities
- //
--------------------------------------------------------------------------------------------
-
@Override
- public Object queryNextChunkMax(
+ protected Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
Column column,
@@ -161,199 +59,8 @@ public class Db2ChunkSplitter implements
JdbcSourceChunkSplitter {
}
@Override
- public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
- return Db2Utils.queryApproximateRowCnt(jdbc, tableId);
- }
-
- @Override
- public String buildSplitScanQuery(
- TableId tableId, RowType splitKeyType, boolean isFirstSplit,
boolean isLastSplit) {
- return Db2Utils.buildSplitScanQuery(tableId, splitKeyType,
isFirstSplit, isLastSplit);
- }
-
- /**
- * We can use evenly-sized chunks or unevenly-sized chunks when split
table into chunks, using
- * evenly-sized chunks which is much efficient, using unevenly-sized
chunks which will request
- * many queries and is not efficient.
- */
- private List<ChunkRange> splitTableIntoChunks(
- JdbcConnection jdbc, TableId tableId, Column splitColumn) throws
SQLException {
- final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
- final Object min = minMax[0];
- final Object max = minMax[1];
- if (min == null || max == null || min.equals(max)) {
- // empty table, or only one row, return full table scan as a chunk
- return Collections.singletonList(ChunkRange.all());
- }
-
- final int chunkSize = sourceConfig.getSplitSize();
- final double distributionFactorUpper =
sourceConfig.getDistributionFactorUpper();
- final double distributionFactorLower =
sourceConfig.getDistributionFactorLower();
-
- if (isEvenlySplitColumn(splitColumn)) {
- long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
- double distributionFactor =
- calculateDistributionFactor(tableId, min, max,
approximateRowCnt);
-
- boolean dataIsEvenlyDistributed =
- doubleCompare(distributionFactor, distributionFactorLower)
>= 0
- && doubleCompare(distributionFactor,
distributionFactorUpper) <= 0;
-
- if (dataIsEvenlyDistributed) {
- // the minimum dynamic chunk size is at least 1
- final int dynamicChunkSize = Math.max((int)
(distributionFactor * chunkSize), 1);
- return splitEvenlySizedChunks(
- tableId, min, max, approximateRowCnt, chunkSize,
dynamicChunkSize);
- } else {
- return splitUnevenlySizedChunks(jdbc, tableId, splitColumn,
min, max, chunkSize);
- }
- } else {
- return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min,
max, chunkSize);
- }
- }
-
- /**
- * Split table into evenly sized chunks based on the numeric min and max
value of split column,
- * and tumble chunks in step size.
- */
- private List<ChunkRange> splitEvenlySizedChunks(
- TableId tableId,
- Object min,
- Object max,
- long approximateRowCnt,
- int chunkSize,
- int dynamicChunkSize) {
- LOG.info(
- "Use evenly-sized chunk optimization for table {}, the
approximate row count is {}, the chunk size is {}, the dynamic chunk size is
{}",
- tableId,
- approximateRowCnt,
- chunkSize,
- dynamicChunkSize);
- if (approximateRowCnt <= chunkSize) {
- // there is no more than one chunk, return full table as a chunk
- return Collections.singletonList(ChunkRange.all());
- }
-
- final List<ChunkRange> splits = new ArrayList<>();
- Object chunkStart = null;
- Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
- while (ObjectUtils.compare(chunkEnd, max) <= 0) {
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
- chunkStart = chunkEnd;
- try {
- chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
- } catch (ArithmeticException e) {
- // Stop chunk split to avoid dead loop when number overflows.
- break;
- }
- }
- // add the ending split
- splits.add(ChunkRange.of(chunkStart, null));
- return splits;
- }
-
- //
------------------------------------------------------------------------------------------
-
- /** Split table into unevenly sized chunks by continuously calculating
next chunk max value. */
- private List<ChunkRange> splitUnevenlySizedChunks(
- JdbcConnection jdbc,
- TableId tableId,
- Column splitColumn,
- Object min,
- Object max,
- int chunkSize)
+ protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
- LOG.info(
- "Use unevenly-sized chunks for table {}, the chunk size is
{}", tableId, chunkSize);
- final List<ChunkRange> splits = new ArrayList<>();
- Object chunkStart = null;
- Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max,
chunkSize);
- int count = 0;
- while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
- // we start from [null, min + chunk_size) and avoid [null, min)
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
- // may sleep awhile to avoid DDOS on Db2 server
- maySleep(count++, tableId);
- chunkStart = chunkEnd;
- chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max,
chunkSize);
- }
- // add the ending split
- splits.add(ChunkRange.of(chunkStart, null));
- return splits;
- }
-
- private Object nextChunkEnd(
- JdbcConnection jdbc,
- Object previousChunkEnd,
- TableId tableId,
- Column splitColumn,
- Object max,
- int chunkSize)
- throws SQLException {
- // chunk end might be null when max values are removed
- Object chunkEnd =
- queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize,
previousChunkEnd);
- if (Objects.equals(previousChunkEnd, chunkEnd)) {
- // we don't allow equal chunk start and end,
- // should query the next one larger than chunkEnd
- chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
- }
- if (ObjectUtils.compare(chunkEnd, max) >= 0) {
- return null;
- } else {
- return chunkEnd;
- }
- }
-
- private SnapshotSplit createSnapshotSplit(
- JdbcConnection jdbc,
- TableId tableId,
- int chunkId,
- RowType splitKeyType,
- Object chunkStart,
- Object chunkEnd) {
- // currently, we only support single split column
- Object[] splitStart = chunkStart == null ? null : new Object[]
{chunkStart};
- Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
- Map<TableId, TableChange> schema = new HashMap<>();
- schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
- return new SnapshotSplit(
- tableId,
- splitId(tableId, chunkId),
- splitKeyType,
- splitStart,
- splitEnd,
- null,
- schema);
- }
-
- /** Returns the distribution factor of the given table. */
- private double calculateDistributionFactor(
- TableId tableId, Object min, Object max, long approximateRowCnt) {
-
- if (!min.getClass().equals(max.getClass())) {
- throw new IllegalStateException(
- String.format(
- "Unsupported operation type, the MIN value type %s
is different with MAX value type %s.",
- min.getClass().getSimpleName(),
max.getClass().getSimpleName()));
- }
- if (approximateRowCnt == 0) {
- return Double.MAX_VALUE;
- }
- BigDecimal difference = ObjectUtils.minus(max, min);
- // factor = (max - min + 1) / rowCount
- final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
- double distributionFactor =
- subRowCnt
- .divide(new BigDecimal(approximateRowCnt), 4,
RoundingMode.CEILING)
- .doubleValue();
- LOG.info(
- "The distribution factor of table {} is {} according to the
min split key {}, max split key {} and approximate row count {}",
- tableId,
- distributionFactor,
- min,
- max,
- approximateRowCnt);
- return distributionFactor;
+ return Db2Utils.queryApproximateRowCnt(jdbc, tableId);
}
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2Utils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2Utils.java
index 06c4e741f..2d08d7bff 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2Utils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2Utils.java
@@ -49,7 +49,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
-import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;
@@ -60,26 +59,6 @@ public class Db2Utils {
public Db2Utils() {}
- public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId,
String columnName)
- throws SQLException {
- final String minMaxQuery =
- String.format(
- "SELECT MIN(%s), MAX(%s) FROM %s",
- quote(columnName), quote(columnName),
quoteSchemaAndTable(tableId));
- return jdbc.queryAndMap(
- minMaxQuery,
- rs -> {
- if (!rs.next()) {
- // this should never happen
- throw new SQLException(
- String.format(
- "No result returned after running
query [%s]",
- minMaxQuery));
- }
- return rowToArray(rs, 2);
- });
- }
-
public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId
tableId)
throws SQLException {
// The statement used to get approximate row count which is less
@@ -103,27 +82,6 @@ public class Db2Utils {
});
}
- public static Object queryMin(
- JdbcConnection jdbc, TableId tableId, String columnName, Object
excludedLowerBound)
- throws SQLException {
- final String minQuery =
- String.format(
- "SELECT MIN(%s) FROM %s WHERE %s > ?",
- columnName, quote(tableId), columnName);
- return jdbc.prepareQueryAndMap(
- minQuery,
- ps -> ps.setObject(1, excludedLowerBound),
- rs -> {
- if (!rs.next()) {
- // this should never happen
- throw new SQLException(
- String.format(
- "No result returned after running
query [%s]", minQuery));
- }
- return rs.getObject(1);
- });
- }
-
/**
* Returns the next LSN to be read from the database. This is the LSN of
the last record that
* was read from the database.
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
index d68f1cd99..adf4c98a9 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
@@ -17,253 +17,92 @@
package org.apache.flink.cdc.connectors.oracle.source.assigner.splitter;
+import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
-import
org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange;
import
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
-import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils;
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.cdc.connectors.oracle.source.utils.OracleTypeUtils;
import org.apache.flink.cdc.connectors.oracle.source.utils.OracleUtils;
import org.apache.flink.cdc.connectors.oracle.util.ChunkUtils;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
-import io.debezium.relational.history.TableChanges.TableChange;
import oracle.sql.ROWID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import javax.annotation.Nullable;
-import static
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
+import java.sql.SQLException;
/**
* The {@code ChunkSplitter} used to split Oracle table into a set of chunks
for JDBC data source.
*/
-public class OracleChunkSplitter implements JdbcSourceChunkSplitter {
-
- private static final Logger LOG =
LoggerFactory.getLogger(OracleChunkSplitter.class);
-
- private final JdbcSourceConfig sourceConfig;
- private final JdbcDataSourceDialect dialect;
+@Internal
+public class OracleChunkSplitter extends JdbcSourceChunkSplitter {
public OracleChunkSplitter(JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dialect) {
- this.sourceConfig = sourceConfig;
- this.dialect = dialect;
- }
-
- @Override
- public Collection<SnapshotSplit> generateSplits(TableId tableId) {
- try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
- LOG.info("Start splitting table {} into chunks...", tableId);
- long start = System.currentTimeMillis();
-
- Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
- Column splitColumn =
- ChunkUtils.getChunkKeyColumn(table,
sourceConfig.getChunkKeyColumn());
- final List<ChunkRange> chunks;
- try {
- chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
- } catch (SQLException e) {
- throw new FlinkRuntimeException("Failed to split chunks for
table " + tableId, e);
- }
-
- // convert chunks into splits
- List<SnapshotSplit> splits = new ArrayList<>();
- RowType splitType = getSplitType(splitColumn);
- for (int i = 0; i < chunks.size(); i++) {
- ChunkRange chunk = chunks.get(i);
- SnapshotSplit split =
- createSnapshotSplit(
- jdbc,
- tableId,
- i,
- splitType,
- chunk.getChunkStart(),
- chunk.getChunkEnd());
- splits.add(split);
- }
-
- long end = System.currentTimeMillis();
- LOG.info(
- "Split table {} into {} chunks, time cost: {}ms.",
- tableId,
- splits.size(),
- end - start);
- return splits;
- } catch (Exception e) {
- throw new FlinkRuntimeException(
- String.format("Generate Splits for table %s error",
tableId), e);
- }
+ super(sourceConfig, dialect);
}
@Override
- public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column
column)
+ public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column
splitColumn)
throws SQLException {
- return OracleUtils.queryMinMax(jdbc, tableId, column.name());
+ // Oracle queries only use the schema and table name.
+ String quoteSchemaAndTable = OracleUtils.quoteSchemaAndTable(tableId);
+ return JdbcChunkUtils.queryMinMax(
+ jdbc, quoteSchemaAndTable,
jdbc.quotedColumnIdString(splitColumn.name()));
}
@Override
public Object queryMin(
JdbcConnection jdbc, TableId tableId, Column column, Object
excludedLowerBound)
throws SQLException {
- return OracleUtils.queryMin(jdbc, tableId, column.name(),
excludedLowerBound);
+ // Oracle queries only use schema and table; MIN runs on the quoted split column.
+ String quoteSchemaAndTable = OracleUtils.quoteSchemaAndTable(tableId);
+ return JdbcChunkUtils.queryMin(
+ jdbc, quoteSchemaAndTable, jdbc.quotedColumnIdString(column.name()),
excludedLowerBound);
}
@Override
public Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
- Column column,
+ Column splitColumn,
int chunkSize,
Object includedLowerBound)
throws SQLException {
return OracleUtils.queryNextChunkMax(
- jdbc, tableId, column.name(), chunkSize, includedLowerBound);
+ jdbc, tableId, splitColumn.name(), chunkSize,
includedLowerBound);
}
@Override
- public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
+ protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
+ throws SQLException {
return OracleUtils.queryApproximateRowCnt(jdbc, tableId);
}
- @Override
- public String buildSplitScanQuery(
- TableId tableId, RowType splitKeyType, boolean isFirstSplit,
boolean isLastSplit) {
- return OracleUtils.buildSplitScanQuery(tableId, splitKeyType,
isFirstSplit, isLastSplit);
- }
-
@Override
public DataType fromDbzColumn(Column splitColumn) {
return OracleTypeUtils.fromDbzColumn(splitColumn);
}
- //
--------------------------------------------------------------------------------------------
- // Utilities
- //
--------------------------------------------------------------------------------------------
-
- /**
- * We can use evenly-sized chunks or unevenly-sized chunks when split
table into chunks, using
- * evenly-sized chunks which is much efficient, using unevenly-sized
chunks which will request
- * many queries and is not efficient.
- */
- private List<ChunkRange> splitTableIntoChunks(
- JdbcConnection jdbc, TableId tableId, Column splitColumn) throws
SQLException {
- final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
- final Object min = minMax[0];
- final Object max = minMax[1];
- if (min == null || max == null || min.equals(max)) {
- // empty table, or only one row, return full table scan as a chunk
- return Collections.singletonList(ChunkRange.all());
- }
-
- final int chunkSize = sourceConfig.getSplitSize();
- final double distributionFactorUpper =
sourceConfig.getDistributionFactorUpper();
- final double distributionFactorLower =
sourceConfig.getDistributionFactorLower();
-
+ @Override
+ protected boolean isEvenlySplitColumn(Column splitColumn) {
// use ROWID get splitUnevenlySizedChunks by default
if (splitColumn.name().equals(ROWID.class.getSimpleName())) {
- return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min,
max, chunkSize);
- }
-
- if (isEvenlySplitColumn(splitColumn)) {
- long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
- double distributionFactor =
- calculateDistributionFactor(tableId, min, max,
approximateRowCnt);
-
- boolean dataIsEvenlyDistributed =
- doubleCompare(distributionFactor, distributionFactorLower)
>= 0
- && doubleCompare(distributionFactor,
distributionFactorUpper) <= 0;
-
- if (dataIsEvenlyDistributed) {
- // the minimum dynamic chunk size is at least 1
- final int dynamicChunkSize = Math.max((int)
(distributionFactor * chunkSize), 1);
- return splitEvenlySizedChunks(
- tableId, min, max, approximateRowCnt,
dynamicChunkSize);
- } else {
- return splitUnevenlySizedChunks(jdbc, tableId, splitColumn,
min, max, chunkSize);
- }
- } else {
- return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min,
max, chunkSize);
- }
- }
-
- /**
- * Split table into evenly sized chunks based on the numeric min and max
value of split column,
- * and tumble chunks in step size.
- */
- private List<ChunkRange> splitEvenlySizedChunks(
- TableId tableId, Object min, Object max, long approximateRowCnt,
int chunkSize) {
- LOG.info(
- "Use evenly-sized chunk optimization for table {}, the
approximate row count is {}, the chunk size is {}",
- tableId,
- approximateRowCnt,
- chunkSize);
- if (approximateRowCnt <= chunkSize) {
- // there is no more than one chunk, return full table as a chunk
- return Collections.singletonList(ChunkRange.all());
+ return false;
}
- final List<ChunkRange> splits = new ArrayList<>();
- Object chunkStart = null;
- Object chunkEnd = ObjectUtils.plus(min, chunkSize);
- while (ObjectUtils.compare(chunkEnd, max) <= 0) {
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
- chunkStart = chunkEnd;
- chunkEnd = ObjectUtils.plus(chunkEnd, chunkSize);
- }
- // add the ending split
- splits.add(ChunkRange.of(chunkStart, null));
- return splits;
- }
-
- /** Split table into unevenly sized chunks by continuously calculating
next chunk max value. */
- private List<ChunkRange> splitUnevenlySizedChunks(
- JdbcConnection jdbc,
- TableId tableId,
- Column splitColumn,
- Object min,
- Object max,
- int chunkSize)
- throws SQLException {
- LOG.info(
- "Use unevenly-sized chunks for table {}, the chunk size is
{}", tableId, chunkSize);
- final List<ChunkRange> splits = new ArrayList<>();
- Object chunkStart = null;
- Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max,
chunkSize);
- int count = 0;
-
- while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) {
- // we start from [null, min + chunk_size) and avoid [null, min)
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
- // may sleep a while to avoid DDOS on MySQL server
- maySleep(count++, tableId);
- chunkStart = chunkEnd;
- chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max,
chunkSize);
- }
- // add the ending split
- splits.add(ChunkRange.of(chunkStart, null));
- return splits;
+ return super.isEvenlySplitColumn(splitColumn);
}
/** ChunkEnd less than or equal to max. */
- private boolean isChunkEndLeMax(Object chunkEnd, Object max) {
+ @Override
+ protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
boolean chunkEndMaxCompare;
if (chunkEnd instanceof ROWID && max instanceof ROWID) {
chunkEndMaxCompare =
@@ -276,7 +115,8 @@ public class OracleChunkSplitter implements
JdbcSourceChunkSplitter {
}
/** ChunkEnd greater than or equal to max. */
- private boolean isChunkEndGeMax(Object chunkEnd, Object max) {
+ @Override
+ protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
boolean chunkEndMaxCompare;
if (chunkEnd instanceof ROWID && max instanceof ROWID) {
chunkEndMaxCompare =
@@ -288,95 +128,9 @@ public class OracleChunkSplitter implements
JdbcSourceChunkSplitter {
return chunkEndMaxCompare;
}
- private Object nextChunkEnd(
- JdbcConnection jdbc,
- Object previousChunkEnd,
- TableId tableId,
- Column splitColumn,
- Object max,
- int chunkSize)
- throws SQLException {
- // chunk end might be null when max values are removed
- Object chunkEnd =
- queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize,
previousChunkEnd);
- if (Objects.equals(previousChunkEnd, chunkEnd)) {
- // we don't allow equal chunk start and end,
- // should query the next one larger than chunkEnd
- chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
- }
- if (isChunkEndGeMax(chunkEnd, max)) {
- return null;
- } else {
- return chunkEnd;
- }
- }
-
- private SnapshotSplit createSnapshotSplit(
- JdbcConnection jdbc,
- TableId tableId,
- int chunkId,
- RowType splitKeyType,
- Object chunkStart,
- Object chunkEnd) {
- // currently, we only support single split column
- Object[] splitStart = chunkStart == null ? null : new Object[]
{chunkStart};
- Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
- Map<TableId, TableChange> schema = new HashMap<>();
- schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
- return new SnapshotSplit(
- tableId,
- splitId(tableId, chunkId),
- splitKeyType,
- splitStart,
- splitEnd,
- null,
- schema);
- }
-
- //
------------------------------------------------------------------------------------------
- /** Returns the distribution factor of the given table. */
- private double calculateDistributionFactor(
- TableId tableId, Object min, Object max, long approximateRowCnt) {
-
- if (!min.getClass().equals(max.getClass())) {
- throw new IllegalStateException(
- String.format(
- "Unsupported operation type, the MIN value type %s
is different with MAX value type %s.",
- min.getClass().getSimpleName(),
max.getClass().getSimpleName()));
- }
- if (approximateRowCnt == 0) {
- return Double.MAX_VALUE;
- }
- BigDecimal difference = ObjectUtils.minus(max, min);
- // factor = (max - min + 1) / rowCount
- final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
- double distributionFactor =
- subRowCnt
- .divide(new BigDecimal(approximateRowCnt), 4,
RoundingMode.CEILING)
- .doubleValue();
- LOG.info(
- "The distribution factor of table {} is {} according to the
min split key {}, max split key {} and approximate row count {}",
- tableId,
- distributionFactor,
- min,
- max,
- approximateRowCnt);
- return distributionFactor;
- }
-
- private static String splitId(TableId tableId, int chunkId) {
- return tableId.toString() + ":" + chunkId;
- }
-
- private static void maySleep(int count, TableId tableId) {
- // every 10 queries to sleep 0.1s
- if (count % 10 == 0) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // nothing to do
- }
- LOG.info("JdbcSourceChunkSplitter has split {} chunks for table
{}", count, tableId);
- }
+ @Override
+ protected Column getSplitColumn(Table table, @Nullable String
chunkKeyColumn) {
+ // Use the ROWID column as the chunk key column by default for the Oracle
CDC connector
+ return ChunkUtils.getChunkKeyColumn(table, chunkKeyColumn);
}
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleUtils.java
index b762a974b..4f3508744 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleUtils.java
@@ -42,33 +42,11 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
-import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
-
/** Utils to prepare Oracle SQL statement. */
public class OracleUtils {
private OracleUtils() {}
- public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId,
String columnName)
- throws SQLException {
- final String minMaxQuery =
- String.format(
- "SELECT MIN(%s), MAX(%s) FROM %s",
- quote(columnName), quote(columnName),
quoteSchemaAndTable(tableId));
- return jdbc.queryAndMap(
- minMaxQuery,
- rs -> {
- if (!rs.next()) {
- // this should never happen
- throw new SQLException(
- String.format(
- "No result returned after running
query [%s]",
- minMaxQuery));
- }
- return rowToArray(rs, 2);
- });
- }
-
public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId
tableId)
throws SQLException {
final String analyzeTable =
@@ -92,27 +70,6 @@ public class OracleUtils {
});
}
- public static Object queryMin(
- JdbcConnection jdbc, TableId tableId, String columnName, Object
excludedLowerBound)
- throws SQLException {
- final String minQuery =
- String.format(
- "SELECT MIN(%s) FROM %s WHERE %s > ?",
- quote(columnName), quoteSchemaAndTable(tableId),
quote(columnName));
- return jdbc.prepareQueryAndMap(
- minQuery,
- ps -> ps.setObject(1, excludedLowerBound),
- rs -> {
- if (!rs.next()) {
- // this should never happen
- throw new SQLException(
- String.format(
- "No result returned after running
query [%s]", minQuery));
- }
- return rs.getObject(1);
- });
- }
-
public static Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkSplitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkSplitter.java
index 96a4cc18d..9639b28f8 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkSplitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkSplitter.java
@@ -17,138 +17,54 @@
package org.apache.flink.cdc.connectors.postgres.source;
+import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
-import
org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange;
import
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
-import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
-import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
-import org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils;
import
org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
-import io.debezium.relational.Table;
import io.debezium.relational.TableId;
-import io.debezium.relational.history.TableChanges.TableChange;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.math.BigDecimal;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import static java.math.BigDecimal.ROUND_CEILING;
-import static
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
/**
* The splitter to split the table into chunks using primary-key (by default)
or a given split key.
*/
-public class PostgresChunkSplitter implements JdbcSourceChunkSplitter {
- private static final Logger LOG =
LoggerFactory.getLogger(PostgresChunkSplitter.class);
-
- private final JdbcSourceConfig sourceConfig;
- private final PostgresDialect dialect;
+@Internal
+public class PostgresChunkSplitter extends JdbcSourceChunkSplitter {
public PostgresChunkSplitter(JdbcSourceConfig sourceConfig,
PostgresDialect postgresDialect) {
- this.sourceConfig = sourceConfig;
- this.dialect = postgresDialect;
- }
-
- private static String splitId(TableId tableId, int chunkId) {
- return tableId.toString() + ":" + chunkId;
- }
-
- private static void maySleep(int count, TableId tableId) {
- // every 10 queries to sleep 100ms
- if (count % 10 == 0) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // nothing to do
- }
- LOG.info("JdbcSourceChunkSplitter has split {} chunks for table
{}", count, tableId);
- }
+ super(sourceConfig, postgresDialect);
}
@Override
- public Collection<SnapshotSplit> generateSplits(TableId tableId) {
- try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
- LOG.info("Start splitting table {} into chunks...", tableId);
- long start = System.currentTimeMillis();
-
- Table table =
- Objects.requireNonNull(dialect.queryTableSchema(jdbc,
tableId)).getTable();
- Column splitColumn = ChunkUtils.getSplitColumn(table,
sourceConfig.getChunkKeyColumn());
- final List<ChunkRange> chunks;
- try {
- chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
- } catch (SQLException e) {
- throw new FlinkRuntimeException("Failed to split chunks for
table " + tableId, e);
- }
-
- // convert chunks into splits
- List<SnapshotSplit> splits = new ArrayList<>();
- RowType splitType = getSplitType(splitColumn);
- for (int i = 0; i < chunks.size(); i++) {
- ChunkRange chunk = chunks.get(i);
- SnapshotSplit split =
- createSnapshotSplit(
- jdbc,
- tableId,
- i,
- splitType,
- chunk.getChunkStart(),
- chunk.getChunkEnd());
- splits.add(split);
- }
-
- long end = System.currentTimeMillis();
- LOG.info(
- "Split table {} into {} chunks, time cost: {}ms.",
- tableId,
- splits.size(),
- end - start);
- return splits;
- } catch (Exception e) {
- throw new FlinkRuntimeException(
- String.format("Generate Splits for table %s error",
tableId), e);
- }
+ public Object queryNextChunkMax(
+ JdbcConnection jdbc,
+ TableId tableId,
+ Column splitColumn,
+ int chunkSize,
+ Object includedLowerBound)
+ throws SQLException {
+ return PostgresQueryUtils.queryNextChunkMax(
+ jdbc, tableId, splitColumn, chunkSize, includedLowerBound);
}
+ /** Postgres chunk split overrides queryMinMax method to query based on uuid.
*/
@Override
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column
splitColumn)
throws SQLException {
return PostgresQueryUtils.queryMinMax(jdbc, tableId, splitColumn);
}
+ /** Postgres chunk split overrides queryMin method to query based on uuid.
*/
@Override
public Object queryMin(
- JdbcConnection jdbc, TableId tableId, Column column, Object
excludedLowerBound)
- throws SQLException {
- return PostgresQueryUtils.queryMin(jdbc, tableId, column,
excludedLowerBound);
- }
-
- @Override
- public Object queryNextChunkMax(
- JdbcConnection jdbc,
- TableId tableId,
- Column column,
- int chunkSize,
- Object includedLowerBound)
+ JdbcConnection jdbc, TableId tableId, Column splitColumn, Object
excludedLowerBound)
throws SQLException {
- return PostgresQueryUtils.queryNextChunkMax(
- jdbc, tableId, column, chunkSize, includedLowerBound);
+ return PostgresQueryUtils.queryMin(jdbc, tableId, splitColumn,
excludedLowerBound);
}
//
--------------------------------------------------------------------------------------------
@@ -156,203 +72,13 @@ public class PostgresChunkSplitter implements
JdbcSourceChunkSplitter {
//
--------------------------------------------------------------------------------------------
@Override
- public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
+ protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
+ throws SQLException {
return PostgresQueryUtils.queryApproximateRowCnt(jdbc, tableId);
}
@Override
- public String buildSplitScanQuery(
- TableId tableId, RowType splitKeyType, boolean isFirstSplit,
boolean isLastSplit) {
- return PostgresQueryUtils.buildSplitScanQuery(
- tableId, splitKeyType, isFirstSplit, isLastSplit);
- }
-
- @Override
- public DataType fromDbzColumn(Column splitColumn) {
+ protected DataType fromDbzColumn(Column splitColumn) {
return PostgresTypeUtils.fromDbzColumn(splitColumn);
}
-
- /**
- * We can use evenly-sized chunks or unevenly-sized chunks when split
table into chunks, using
- * evenly-sized chunks which is much efficient, using unevenly-sized
chunks which will request
- * many queries and is not efficient.
- */
- private List<ChunkRange> splitTableIntoChunks(
- JdbcConnection jdbc, TableId tableId, Column splitColumn) throws
SQLException {
- final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
- final Object min = minMax[0];
- final Object max = minMax[1];
- if (min == null || max == null || min.equals(max)) {
- // empty table, or only one row, return full table scan as a chunk
- return Collections.singletonList(ChunkRange.all());
- }
-
- final int chunkSize = sourceConfig.getSplitSize();
- final double distributionFactorUpper =
sourceConfig.getDistributionFactorUpper();
- final double distributionFactorLower =
sourceConfig.getDistributionFactorLower();
-
- if (isEvenlySplitColumn(splitColumn)) {
- long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
- double distributionFactor =
- calculateDistributionFactor(tableId, min, max,
approximateRowCnt);
-
- boolean dataIsEvenlyDistributed =
- doubleCompare(distributionFactor, distributionFactorLower)
>= 0
- && doubleCompare(distributionFactor,
distributionFactorUpper) <= 0;
-
- if (dataIsEvenlyDistributed) {
- // the minimum dynamic chunk size is at least 1
- final int dynamicChunkSize = Math.max((int)
(distributionFactor * chunkSize), 1);
- return splitEvenlySizedChunks(
- tableId, min, max, approximateRowCnt, chunkSize,
dynamicChunkSize);
- } else {
- return splitUnevenlySizedChunks(jdbc, tableId, splitColumn,
min, max, chunkSize);
- }
- } else {
- return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min,
max, chunkSize);
- }
- }
-
- /**
- * Split table into evenly sized chunks based on the numeric min and max
value of split column,
- * and tumble chunks in step size.
- */
- private List<ChunkRange> splitEvenlySizedChunks(
- TableId tableId,
- Object min,
- Object max,
- long approximateRowCnt,
- int chunkSize,
- int dynamicChunkSize) {
- LOG.info(
- "Use evenly-sized chunk optimization for table {}, the
approximate row count is {}, the chunk size is {}, the dynamic chunk size is
{}",
- tableId,
- approximateRowCnt,
- chunkSize,
- dynamicChunkSize);
- if (approximateRowCnt <= chunkSize) {
- // there is no more than one chunk, return full table as a chunk
- return Collections.singletonList(ChunkRange.all());
- }
-
- final List<ChunkRange> splits = new ArrayList<>();
- Object chunkStart = null;
- Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
- while (ObjectUtils.compare(chunkEnd, max) <= 0) {
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
- chunkStart = chunkEnd;
- try {
- chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
- } catch (ArithmeticException e) {
- // Stop chunk split to avoid dead loop when number overflows.
- break;
- }
- }
- // add the ending split
- splits.add(ChunkRange.of(chunkStart, null));
- return splits;
- }
-
- //
------------------------------------------------------------------------------------------
-
- /** Split table into unevenly sized chunks by continuously calculating
next chunk max value. */
- private List<ChunkRange> splitUnevenlySizedChunks(
- JdbcConnection jdbc,
- TableId tableId,
- Column splitColumn,
- Object min,
- Object max,
- int chunkSize)
- throws SQLException {
- LOG.info(
- "Use unevenly-sized chunks for table {}, the chunk size is
{}", tableId, chunkSize);
- final List<ChunkRange> splits = new ArrayList<>();
- Object chunkStart = null;
- Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max,
chunkSize);
- int count = 0;
- while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
- // we start from [null, min + chunk_size) and avoid [null, min)
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
- // may sleep a while to avoid DDOS on PostgreSQL server
- maySleep(count++, tableId);
- chunkStart = chunkEnd;
- chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max,
chunkSize);
- }
- // add the ending split
- splits.add(ChunkRange.of(chunkStart, null));
- return splits;
- }
-
- private Object nextChunkEnd(
- JdbcConnection jdbc,
- Object previousChunkEnd,
- TableId tableId,
- Column splitColumn,
- Object max,
- int chunkSize)
- throws SQLException {
- // chunk end might be null when max values are removed
- Object chunkEnd =
- queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize,
previousChunkEnd);
- if (Objects.equals(previousChunkEnd, chunkEnd)) {
- // we don't allow equal chunk start and end,
- // should query the next one larger than chunkEnd
- chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
- }
- if (ObjectUtils.compare(chunkEnd, max) >= 0) {
- return null;
- } else {
- return chunkEnd;
- }
- }
-
- private SnapshotSplit createSnapshotSplit(
- JdbcConnection jdbc,
- TableId tableId,
- int chunkId,
- RowType splitKeyType,
- Object chunkStart,
- Object chunkEnd) {
- // currently, we only support single split column
- Object[] splitStart = chunkStart == null ? null : new Object[]
{chunkStart};
- Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
- Map<TableId, TableChange> schema = new HashMap<>();
- schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
- return new SnapshotSplit(
- tableId,
- splitId(tableId, chunkId),
- splitKeyType,
- splitStart,
- splitEnd,
- null,
- schema);
- }
-
- /** Returns the distribution factor of the given table. */
- private double calculateDistributionFactor(
- TableId tableId, Object min, Object max, long approximateRowCnt) {
-
- if (!min.getClass().equals(max.getClass())) {
- throw new IllegalStateException(
- String.format(
- "Unsupported operation type, the MIN value type %s
is different with MAX value type %s.",
- min.getClass().getSimpleName(),
max.getClass().getSimpleName()));
- }
- if (approximateRowCnt == 0) {
- return Double.MAX_VALUE;
- }
- BigDecimal difference = ObjectUtils.minus(max, min);
- // factor = (max - min + 1) / rowCount
- final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
- double distributionFactor =
- subRowCnt.divide(new BigDecimal(approximateRowCnt), 4,
ROUND_CEILING).doubleValue();
- LOG.info(
- "The distribution factor of table {} is {} according to the
min split key {}, max split key {} and approximate row count {}",
- tableId,
- distributionFactor,
- min,
- max,
- approximateRowCnt);
- return distributionFactor;
- }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
index 3e8a1feb6..e7408be34 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
@@ -146,12 +145,6 @@ public class PostgresQueryUtils {
});
}
- public static String buildSplitScanQuery(
- TableId tableId, RowType pkRowType, boolean isFirstSplit, boolean
isLastSplit) {
- return buildSplitScanQuery(
- tableId, pkRowType, isFirstSplit, isLastSplit, new
ArrayList<>());
- }
-
public static String buildSplitScanQuery(
TableId tableId,
RowType pkRowType,
@@ -276,7 +269,7 @@ public class PostgresQueryUtils {
return String.format("(%s)::text", value);
}
- private static boolean isUUID(Column column) {
+ public static boolean isUUID(Column column) {
return column.typeName().equals("uuid");
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
index 395467175..fb338a0ea 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
@@ -17,345 +17,51 @@
package org.apache.flink.cdc.connectors.sqlserver.source.dialect;
+import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
-import
org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange;
import
org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
-import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
-import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import
org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerTypeUtils;
import org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerUtils;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
-import io.debezium.relational.Table;
import io.debezium.relational.TableId;
-import io.debezium.relational.history.TableChanges.TableChange;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.math.BigDecimal;
-import java.math.RoundingMode;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import static
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
/**
* The {@code ChunkSplitter} used to split SqlServer table into a set of
chunks for JDBC data
* source.
*/
-public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
-
- private static final Logger LOG =
LoggerFactory.getLogger(SqlServerChunkSplitter.class);
-
- private final JdbcSourceConfig sourceConfig;
- private final JdbcDataSourceDialect dialect;
+@Internal
+public class SqlServerChunkSplitter extends JdbcSourceChunkSplitter {
public SqlServerChunkSplitter(JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dialect) {
- this.sourceConfig = sourceConfig;
- this.dialect = dialect;
- }
-
- private static String splitId(TableId tableId, int chunkId) {
- return tableId.toString() + ":" + chunkId;
- }
-
- private static void maySleep(int count, TableId tableId) {
- // every 100 queries to sleep 1s
- if (count % 10 == 0) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // nothing to do
- }
- LOG.info("JdbcSourceChunkSplitter has split {} chunks for table
{}", count, tableId);
- }
- }
-
- @Override
- public Collection<SnapshotSplit> generateSplits(TableId tableId) {
- try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
- LOG.info("Start splitting table {} into chunks...", tableId);
- long start = System.currentTimeMillis();
-
- Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
- Column splitColumn =
- SqlServerUtils.getSplitColumn(table,
sourceConfig.getChunkKeyColumn());
- final List<ChunkRange> chunks;
- try {
- chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
- } catch (SQLException e) {
- throw new FlinkRuntimeException("Failed to split chunks for
table " + tableId, e);
- }
-
- // convert chunks into splits
- List<SnapshotSplit> splits = new ArrayList<>();
- RowType splitType = getSplitType(splitColumn);
- for (int i = 0; i < chunks.size(); i++) {
- ChunkRange chunk = chunks.get(i);
- SnapshotSplit split =
- createSnapshotSplit(
- jdbc,
- tableId,
- i,
- splitType,
- chunk.getChunkStart(),
- chunk.getChunkEnd());
- splits.add(split);
- }
-
- long end = System.currentTimeMillis();
- LOG.info(
- "Split table {} into {} chunks, time cost: {}ms.",
- tableId,
- splits.size(),
- end - start);
- return splits;
- } catch (Exception e) {
- throw new FlinkRuntimeException(
- String.format("Generate Splits for table %s error",
tableId), e);
- }
- }
-
- @Override
- public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column
column)
- throws SQLException {
- return SqlServerUtils.queryMinMax(jdbc, tableId, column.name());
- }
-
- @Override
- public Object queryMin(
- JdbcConnection jdbc, TableId tableId, Column column, Object
excludedLowerBound)
- throws SQLException {
- return SqlServerUtils.queryMin(jdbc, tableId, column.name(),
excludedLowerBound);
+ super(sourceConfig, dialect);
}
@Override
- public DataType fromDbzColumn(Column splitColumn) {
+ protected DataType fromDbzColumn(Column splitColumn) {
return SqlServerTypeUtils.fromDbzColumn(splitColumn);
}
- //
--------------------------------------------------------------------------------------------
- // Utilities
- //
--------------------------------------------------------------------------------------------
-
@Override
- public Object queryNextChunkMax(
+ protected Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
- Column column,
+ Column splitColumn,
int chunkSize,
Object includedLowerBound)
throws SQLException {
return SqlServerUtils.queryNextChunkMax(
- jdbc, tableId, column.name(), chunkSize, includedLowerBound);
+ jdbc, tableId, splitColumn.name(), chunkSize,
includedLowerBound);
}
@Override
- public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
- return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId);
- }
-
- @Override
- public String buildSplitScanQuery(
- TableId tableId, RowType splitKeyType, boolean isFirstSplit,
boolean isLastSplit) {
- return SqlServerUtils.buildSplitScanQuery(tableId, splitKeyType,
isFirstSplit, isLastSplit);
- }
-
- /**
- * We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using
- * evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request
- * many queries and is not efficient.
- */
- private List<ChunkRange> splitTableIntoChunks(
- JdbcConnection jdbc, TableId tableId, Column splitColumn) throws
SQLException {
- final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
- final Object min = minMax[0];
- final Object max = minMax[1];
- if (min == null || max == null || min.equals(max)) {
- // empty table, or only one row, return full table scan as a chunk
- return Collections.singletonList(ChunkRange.all());
- }
-
- final int chunkSize = sourceConfig.getSplitSize();
- final double distributionFactorUpper =
sourceConfig.getDistributionFactorUpper();
- final double distributionFactorLower =
sourceConfig.getDistributionFactorLower();
-
- if (isEvenlySplitColumn(splitColumn)) {
- long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
- double distributionFactor =
- calculateDistributionFactor(tableId, min, max,
approximateRowCnt);
-
- boolean dataIsEvenlyDistributed =
- doubleCompare(distributionFactor, distributionFactorLower)
>= 0
- && doubleCompare(distributionFactor,
distributionFactorUpper) <= 0;
-
- if (dataIsEvenlyDistributed) {
- // the minimum dynamic chunk size is at least 1
- final int dynamicChunkSize = Math.max((int)
(distributionFactor * chunkSize), 1);
- return splitEvenlySizedChunks(
- tableId, min, max, approximateRowCnt, chunkSize,
dynamicChunkSize);
- } else {
- return splitUnevenlySizedChunks(jdbc, tableId, splitColumn,
min, max, chunkSize);
- }
- } else {
- return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min,
max, chunkSize);
- }
- }
-
- /**
- * Split table into evenly sized chunks based on the numeric min and max value of split column,
- * and tumble chunks in step size.
- */
- private List<ChunkRange> splitEvenlySizedChunks(
- TableId tableId,
- Object min,
- Object max,
- long approximateRowCnt,
- int chunkSize,
- int dynamicChunkSize) {
- LOG.info(
- "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}",
- tableId,
- approximateRowCnt,
- chunkSize,
- dynamicChunkSize);
- if (approximateRowCnt <= chunkSize) {
- // there is no more than one chunk, return full table as a chunk
- return Collections.singletonList(ChunkRange.all());
- }
-
- final List<ChunkRange> splits = new ArrayList<>();
- Object chunkStart = null;
- Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
- while (ObjectUtils.compare(chunkEnd, max) <= 0) {
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
- chunkStart = chunkEnd;
- try {
- chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
- } catch (ArithmeticException e) {
- // Stop chunk split to avoid dead loop when number overflows.
- break;
- }
- }
- // add the ending split
- splits.add(ChunkRange.of(chunkStart, null));
- return splits;
- }
-
- // ------------------------------------------------------------------------------------------
-
- /** Split table into unevenly sized chunks by continuously calculating next chunk max value. */
- private List<ChunkRange> splitUnevenlySizedChunks(
- JdbcConnection jdbc,
- TableId tableId,
- Column splitColumn,
- Object min,
- Object max,
- int chunkSize)
+ protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
- LOG.info(
- "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
- final List<ChunkRange> splits = new ArrayList<>();
- Object chunkStart = null;
- Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max,
chunkSize);
- int count = 0;
- while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
- // we start from [null, min + chunk_size) and avoid [null, min)
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
- // may sleep awhile to avoid DDOS on SqlServer server
- maySleep(count++, tableId);
- chunkStart = chunkEnd;
- chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max,
chunkSize);
- }
- // add the ending split
- splits.add(ChunkRange.of(chunkStart, null));
- return splits;
- }
-
- private Object nextChunkEnd(
- JdbcConnection jdbc,
- Object previousChunkEnd,
- TableId tableId,
- Column splitColumn,
- Object max,
- int chunkSize)
- throws SQLException {
- // chunk end might be null when max values are removed
- Object chunkEnd =
- queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize,
previousChunkEnd);
- if (Objects.equals(previousChunkEnd, chunkEnd)) {
- // we don't allow equal chunk start and end,
- // should query the next one larger than chunkEnd
- chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
- }
- if (ObjectUtils.compare(chunkEnd, max) >= 0) {
- return null;
- } else {
- return chunkEnd;
- }
- }
-
- private SnapshotSplit createSnapshotSplit(
- JdbcConnection jdbc,
- TableId tableId,
- int chunkId,
- RowType splitKeyType,
- Object chunkStart,
- Object chunkEnd) {
- // currently, we only support single split column
- Object[] splitStart = chunkStart == null ? null : new Object[]
{chunkStart};
- Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
- Map<TableId, TableChange> schema = new HashMap<>();
- schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
- return new SnapshotSplit(
- tableId,
- splitId(tableId, chunkId),
- splitKeyType,
- splitStart,
- splitEnd,
- null,
- schema);
- }
-
- /** Returns the distribution factor of the given table. */
- private double calculateDistributionFactor(
- TableId tableId, Object min, Object max, long approximateRowCnt) {
-
- if (!min.getClass().equals(max.getClass())) {
- throw new IllegalStateException(
- String.format(
- "Unsupported operation type, the MIN value type %s is different with MAX value type %s.",
- min.getClass().getSimpleName(),
max.getClass().getSimpleName()));
- }
- if (approximateRowCnt == 0) {
- return Double.MAX_VALUE;
- }
- BigDecimal difference = ObjectUtils.minus(max, min);
- // factor = (max - min + 1) / rowCount
- final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
- double distributionFactor =
- subRowCnt
- .divide(new BigDecimal(approximateRowCnt), 4,
RoundingMode.CEILING)
- .doubleValue();
- LOG.info(
- "The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}",
- tableId,
- distributionFactor,
- min,
- max,
- approximateRowCnt);
- return distributionFactor;
+ return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId);
}
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
index 711f42f8a..b2292a825 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
@@ -50,7 +50,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
-import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;
@@ -59,26 +58,6 @@ public class SqlServerUtils {
public SqlServerUtils() {}
- public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId,
String columnName)
- throws SQLException {
- final String minMaxQuery =
- String.format(
- "SELECT MIN(%s), MAX(%s) FROM %s",
- quote(columnName), quote(columnName), quote(tableId));
- return jdbc.queryAndMap(
- minMaxQuery,
- rs -> {
- if (!rs.next()) {
- // this should never happen
- throw new SQLException(
- String.format(
- "No result returned after running query [%s]",
- minMaxQuery));
- }
- return rowToArray(rs, 2);
- });
- }
-
public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId
tableId)
throws SQLException {
// The statement used to get approximate row count which is less
@@ -103,27 +82,6 @@ public class SqlServerUtils {
});
}
- public static Object queryMin(
- JdbcConnection jdbc, TableId tableId, String columnName, Object
excludedLowerBound)
- throws SQLException {
- final String minQuery =
- String.format(
- "SELECT MIN(%s) FROM %s WHERE %s > ?",
- quote(columnName), quote(tableId), quote(columnName));
- return jdbc.prepareQueryAndMap(
- minQuery,
- ps -> ps.setObject(1, excludedLowerBound),
- rs -> {
- if (!rs.next()) {
- // this should never happen
- throw new SQLException(
- String.format(
- "No result returned after running query [%s]", minQuery));
- }
- return rs.getObject(1);
- });
- }
-
/**
* Returns the next LSN to be read from the database. This is the LSN of the last record that
* was read from the database.