This is an automated email from the ASF dual-hosted git repository. kunni pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new ea384c8d7 [FLINK-38310] Fix handling equals and hashcode method of binary boundaries in FinishedSnapshotSplitInfo (#4112) ea384c8d7 is described below commit ea384c8d798f78da676aba37e0913f6ca9d89e39 Author: Sergei Morozov <moro...@tut.by> AuthorDate: Mon Sep 15 23:10:52 2025 -0700 [FLINK-38310] Fix handling equals and hashcode method of binary boundaries in FinishedSnapshotSplitInfo (#4112) --- .../flink-cdc-base/pom.xml | 13 +++++ .../meta/split/FinishedSnapshotSplitInfo.java | 12 ++--- .../meta/split/FinishedSnapshotSplitInfoTest.java | 61 ++++++++++++++++++++++ .../source/split/FinishedSnapshotSplitInfo.java | 12 ++--- .../split/FinishedSnapshotSplitInfoTest.java | 61 ++++++++++++++++++++++ 5 files changed, 147 insertions(+), 12 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/pom.xml index edb1f1249..fb8ba75bd 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/pom.xml +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/pom.xml @@ -27,6 +27,10 @@ limitations under the License. <artifactId>flink-cdc-base</artifactId> + <properties> + <mockito.version>3.4.6</mockito.version> + </properties> + <dependencies> <dependency> <groupId>org.apache.flink</groupId> @@ -145,5 +149,14 @@ limitations under the License. <version>${testcontainers.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>${mockito.version}</version> + <type>jar</type> + <scope>test</scope> + </dependency> + </dependencies> </project> diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/FinishedSnapshotSplitInfo.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/FinishedSnapshotSplitInfo.java index 7c3b7ed1d..1f0b37978 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/FinishedSnapshotSplitInfo.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/FinishedSnapshotSplitInfo.java @@ -92,16 +92,16 @@ public class FinishedSnapshotSplitInfo implements OffsetDeserializerSerializer { FinishedSnapshotSplitInfo that = (FinishedSnapshotSplitInfo) o; return Objects.equals(tableId, that.tableId) && Objects.equals(splitId, that.splitId) - && Arrays.equals(splitStart, that.splitStart) - && Arrays.equals(splitEnd, that.splitEnd) + && Arrays.deepEquals(splitStart, that.splitStart) + && Arrays.deepEquals(splitEnd, that.splitEnd) && Objects.equals(highWatermark, that.highWatermark); } @Override public int hashCode() { int result = Objects.hash(tableId, splitId, highWatermark); - result = 31 * result + Arrays.hashCode(splitStart); - result = 31 * result + Arrays.hashCode(splitEnd); + result = 31 * result + Arrays.deepHashCode(splitStart); + result = 31 * result + Arrays.deepHashCode(splitEnd); return result; } @@ -114,9 +114,9 @@ public class FinishedSnapshotSplitInfo implements OffsetDeserializerSerializer { + splitId + '\'' + ", splitStart=" - + Arrays.toString(splitStart) + + Arrays.deepToString(splitStart) + ", splitEnd=" - + Arrays.toString(splitEnd) + + Arrays.deepToString(splitEnd) + ", highWatermark=" + highWatermark + '}'; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/meta/split/FinishedSnapshotSplitInfoTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/meta/split/FinishedSnapshotSplitInfoTest.java new file mode 100644 index 000000000..0dbd59cda --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/meta/split/FinishedSnapshotSplitInfoTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.base.source.meta.split; + +import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory; + +import io.debezium.relational.TableId; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.function.Function; + +/** Unit tests for {@link FinishedSnapshotSplitInfo}. */ +class FinishedSnapshotSplitInfoTest { + @Test + void testInfosWithBinaryPrimaryKeyAreEqual() { + assertInfosWithBinaryPrimaryKeyAreEqual(Function.identity()); + } + + @Test + void testInfosWithBinaryPrimaryKeyHaveEqualHashCodes() { + assertInfosWithBinaryPrimaryKeyAreEqual(FinishedSnapshotSplitInfo::hashCode); + } + + @Test + void testInfosWithBinaryPrimaryKeyHaveEqualStringRepresentations() { + assertInfosWithBinaryPrimaryKeyAreEqual(FinishedSnapshotSplitInfo::toString); + } + + private <R> void assertInfosWithBinaryPrimaryKeyAreEqual( + Function<FinishedSnapshotSplitInfo, R> function) { + FinishedSnapshotSplitInfo original = + new FinishedSnapshotSplitInfo( + TableId.parse("table"), + "split-1", + new Object[] {new byte[] {0x01, 0x02}}, + new Object[] {new byte[] {0x03, 0x04}}, + null, + Mockito.mock(OffsetFactory.class)); + + FinishedSnapshotSplitInfo copy = original.deserialize(original.serialize()); + + Assertions.assertThat(function.apply(copy)).isEqualTo(function.apply(original)); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/FinishedSnapshotSplitInfo.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/FinishedSnapshotSplitInfo.java index 1ecb154a8..a3119c2e1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/FinishedSnapshotSplitInfo.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/FinishedSnapshotSplitInfo.java @@ -89,16 +89,16 @@ public class FinishedSnapshotSplitInfo { FinishedSnapshotSplitInfo that = (FinishedSnapshotSplitInfo) o; return Objects.equals(tableId, that.tableId) && Objects.equals(splitId, that.splitId) - && Arrays.equals(splitStart, that.splitStart) - && Arrays.equals(splitEnd, that.splitEnd) + && Arrays.deepEquals(splitStart, that.splitStart) + && Arrays.deepEquals(splitEnd, that.splitEnd) && Objects.equals(highWatermark, that.highWatermark); } @Override public int hashCode() { int result = Objects.hash(tableId, splitId, highWatermark); - result = 31 * result + Arrays.hashCode(splitStart); - result = 31 * result + Arrays.hashCode(splitEnd); + result = 31 * result + Arrays.deepHashCode(splitStart); + result = 31 * result + Arrays.deepHashCode(splitEnd); return result; } @@ -111,9 +111,9 @@ public class FinishedSnapshotSplitInfo { + splitId + '\'' + ", splitStart=" - + Arrays.toString(splitStart) + + Arrays.deepToString(splitStart) + ", splitEnd=" - + Arrays.toString(splitEnd) + + Arrays.deepToString(splitEnd) + ", highWatermark=" + highWatermark + '}'; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/FinishedSnapshotSplitInfoTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/FinishedSnapshotSplitInfoTest.java new file mode 100644 index 000000000..f28221b61 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/FinishedSnapshotSplitInfoTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source.split; + +import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; + +import io.debezium.relational.TableId; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.function.Function; + +/** Unit tests for {@link FinishedSnapshotSplitInfo}. */ +class FinishedSnapshotSplitInfoTest { + @Test + void testInfosWithBinaryPrimaryKeyAreEqual() { + assertInfosWithBinaryPrimaryKeyAreEqual(Function.identity()); + } + + @Test + void testInfosWithBinaryPrimaryKeyHaveEqualHashCodes() { + assertInfosWithBinaryPrimaryKeyAreEqual(FinishedSnapshotSplitInfo::hashCode); + } + + @Test + void testInfosWithBinaryPrimaryKeyHaveEqualStringRepresentations() { + assertInfosWithBinaryPrimaryKeyAreEqual(FinishedSnapshotSplitInfo::toString); + } + + private <R> void assertInfosWithBinaryPrimaryKeyAreEqual( + Function<FinishedSnapshotSplitInfo, R> function) { + FinishedSnapshotSplitInfo original = + new FinishedSnapshotSplitInfo( + TableId.parse("table"), + "split-1", + new Object[] {new byte[] {0x01, 0x02}}, + new Object[] {new byte[] {0x03, 0x04}}, + BinlogOffset.ofBinlogFilePosition("mysql-bin.000001", 12345L)); + + FinishedSnapshotSplitInfo copy = + FinishedSnapshotSplitInfo.deserialize( + FinishedSnapshotSplitInfo.serialize(original)); + + Assertions.assertThat(function.apply(copy)).isEqualTo(function.apply(original)); + } +}