This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 2729ad02d [FLINK-36793][source-connector/oracle] Fix the problem of uneven splits caused by Oracle ROWID type comparison
2729ad02d is described below

commit 2729ad02d7a44f5b1d3af414a17324d358d5d3f1
Author: linjianchang <[email protected]>
AuthorDate: Tue Apr 22 20:35:36 2025 +0800

    [FLINK-36793][source-connector/oracle] Fix the problem of uneven splits caused by Oracle ROWID type comparison
    
     This closes #3841
    
    Co-authored-by: linjc13 <[email protected]>
---
 .../assigner/splitter/JdbcSourceChunkSplitter.java |  15 ++-
 .../assigner/splitter/OracleChunkSplitter.java     |  50 ++++++++--
 .../assigner/splitter/OracleChunkSplitterTest.java | 101 +++++++++++++++++++++
 .../source/dialect/SqlServerChunkSplitter.java     |   6 +-
 4 files changed, 157 insertions(+), 15 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
index 09fe4b90d..545db56f8 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
@@ -261,12 +261,16 @@ public abstract class JdbcSourceChunkSplitter implements 
ChunkSplitter {
     }
 
     /** ChunkEnd less than or equal to max. */
-    protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column 
splitColumn) {
+    protected boolean isChunkEndLeMax(
+            @Nullable JdbcConnection jdbc, Object chunkEnd, Object max, Column 
splitColumn)
+            throws SQLException {
         return ObjectUtils.compare(chunkEnd, max) <= 0;
     }
 
     /** ChunkEnd greater than or equal to max. */
-    protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column 
splitColumn) {
+    protected boolean isChunkEndGeMax(
+            @Nullable JdbcConnection jdbc, Object chunkEnd, Object max, Column 
splitColumn)
+            throws SQLException {
         return ObjectUtils.compare(chunkEnd, max) >= 0;
     }
 
@@ -389,7 +393,8 @@ public abstract class JdbcSourceChunkSplitter implements 
ChunkSplitter {
                         chunkSize);
         // may sleep a while to avoid DDOS on MySQL server
         maySleep(nextChunkId, tableId);
-        if (chunkEnd != null && isChunkEndLeMax(chunkEnd, 
minMaxOfSplitColumn[1], splitColumn)) {
+        if (chunkEnd != null
+                && isChunkEndLeMax(jdbcConnection, chunkEnd, 
minMaxOfSplitColumn[1], splitColumn)) {
             nextChunkStart = ChunkSplitterState.ChunkBound.middleOf(chunkEnd);
             return createSnapshotSplit(tableId, nextChunkId++, splitType, 
chunkStartVal, chunkEnd);
         } else {
@@ -489,7 +494,7 @@ public abstract class JdbcSourceChunkSplitter implements 
ChunkSplitter {
         Object chunkStart = null;
         Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, 
chunkSize);
         int count = 0;
-        while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max, 
splitColumn)) {
+        while (chunkEnd != null && isChunkEndLeMax(jdbcConnection, chunkEnd, 
max, splitColumn)) {
             // we start from [null, min + chunk_size) and avoid [null, min)
             splits.add(ChunkRange.of(chunkStart, chunkEnd));
             // may sleep a while to avoid DDOS on PostgreSQL server
@@ -518,7 +523,7 @@ public abstract class JdbcSourceChunkSplitter implements 
ChunkSplitter {
             // should query the next one larger than chunkEnd
             chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
         }
-        if (isChunkEndGeMax(chunkEnd, max, splitColumn)) {
+        if (isChunkEndGeMax(jdbc, chunkEnd, max, splitColumn)) {
             return null;
         } else {
             return chunkEnd;
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
index 0e69cdc52..22f537f51 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
@@ -106,12 +106,29 @@ public class OracleChunkSplitter extends 
JdbcSourceChunkSplitter {
 
     /** ChunkEnd less than or equal to max. */
     @Override
-    protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column 
splitColumn) {
+    protected boolean isChunkEndLeMax(
+            JdbcConnection jdbc, Object chunkEnd, Object max, Column 
splitColumn)
+            throws SQLException {
         boolean chunkEndMaxCompare;
         if (chunkEnd instanceof ROWID && max instanceof ROWID) {
-            chunkEndMaxCompare =
-                    ROWID.compareBytes(((ROWID) chunkEnd).getBytes(), ((ROWID) 
max).getBytes())
-                            <= 0;
+            String query =
+                    String.format(
+                            "SELECT CHARTOROWID(?) ROWIDS FROM DUAL UNION 
SELECT CHARTOROWID(?) ROWIDS FROM DUAL ORDER BY ROWIDS ASC");
+            return jdbc.prepareQueryAndMap(
+                    query,
+                    ps -> {
+                        ps.setObject(1, chunkEnd.toString());
+                        ps.setObject(2, max.toString());
+                    },
+                    rs -> {
+                        if (rs.next()) {
+                            Object obj = rs.getObject(1);
+                            return obj.toString().equals(chunkEnd.toString())
+                                    || 
chunkEnd.toString().equals(max.toString());
+                        } else {
+                            throw new RuntimeException("compare rowid error");
+                        }
+                    });
         } else {
             chunkEndMaxCompare = chunkEnd != null && 
ObjectUtils.compare(chunkEnd, max) <= 0;
         }
@@ -120,12 +137,29 @@ public class OracleChunkSplitter extends 
JdbcSourceChunkSplitter {
 
     /** ChunkEnd greater than or equal to max. */
     @Override
-    protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column 
splitColumn) {
+    protected boolean isChunkEndGeMax(
+            JdbcConnection jdbc, Object chunkEnd, Object max, Column 
splitColumn)
+            throws SQLException {
         boolean chunkEndMaxCompare;
         if (chunkEnd instanceof ROWID && max instanceof ROWID) {
-            chunkEndMaxCompare =
-                    ROWID.compareBytes(((ROWID) chunkEnd).getBytes(), ((ROWID) 
max).getBytes())
-                            >= 0;
+            String query =
+                    String.format(
+                            "SELECT CHARTOROWID(?) ROWIDS FROM DUAL UNION 
SELECT CHARTOROWID(?) ROWIDS FROM DUAL ORDER BY ROWIDS DESC");
+            return jdbc.prepareQueryAndMap(
+                    query,
+                    ps -> {
+                        ps.setObject(1, chunkEnd.toString());
+                        ps.setObject(2, max.toString());
+                    },
+                    rs -> {
+                        if (rs.next()) {
+                            Object obj = rs.getObject(1);
+                            return obj.toString().equals(chunkEnd.toString())
+                                    || 
chunkEnd.toString().equals(max.toString());
+                        } else {
+                            throw new RuntimeException("compare rowid error");
+                        }
+                    });
         } else {
             chunkEndMaxCompare = chunkEnd != null && 
ObjectUtils.compare(chunkEnd, max) >= 0;
         }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitterTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitterTest.java
new file mode 100644
index 000000000..04024f67e
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitterTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source.assigner.splitter;
+
+import 
org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
+import org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase;
+import 
org.apache.flink.cdc.connectors.oracle.source.utils.OracleConnectionUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import oracle.sql.ROWID;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.SQLException;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT tests to cover tables chunk splitter process. */
+class OracleChunkSplitterTest extends OracleSourceTestBase {
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+    @BeforeEach
+    public void before() throws Exception {
+
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(ORACLE_CONTAINER)).join();
+        LOG.info("Containers are started.");
+        TestValuesTableFactory.clearAllData();
+
+        env.setParallelism(1);
+    }
+
+    @Test
+    void testIsChunkEndGeMax_Rowid_Case() throws SQLException {
+        String a = "AAAzIdACKAAABWCAAA";
+        String b = "AAAzIdAC/AACWIPAAB";
+        rowidGeMaxCheck(a, b);
+    }
+
+    @Test
+    void testIsChunkEndLeMax_Rowid_Case() throws SQLException {
+        String a = "AAAzIdACKAAABWCAAA";
+        String b = "AAAzIdAC/AACWIPAAB";
+        rowidLeMaxCheck(a, b);
+    }
+
+    private void rowidGeMaxCheck(String chunkEndStr, String maxStr) throws 
SQLException {
+        JdbcConfiguration jdbcConfig =
+                JdbcConfiguration.create()
+                        .with(JdbcConfiguration.HOSTNAME, 
ORACLE_CONTAINER.getHost())
+                        .with(JdbcConfiguration.PORT, 
ORACLE_CONTAINER.getOraclePort())
+                        .with(JdbcConfiguration.USER, 
ORACLE_CONTAINER.getUsername())
+                        .with(JdbcConfiguration.PASSWORD, 
ORACLE_CONTAINER.getPassword())
+                        .with(JdbcConfiguration.DATABASE, 
ORACLE_CONTAINER.getDatabaseName())
+                        .build();
+        JdbcConnection jdbc = 
OracleConnectionUtils.createOracleConnection(jdbcConfig);
+        ROWID chunkEnd = new ROWID(chunkEndStr);
+        ROWID max = new ROWID(maxStr);
+        ChunkSplitterState chunkSplitterState = new ChunkSplitterState(null, 
null, null);
+        OracleChunkSplitter splitter = new OracleChunkSplitter(null, null, 
chunkSplitterState);
+        assertThat(splitter.isChunkEndGeMax(jdbc, chunkEnd, max, 
null)).isFalse();
+    }
+
+    private void rowidLeMaxCheck(String chunkEndStr, String maxStr) throws 
SQLException {
+        JdbcConfiguration jdbcConfig =
+                JdbcConfiguration.create()
+                        .with(JdbcConfiguration.HOSTNAME, 
ORACLE_CONTAINER.getHost())
+                        .with(JdbcConfiguration.PORT, 
ORACLE_CONTAINER.getOraclePort())
+                        .with(JdbcConfiguration.USER, 
ORACLE_CONTAINER.getUsername())
+                        .with(JdbcConfiguration.PASSWORD, 
ORACLE_CONTAINER.getPassword())
+                        .with(JdbcConfiguration.DATABASE, 
ORACLE_CONTAINER.getDatabaseName())
+                        .build();
+        JdbcConnection jdbc = 
OracleConnectionUtils.createOracleConnection(jdbcConfig);
+        ROWID chunkEnd = new ROWID(chunkEndStr);
+        ROWID max = new ROWID(maxStr);
+        ChunkSplitterState chunkSplitterState = new ChunkSplitterState(null, 
null, null);
+        OracleChunkSplitter splitter = new OracleChunkSplitter(null, null, 
chunkSplitterState);
+        assertThat(splitter.isChunkEndLeMax(jdbc, chunkEnd, max, 
null)).isTrue();
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
index 1576c052c..7b74c7540 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
@@ -69,12 +69,14 @@ public class SqlServerChunkSplitter extends 
JdbcSourceChunkSplitter {
         return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId);
     }
 
-    protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column 
splitColumn) {
+    protected boolean isChunkEndLeMax(
+            JdbcConnection jdbc, Object chunkEnd, Object max, Column 
splitColumn) {
         return SqlServerUtils.compare(chunkEnd, max, splitColumn) <= 0;
     }
 
     /** ChunkEnd greater than or equal to max. */
-    protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column 
splitColumn) {
+    protected boolean isChunkEndGeMax(
+            JdbcConnection jdbc, Object chunkEnd, Object max, Column 
splitColumn) {
         return SqlServerUtils.compare(chunkEnd, max, splitColumn) >= 0;
     }
 }

Reply via email to