This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new ea384c8d7 [FLINK-38310] Fix handling equals and hashcode method of binary boundaries in FinishedSnapshotSplitInfo (#4112)
ea384c8d7 is described below

commit ea384c8d798f78da676aba37e0913f6ca9d89e39
Author: Sergei Morozov <moro...@tut.by>
AuthorDate: Mon Sep 15 23:10:52 2025 -0700

    [FLINK-38310] Fix handling equals and hashcode method of binary boundaries in FinishedSnapshotSplitInfo (#4112)
---
 .../flink-cdc-base/pom.xml                         | 13 +++++
 .../meta/split/FinishedSnapshotSplitInfo.java      | 12 ++---
 .../meta/split/FinishedSnapshotSplitInfoTest.java  | 61 ++++++++++++++++++++++
 .../source/split/FinishedSnapshotSplitInfo.java    | 12 ++---
 .../split/FinishedSnapshotSplitInfoTest.java       | 61 ++++++++++++++++++++++
 5 files changed, 147 insertions(+), 12 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/pom.xml
index edb1f1249..fb8ba75bd 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/pom.xml
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/pom.xml
@@ -27,6 +27,10 @@ limitations under the License.
 
     <artifactId>flink-cdc-base</artifactId>
 
+    <properties>
+        <mockito.version>3.4.6</mockito.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -145,5 +149,14 @@ limitations under the License.
             <version>${testcontainers.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <type>jar</type>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 </project>
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/FinishedSnapshotSplitInfo.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/FinishedSnapshotSplitInfo.java
index 7c3b7ed1d..1f0b37978 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/FinishedSnapshotSplitInfo.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/FinishedSnapshotSplitInfo.java
@@ -92,16 +92,16 @@ public class FinishedSnapshotSplitInfo implements OffsetDeserializerSerializer {
         FinishedSnapshotSplitInfo that = (FinishedSnapshotSplitInfo) o;
         return Objects.equals(tableId, that.tableId)
                 && Objects.equals(splitId, that.splitId)
-                && Arrays.equals(splitStart, that.splitStart)
-                && Arrays.equals(splitEnd, that.splitEnd)
+                && Arrays.deepEquals(splitStart, that.splitStart)
+                && Arrays.deepEquals(splitEnd, that.splitEnd)
                 && Objects.equals(highWatermark, that.highWatermark);
     }
 
     @Override
     public int hashCode() {
         int result = Objects.hash(tableId, splitId, highWatermark);
-        result = 31 * result + Arrays.hashCode(splitStart);
-        result = 31 * result + Arrays.hashCode(splitEnd);
+        result = 31 * result + Arrays.deepHashCode(splitStart);
+        result = 31 * result + Arrays.deepHashCode(splitEnd);
         return result;
     }
 
@@ -114,9 +114,9 @@ public class FinishedSnapshotSplitInfo implements OffsetDeserializerSerializer {
                 + splitId
                 + '\''
                 + ", splitStart="
-                + Arrays.toString(splitStart)
+                + Arrays.deepToString(splitStart)
                 + ", splitEnd="
-                + Arrays.toString(splitEnd)
+                + Arrays.deepToString(splitEnd)
                 + ", highWatermark="
                 + highWatermark
                 + '}';
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/meta/split/FinishedSnapshotSplitInfoTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/meta/split/FinishedSnapshotSplitInfoTest.java
new file mode 100644
index 000000000..0dbd59cda
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/meta/split/FinishedSnapshotSplitInfoTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.base.source.meta.split;
+
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+
+import io.debezium.relational.TableId;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.function.Function;
+
+/** Unit tests for {@link FinishedSnapshotSplitInfo}. */
+class FinishedSnapshotSplitInfoTest {
+    @Test
+    void testInfosWithBinaryPrimaryKeyAreEqual() {
+        assertInfosWithBinaryPrimaryKeyAreEqual(Function.identity());
+    }
+
+    @Test
+    void testInfosWithBinaryPrimaryKeyHaveEqualHashCodes() {
+        assertInfosWithBinaryPrimaryKeyAreEqual(FinishedSnapshotSplitInfo::hashCode);
+    }
+
+    @Test
+    void testInfosWithBinaryPrimaryKeyHaveEqualStringRepresentations() {
+        assertInfosWithBinaryPrimaryKeyAreEqual(FinishedSnapshotSplitInfo::toString);
+    }
+
+    private <R> void assertInfosWithBinaryPrimaryKeyAreEqual(
+            Function<FinishedSnapshotSplitInfo, R> function) {
+        FinishedSnapshotSplitInfo original =
+                new FinishedSnapshotSplitInfo(
+                        TableId.parse("table"),
+                        "split-1",
+                        new Object[] {new byte[] {0x01, 0x02}},
+                        new Object[] {new byte[] {0x03, 0x04}},
+                        null,
+                        Mockito.mock(OffsetFactory.class));
+
+        FinishedSnapshotSplitInfo copy = original.deserialize(original.serialize());
+
+        Assertions.assertThat(function.apply(copy)).isEqualTo(function.apply(original));
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/FinishedSnapshotSplitInfo.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/FinishedSnapshotSplitInfo.java
index 1ecb154a8..a3119c2e1 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/FinishedSnapshotSplitInfo.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/FinishedSnapshotSplitInfo.java
@@ -89,16 +89,16 @@ public class FinishedSnapshotSplitInfo {
         FinishedSnapshotSplitInfo that = (FinishedSnapshotSplitInfo) o;
         return Objects.equals(tableId, that.tableId)
                 && Objects.equals(splitId, that.splitId)
-                && Arrays.equals(splitStart, that.splitStart)
-                && Arrays.equals(splitEnd, that.splitEnd)
+                && Arrays.deepEquals(splitStart, that.splitStart)
+                && Arrays.deepEquals(splitEnd, that.splitEnd)
                 && Objects.equals(highWatermark, that.highWatermark);
     }
 
     @Override
     public int hashCode() {
         int result = Objects.hash(tableId, splitId, highWatermark);
-        result = 31 * result + Arrays.hashCode(splitStart);
-        result = 31 * result + Arrays.hashCode(splitEnd);
+        result = 31 * result + Arrays.deepHashCode(splitStart);
+        result = 31 * result + Arrays.deepHashCode(splitEnd);
         return result;
     }
 
@@ -111,9 +111,9 @@ public class FinishedSnapshotSplitInfo {
                 + splitId
                 + '\''
                 + ", splitStart="
-                + Arrays.toString(splitStart)
+                + Arrays.deepToString(splitStart)
                 + ", splitEnd="
-                + Arrays.toString(splitEnd)
+                + Arrays.deepToString(splitEnd)
                 + ", highWatermark="
                 + highWatermark
                 + '}';
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/FinishedSnapshotSplitInfoTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/FinishedSnapshotSplitInfoTest.java
new file mode 100644
index 000000000..f28221b61
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/FinishedSnapshotSplitInfoTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.split;
+
+import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
+
+import io.debezium.relational.TableId;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Function;
+
+/** Unit tests for {@link FinishedSnapshotSplitInfo}. */
+class FinishedSnapshotSplitInfoTest {
+    @Test
+    void testInfosWithBinaryPrimaryKeyAreEqual() {
+        assertInfosWithBinaryPrimaryKeyAreEqual(Function.identity());
+    }
+
+    @Test
+    void testInfosWithBinaryPrimaryKeyHaveEqualHashCodes() {
+        assertInfosWithBinaryPrimaryKeyAreEqual(FinishedSnapshotSplitInfo::hashCode);
+    }
+
+    @Test
+    void testInfosWithBinaryPrimaryKeyHaveEqualStringRepresentations() {
+        assertInfosWithBinaryPrimaryKeyAreEqual(FinishedSnapshotSplitInfo::toString);
+    }
+
+    private <R> void assertInfosWithBinaryPrimaryKeyAreEqual(
+            Function<FinishedSnapshotSplitInfo, R> function) {
+        FinishedSnapshotSplitInfo original =
+                new FinishedSnapshotSplitInfo(
+                        TableId.parse("table"),
+                        "split-1",
+                        new Object[] {new byte[] {0x01, 0x02}},
+                        new Object[] {new byte[] {0x03, 0x04}},
+                        BinlogOffset.ofBinlogFilePosition("mysql-bin.000001", 12345L));
+
+        FinishedSnapshotSplitInfo copy =
+                FinishedSnapshotSplitInfo.deserialize(
+                        FinishedSnapshotSplitInfo.serialize(original));
+
+        Assertions.assertThat(function.apply(copy)).isEqualTo(function.apply(original));
+    }
+}

Reply via email to