This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f56ae72 Fix inconsistent equals and hashCode for MessageIds (#9440)
f56ae72 is described below
commit f56ae72565b20b5995501842e79dcc3e1b814a12
Author: Greg Methvin <[email protected]>
AuthorDate: Wed Apr 21 15:57:02 2021 -0700
Fix inconsistent equals and hashCode for MessageIds (#9440)
---
.../pulsar/client/impl/BatchMessageIdImpl.java | 48 ++++++--------
.../apache/pulsar/client/impl/MessageIdImpl.java | 76 +++++++++++++---------
.../pulsar/client/impl/TopicMessageIdImpl.java | 9 +--
.../src/main/resources/findbugsExclude.xml | 5 ++
.../pulsar/client/impl/BatchMessageIdImplTest.java | 30 ++++++++-
.../pulsar/client/impl/MessageIdCompareToTest.java | 4 +-
.../pulsar/client/impl/TopicMessageIdImplTest.java | 57 ++++++++++++++++
7 files changed, 160 insertions(+), 69 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
index 7c7672c..fd8ea72 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
@@ -26,7 +26,7 @@ import org.apache.pulsar.client.api.MessageId;
public class BatchMessageIdImpl extends MessageIdImpl {
private static final long serialVersionUID = 1L;
- private final static int NO_BATCH = -1;
+ static final int NO_BATCH = -1;
private final int batchIndex;
private final int batchSize;
@@ -69,44 +69,36 @@ public class BatchMessageIdImpl extends MessageIdImpl {
@Override
public int compareTo(MessageId o) {
- if (o instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl other = (BatchMessageIdImpl) o;
- return ComparisonChain.start()
- .compare(this.ledgerId, other.ledgerId)
- .compare(this.entryId, other.entryId)
- .compare(this.batchIndex, other.batchIndex)
- .compare(this.getPartitionIndex(), other.getPartitionIndex())
- .result();
- } else if (o instanceof MessageIdImpl) {
- int res = super.compareTo(o);
- if (res == 0 && batchIndex > NO_BATCH) {
- return 1;
- } else {
- return res;
- }
+ if (o instanceof MessageIdImpl) {
+ MessageIdImpl other = (MessageIdImpl) o;
+ int batchIndex = (o instanceof BatchMessageIdImpl) ? ((BatchMessageIdImpl) o).batchIndex : NO_BATCH;
+ return messageIdCompare(
+ this.ledgerId, this.entryId, this.partitionIndex, this.batchIndex,
+ other.ledgerId, other.entryId, other.partitionIndex, batchIndex
+ );
} else if (o instanceof TopicMessageIdImpl) {
return compareTo(((TopicMessageIdImpl) o).getInnerMessageId());
} else {
- throw new IllegalArgumentException(
- "expected BatchMessageIdImpl object. Got instance of " + o.getClass().getName());
+ throw new UnsupportedOperationException("Unknown MessageId type: " + o.getClass().getName());
}
}
@Override
public int hashCode() {
- return (int) (31 * (ledgerId + 31 * entryId) + (31 * (long) partitionIndex) + batchIndex);
+ return messageIdHashCode(ledgerId, entryId, partitionIndex, batchIndex);
}
@Override
- public boolean equals(Object obj) {
- if (obj instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl other = (BatchMessageIdImpl) obj;
- return ledgerId == other.ledgerId && entryId == other.entryId && partitionIndex == other.partitionIndex
- && batchIndex == other.batchIndex;
- } else if (obj instanceof MessageIdImpl) {
- MessageIdImpl other = (MessageIdImpl) obj;
- return ledgerId == other.ledgerId && entryId == other.entryId && partitionIndex == other.partitionIndex
- && batchIndex == NO_BATCH;
+ public boolean equals(Object o) {
+ if (o instanceof MessageIdImpl) {
+ MessageIdImpl other = (MessageIdImpl) o;
+ int batchIndex = (o instanceof BatchMessageIdImpl) ? ((BatchMessageIdImpl) o).batchIndex : NO_BATCH;
+ return messageIdEquals(
+ this.ledgerId, this.entryId, this.partitionIndex, this.batchIndex,
+ other.ledgerId, other.entryId, other.partitionIndex, batchIndex
+ );
+ } else if (o instanceof TopicMessageIdImpl) {
+ return equals(((TopicMessageIdImpl) o).getInnerMessageId());
}
return false;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
index 210230b..855ee64 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
@@ -32,6 +32,8 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.naming.TopicName;
+import static org.apache.pulsar.client.impl.BatchMessageIdImpl.NO_BATCH;
+
public class MessageIdImpl implements MessageId {
protected final long ledgerId;
protected final long entryId;
@@ -63,19 +65,20 @@ public class MessageIdImpl implements MessageId {
@Override
public int hashCode() {
- return (int) (31 * (ledgerId + 31 * entryId) + partitionIndex);
+ return messageIdHashCode(ledgerId, entryId, partitionIndex, NO_BATCH);
}
@Override
- public boolean equals(Object obj) {
- if (obj instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl other = (BatchMessageIdImpl) obj;
- return other.equals(this);
- } else if (obj instanceof MessageIdImpl) {
- MessageIdImpl other = (MessageIdImpl) obj;
- return ledgerId == other.ledgerId && entryId == other.entryId && partitionIndex == other.partitionIndex;
- } else if (obj instanceof TopicMessageIdImpl) {
- return equals(((TopicMessageIdImpl) obj).getInnerMessageId());
+ public boolean equals(Object o) {
+ if (o instanceof MessageIdImpl) {
+ MessageIdImpl other = (MessageIdImpl) o;
+ int batchIndex = (o instanceof BatchMessageIdImpl) ? ((BatchMessageIdImpl) o).getBatchIndex() : NO_BATCH;
+ return messageIdEquals(
+ this.ledgerId, this.entryId, this.partitionIndex, NO_BATCH,
+ other.ledgerId, other.entryId, other.partitionIndex, batchIndex
+ );
+ } else if (o instanceof TopicMessageIdImpl) {
+ return equals(((TopicMessageIdImpl) o).getInnerMessageId());
}
return false;
}
@@ -197,30 +200,43 @@ public class MessageIdImpl implements MessageId {
@Override
public int compareTo(MessageId o) {
- if (o instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl other = (BatchMessageIdImpl) o;
- int res = ComparisonChain.start()
- .compare(this.ledgerId, other.ledgerId)
- .compare(this.entryId, other.entryId)
- .compare(this.getPartitionIndex(), other.getPartitionIndex())
- .result();
- if (res == 0 && other.getBatchIndex() > -1) {
- return -1;
- } else {
- return res;
- }
- } else if (o instanceof MessageIdImpl) {
+ if (o instanceof MessageIdImpl) {
MessageIdImpl other = (MessageIdImpl) o;
- return ComparisonChain.start()
- .compare(this.ledgerId, other.ledgerId)
- .compare(this.entryId, other.entryId)
- .compare(this.getPartitionIndex(), other.getPartitionIndex())
- .result();
+ int batchIndex = (o instanceof BatchMessageIdImpl) ? ((BatchMessageIdImpl) o).getBatchIndex() : NO_BATCH;
+ return messageIdCompare(
+ this.ledgerId, this.entryId, this.partitionIndex, NO_BATCH,
+ other.ledgerId, other.entryId, other.partitionIndex, batchIndex
+ );
} else if (o instanceof TopicMessageIdImpl) {
return compareTo(((TopicMessageIdImpl) o).getInnerMessageId());
} else {
- throw new IllegalArgumentException(
- "expected MessageIdImpl object. Got instance of " + o.getClass().getName());
+ throw new UnsupportedOperationException("Unknown MessageId type: " + o.getClass().getName());
}
}
+
+ static int messageIdHashCode(long ledgerId, long entryId, int partitionIndex, int batchIndex) {
+ return (int) (31 * (ledgerId + 31 * entryId) + (31 * (long) partitionIndex) + batchIndex);
+ }
+
+ static boolean messageIdEquals(
+ long ledgerId1, long entryId1, int partitionIndex1, int batchIndex1,
+ long ledgerId2, long entryId2, int partitionIndex2, int batchIndex2
+ ) {
+ return ledgerId1 == ledgerId2
+ && entryId1 == entryId2
+ && partitionIndex1 == partitionIndex2
+ && batchIndex1 == batchIndex2;
+ }
+
+ static int messageIdCompare(
+ long ledgerId1, long entryId1, int partitionIndex1, int batchIndex1,
+ long ledgerId2, long entryId2, int partitionIndex2, int batchIndex2
+ ) {
+ return ComparisonChain.start()
+ .compare(ledgerId1, ledgerId2)
+ .compare(entryId1, entryId2)
+ .compare(partitionIndex1, partitionIndex2)
+ .compare(batchIndex1, batchIndex2)
+ .result();
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
index 0c75aff..da3686d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -66,17 +66,12 @@ public class TopicMessageIdImpl implements MessageId {
@Override
public int hashCode() {
- return Objects.hash(topicPartitionName, messageId);
+ return messageId.hashCode();
}
@Override
public boolean equals(Object obj) {
- if (!(obj instanceof TopicMessageIdImpl)) {
- return false;
- }
- TopicMessageIdImpl other = (TopicMessageIdImpl) obj;
- return Objects.equals(topicPartitionName, other.topicPartitionName)
- && Objects.equals(messageId, other.messageId);
+ return messageId.equals(obj);
}
@Override
diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml b/pulsar-client/src/main/resources/findbugsExclude.xml
index f887c99..bf01926 100644
--- a/pulsar-client/src/main/resources/findbugsExclude.xml
+++ b/pulsar-client/src/main/resources/findbugsExclude.xml
@@ -85,10 +85,15 @@
<Bug pattern="IS2_INCONSISTENT_SYNC"/>
</Match>
+ <!-- Needed for compatibility with TopicMessageIdImpl -->
<Match>
<Class name="org.apache.pulsar.client.impl.MessageIdImpl"/>
<Bug pattern="EQ_CHECK_FOR_OPERAND_NOT_COMPATIBLE_WITH_THIS"/>
</Match>
+ <Match>
+ <Class name="org.apache.pulsar.client.impl.BatchMessageIdImpl"/>
+ <Bug pattern="EQ_CHECK_FOR_OPERAND_NOT_COMPATIBLE_WITH_THIS"/>
+ </Match>
<Match>
<Class name="org.apache.pulsar.client.impl.Hash"/>
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java
index 5d56f594..f8545ea 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java
@@ -48,7 +48,7 @@ public class BatchMessageIdImplTest {
BatchMessageIdImpl batchMsgId2 = new BatchMessageIdImpl(1, 1, 1, 1);
assertEquals(batchMsgId1.hashCode(), batchMsgId1.hashCode());
- assertTrue(batchMsgId1.hashCode() != batchMsgId2.hashCode());
+ assertNotEquals(batchMsgId1.hashCode(), batchMsgId2.hashCode());
}
@Test
@@ -75,6 +75,32 @@ public class BatchMessageIdImplTest {
}
@Test
+ public void equalsUnbatchedTest() {
+ BatchMessageIdImpl batchMsgId1 = new BatchMessageIdImpl(0, 0, 0, -1);
+ BatchMessageIdImpl batchMsgId2 = new BatchMessageIdImpl(1, 1, 1, -1);
+
+ MessageIdImpl msgId1 = new MessageIdImpl(0, 0, 0);
+ MessageIdImpl msgId2 = new MessageIdImpl(1, 1, 1);
+
+ assertEquals(batchMsgId1, msgId1);
+ assertEquals(batchMsgId2, msgId2);
+ assertNotEquals(batchMsgId1, msgId2);
+ assertNotEquals(batchMsgId2, msgId1);
+ }
+
+ @Test
+ public void hashCodeUnbatchedTest() {
+ BatchMessageIdImpl batchMsgId1 = new BatchMessageIdImpl(0, 0, 0, -1);
+ BatchMessageIdImpl batchMsgId2 = new BatchMessageIdImpl(1, 1, 1, -1);
+
+ MessageIdImpl msgId1 = new MessageIdImpl(0, 0, 0);
+ MessageIdImpl msgId2 = new MessageIdImpl(1, 1, 1);
+
+ assertEquals(batchMsgId1.hashCode(), msgId1.hashCode());
+ assertEquals(batchMsgId2.hashCode(), msgId2.hashCode());
+ }
+
+ @Test
public void deserializationTest() {
// initialize BitSet with null
BatchMessageAcker ackerDisabled = new BatchMessageAcker(null, 0);
@@ -101,7 +127,7 @@ public class BatchMessageIdImplTest {
}
@Test
- public void SerializeAdnDeserializeTest() throws IOException {
+ public void serializeAndDeserializeTest() throws IOException {
BatchMessageIdImpl batchMessageId = new BatchMessageIdImpl(1, 1, 0, 1, 10, BatchMessageAcker.newAcker(10));
byte[] serialized = batchMessageId.toByteArray();
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
index 515d353..9db6b37 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
@@ -73,7 +73,7 @@ public class MessageIdCompareToTest {
assertTrue(batchMessageId2.compareTo(batchMessageId3) > 0, "Expected to be greater than");
assertTrue(batchMessageId2.compareTo(batchMessageId4) > 0, "Expected to be greater than");
assertTrue(batchMessageId2.compareTo(batchMessageId5) > 0, "Expected to be greater than");
- assertTrue(batchMessageId3.compareTo(batchMessageId4) > 0, "Expected to be greater than");
+ assertTrue(batchMessageId4.compareTo(batchMessageId3) > 0, "Expected to be greater than");
assertTrue(batchMessageId3.compareTo(batchMessageId5) > 0, "Expected to be greater than");
assertTrue(batchMessageId4.compareTo(batchMessageId5) > 0, "Expected to be greater than");
}
@@ -105,7 +105,7 @@ public class MessageIdCompareToTest {
assertTrue(batchMessageId3.compareTo(batchMessageId2) < 0, "Expected to be less than");
assertTrue(batchMessageId4.compareTo(batchMessageId2) < 0, "Expected to be less than");
assertTrue(batchMessageId5.compareTo(batchMessageId2) < 0, "Expected to be less than");
- assertTrue(batchMessageId4.compareTo(batchMessageId3) < 0, "Expected to be less than");
+ assertTrue(batchMessageId3.compareTo(batchMessageId4) < 0, "Expected to be less than");
assertTrue(batchMessageId5.compareTo(batchMessageId3) < 0, "Expected to be less than");
assertTrue(batchMessageId5.compareTo(batchMessageId4) < 0, "Expected to be less than");
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java
new file mode 100644
index 0000000..598fe21
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+
+import org.testng.annotations.Test;
+
+public class TopicMessageIdImplTest {
+ @Test
+ public void hashCodeTest() {
+ MessageIdImpl msgId1 = new MessageIdImpl(0, 0, 0);
+ MessageIdImpl msgId2 = new BatchMessageIdImpl(1, 1, 1, 1);
+ TopicMessageIdImpl topicMsgId1 = new TopicMessageIdImpl("topic-partition-1", "topic", msgId1);
+ TopicMessageIdImpl topic2MsgId1 = new TopicMessageIdImpl("topic2-partition-1", "topic2", msgId1);
+ TopicMessageIdImpl topicMsgId2 = new TopicMessageIdImpl("topic-partition-2", "topic", msgId2);
+
+ assertEquals(topicMsgId1.hashCode(), topicMsgId1.hashCode());
+ assertEquals(topic2MsgId1.hashCode(), topic2MsgId1.hashCode());
+ assertEquals(topicMsgId1.hashCode(), msgId1.hashCode());
+ assertNotEquals(topicMsgId1.hashCode(), topicMsgId2.hashCode());
+ assertEquals(topicMsgId2.hashCode(), msgId2.hashCode());
+ }
+
+ @Test
+ public void equalsTest() {
+ MessageIdImpl msgId1 = new MessageIdImpl(0, 0, 0);
+ MessageIdImpl msgId2 = new BatchMessageIdImpl(1, 1, 1, 1);
+ TopicMessageIdImpl topicMsgId1 = new TopicMessageIdImpl("topic-partition-1", "topic", msgId1);
+ TopicMessageIdImpl topic2MsgId1 = new TopicMessageIdImpl("topic2-partition-1", "topic2", msgId1);
+ TopicMessageIdImpl topicMsgId2 = new TopicMessageIdImpl("topic-partition-2", "topic", msgId2);
+
+ assertEquals(topicMsgId1, topicMsgId1);
+ assertEquals(topicMsgId1, topic2MsgId1);
+ assertEquals(topicMsgId1, msgId1);
+ assertEquals(msgId1, topicMsgId1);
+ assertNotEquals(topicMsgId1, topicMsgId2);
+ }
+
+}