This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 2729ad02d [FLINK-36793][source-connector/oracle] Fix the problem of
uneven splits caused by Oracle ROWID type comparison
2729ad02d is described below
commit 2729ad02d7a44f5b1d3af414a17324d358d5d3f1
Author: linjianchang <[email protected]>
AuthorDate: Tue Apr 22 20:35:36 2025 +0800
[FLINK-36793][source-connector/oracle] Fix the problem of uneven splits
caused by Oracle ROWID type comparison
This closes #3841
Co-authored-by: linjc13 <[email protected]>
---
.../assigner/splitter/JdbcSourceChunkSplitter.java | 15 ++-
.../assigner/splitter/OracleChunkSplitter.java | 50 ++++++++--
.../assigner/splitter/OracleChunkSplitterTest.java | 101 +++++++++++++++++++++
.../source/dialect/SqlServerChunkSplitter.java | 6 +-
4 files changed, 157 insertions(+), 15 deletions(-)
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
index 09fe4b90d..545db56f8 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
@@ -261,12 +261,16 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
}
/** ChunkEnd less than or equal to max. */
- protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
+ protected boolean isChunkEndLeMax(
+ @Nullable JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn)
+ throws SQLException {
return ObjectUtils.compare(chunkEnd, max) <= 0;
}
/** ChunkEnd greater than or equal to max. */
- protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
+ protected boolean isChunkEndGeMax(
+ @Nullable JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn)
+ throws SQLException {
return ObjectUtils.compare(chunkEnd, max) >= 0;
}
@@ -389,7 +393,8 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
chunkSize);
// may sleep a while to avoid DDOS on MySQL server
maySleep(nextChunkId, tableId);
- if (chunkEnd != null && isChunkEndLeMax(chunkEnd, minMaxOfSplitColumn[1], splitColumn)) {
+ if (chunkEnd != null
+ && isChunkEndLeMax(jdbcConnection, chunkEnd, minMaxOfSplitColumn[1], splitColumn)) {
nextChunkStart = ChunkSplitterState.ChunkBound.middleOf(chunkEnd);
return createSnapshotSplit(tableId, nextChunkId++, splitType, chunkStartVal, chunkEnd);
} else {
@@ -489,7 +494,7 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
- while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max, splitColumn)) {
+ while (chunkEnd != null && isChunkEndLeMax(jdbcConnection, chunkEnd, max, splitColumn)) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep a while to avoid DDOS on PostgreSQL server
@@ -518,7 +523,7 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
- if (isChunkEndGeMax(chunkEnd, max, splitColumn)) {
+ if (isChunkEndGeMax(jdbc, chunkEnd, max, splitColumn)) {
return null;
} else {
return chunkEnd;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
index 0e69cdc52..22f537f51 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
@@ -106,12 +106,29 @@ public class OracleChunkSplitter extends JdbcSourceChunkSplitter {
/** ChunkEnd less than or equal to max. */
@Override
- protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
+ protected boolean isChunkEndLeMax(
+ JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn)
+ throws SQLException {
boolean chunkEndMaxCompare;
if (chunkEnd instanceof ROWID && max instanceof ROWID) {
- chunkEndMaxCompare =
- ROWID.compareBytes(((ROWID) chunkEnd).getBytes(), ((ROWID) max).getBytes())
- <= 0;
+ String query =
+ String.format(
+ "SELECT CHARTOROWID(?) ROWIDS FROM DUAL UNION SELECT CHARTOROWID(?) ROWIDS FROM DUAL ORDER BY ROWIDS ASC");
+ return jdbc.prepareQueryAndMap(
+ query,
+ ps -> {
+ ps.setObject(1, chunkEnd.toString());
+ ps.setObject(2, max.toString());
+ },
+ rs -> {
+ if (rs.next()) {
+ Object obj = rs.getObject(1);
+ return obj.toString().equals(chunkEnd.toString())
+ || chunkEnd.toString().equals(max.toString());
+ } else {
+ throw new RuntimeException("compare rowid error");
+ }
+ });
} else {
chunkEndMaxCompare = chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0;
}
@@ -120,12 +137,29 @@ public class OracleChunkSplitter extends JdbcSourceChunkSplitter {
/** ChunkEnd greater than or equal to max. */
@Override
- protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
+ protected boolean isChunkEndGeMax(
+ JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn)
+ throws SQLException {
boolean chunkEndMaxCompare;
if (chunkEnd instanceof ROWID && max instanceof ROWID) {
- chunkEndMaxCompare =
- ROWID.compareBytes(((ROWID) chunkEnd).getBytes(), ((ROWID) max).getBytes())
- >= 0;
+ String query =
+ String.format(
+ "SELECT CHARTOROWID(?) ROWIDS FROM DUAL UNION SELECT CHARTOROWID(?) ROWIDS FROM DUAL ORDER BY ROWIDS DESC");
+ return jdbc.prepareQueryAndMap(
+ query,
+ ps -> {
+ ps.setObject(1, chunkEnd.toString());
+ ps.setObject(2, max.toString());
+ },
+ rs -> {
+ if (rs.next()) {
+ Object obj = rs.getObject(1);
+ return obj.toString().equals(chunkEnd.toString())
+ || chunkEnd.toString().equals(max.toString());
+ } else {
+ throw new RuntimeException("compare rowid error");
+ }
+ });
} else {
chunkEndMaxCompare = chunkEnd != null && ObjectUtils.compare(chunkEnd, max) >= 0;
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitterTest.java
new file mode 100644
index 000000000..04024f67e
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitterTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source.assigner.splitter;
+
+import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
+import org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase;
+import org.apache.flink.cdc.connectors.oracle.source.utils.OracleConnectionUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import oracle.sql.ROWID;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.SQLException;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT tests to cover tables chunk splitter process. */
+class OracleChunkSplitterTest extends OracleSourceTestBase {
+ private final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment();
+
+ @BeforeEach
+ public void before() throws Exception {
+
+ LOG.info("Starting containers...");
+ Startables.deepStart(Stream.of(ORACLE_CONTAINER)).join();
+ LOG.info("Containers are started.");
+ TestValuesTableFactory.clearAllData();
+
+ env.setParallelism(1);
+ }
+
+ @Test
+ void testIsChunkEndGeMax_Rowid_Case() throws SQLException {
+ String a = "AAAzIdACKAAABWCAAA";
+ String b = "AAAzIdAC/AACWIPAAB";
+ rowidGeMaxCheck(a, b);
+ }
+
+ @Test
+ void testIsChunkEndLeMax_Rowid_Case() throws SQLException {
+ String a = "AAAzIdACKAAABWCAAA";
+ String b = "AAAzIdAC/AACWIPAAB";
+ rowidLeMaxCheck(a, b);
+ }
+
+ private void rowidGeMaxCheck(String chunkEndStr, String maxStr) throws SQLException {
+ JdbcConfiguration jdbcConfig =
+ JdbcConfiguration.create()
+ .with(JdbcConfiguration.HOSTNAME, ORACLE_CONTAINER.getHost())
+ .with(JdbcConfiguration.PORT, ORACLE_CONTAINER.getOraclePort())
+ .with(JdbcConfiguration.USER, ORACLE_CONTAINER.getUsername())
+ .with(JdbcConfiguration.PASSWORD, ORACLE_CONTAINER.getPassword())
+ .with(JdbcConfiguration.DATABASE, ORACLE_CONTAINER.getDatabaseName())
+ .build();
+ JdbcConnection jdbc = OracleConnectionUtils.createOracleConnection(jdbcConfig);
+ ROWID chunkEnd = new ROWID(chunkEndStr);
+ ROWID max = new ROWID(maxStr);
+ ChunkSplitterState chunkSplitterState = new ChunkSplitterState(null, null, null);
+ OracleChunkSplitter splitter = new OracleChunkSplitter(null, null, chunkSplitterState);
+ assertThat(splitter.isChunkEndGeMax(jdbc, chunkEnd, max, null)).isFalse();
+ }
+
+ private void rowidLeMaxCheck(String chunkEndStr, String maxStr) throws SQLException {
+ JdbcConfiguration jdbcConfig =
+ JdbcConfiguration.create()
+ .with(JdbcConfiguration.HOSTNAME, ORACLE_CONTAINER.getHost())
+ .with(JdbcConfiguration.PORT, ORACLE_CONTAINER.getOraclePort())
+ .with(JdbcConfiguration.USER, ORACLE_CONTAINER.getUsername())
+ .with(JdbcConfiguration.PASSWORD, ORACLE_CONTAINER.getPassword())
+ .with(JdbcConfiguration.DATABASE, ORACLE_CONTAINER.getDatabaseName())
+ .build();
+ JdbcConnection jdbc = OracleConnectionUtils.createOracleConnection(jdbcConfig);
+ ROWID chunkEnd = new ROWID(chunkEndStr);
+ ROWID max = new ROWID(maxStr);
+ ChunkSplitterState chunkSplitterState = new ChunkSplitterState(null, null, null);
+ OracleChunkSplitter splitter = new OracleChunkSplitter(null, null, chunkSplitterState);
+ assertThat(splitter.isChunkEndLeMax(jdbc, chunkEnd, max, null)).isTrue();
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
index 1576c052c..7b74c7540 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
@@ -69,12 +69,14 @@ public class SqlServerChunkSplitter extends JdbcSourceChunkSplitter {
return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId);
}
- protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
+ protected boolean isChunkEndLeMax(
+ JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn) {
return SqlServerUtils.compare(chunkEnd, max, splitColumn) <= 0;
}
/** ChunkEnd greater than or equal to max. */
- protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
+ protected boolean isChunkEndGeMax(
+ JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn) {
return SqlServerUtils.compare(chunkEnd, max, splitColumn) >= 0;
}
}