This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new b0a8c9fe17d [fix][client] Fixes batch_size not checked in
MessageId#fromByteArrayWithTopic (#18405)
b0a8c9fe17d is described below
commit b0a8c9fe17dfb30587fb330599f80726b2cd8ac2
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Nov 14 17:27:10 2022 +0800
[fix][client] Fixes batch_size not checked in
MessageId#fromByteArrayWithTopic (#18405)
Fixes https://github.com/apache/pulsar/issues/18395
### Motivation
The old version Pulsar clients might not set the `batch_size` field in a
batched message id, it will cause `MessageId#fromByteArrayWithTopic`,
which only checks the `batch_index` field, fail with
IllegalStateException.
### Modifications
Check if the `batch_size` field exists in `fromByteArrayWithTopic`. If
it doesn't exist, create the `BatchMessageIdImpl` instance with the
default batch size (0) and the acker (disabled).
Move `MessageIdSerializationTest` to the `pulsar-client` module and add
the `testBatchSizeNotSet` to verify the change works.
(cherry picked from commit 8246e3bdd2173541b15dc1a26738bf59639949eb)
---
.../java/org/apache/pulsar/client/impl/MessageIdImpl.java | 10 ++++++++--
.../pulsar/client/impl}/MessageIdSerializationTest.java | 15 ++++++++++++---
2 files changed, 20 insertions(+), 5 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
index 2d571852919..ac37d1d5eb2 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
@@ -154,8 +154,14 @@ public class MessageIdImpl implements MessageId {
MessageId messageId;
if (idData.hasBatchIndex()) {
- messageId = new BatchMessageIdImpl(idData.getLedgerId(),
idData.getEntryId(), idData.getPartition(),
- idData.getBatchIndex(), idData.getBatchSize(),
BatchMessageAcker.newAcker(idData.getBatchSize()));
+ if (idData.hasBatchSize()) {
+ messageId = new BatchMessageIdImpl(idData.getLedgerId(),
idData.getEntryId(), idData.getPartition(),
+ idData.getBatchIndex(), idData.getBatchSize(),
+ BatchMessageAcker.newAcker(idData.getBatchSize()));
+ } else {
+ messageId = new BatchMessageIdImpl(idData.getLedgerId(),
idData.getEntryId(), idData.getPartition(),
+ idData.getBatchIndex(), 0,
BatchMessageAckerDisabled.INSTANCE);
+ }
} else {
messageId = new MessageIdImpl(idData.getLedgerId(),
idData.getEntryId(), idData.getPartition());
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageIdSerializationTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java
similarity index 75%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageIdSerializationTest.java
rename to
pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java
index 295c7803372..b1cfad15128 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageIdSerializationTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java
@@ -16,15 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.service;
+package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
import java.io.IOException;
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.MessageIdImpl;
import org.testng.annotations.Test;
-@Test(groups = "broker")
public class MessageIdSerializationTest {
@Test
@@ -32,6 +30,7 @@ public class MessageIdSerializationTest {
MessageId id = new MessageIdImpl(1, 2, 3);
byte[] serializedId = id.toByteArray();
assertEquals(MessageId.fromByteArray(serializedId), id);
+ assertEquals(MessageId.fromByteArrayWithTopic(serializedId,
"my-topic"), id);
}
@Test
@@ -39,6 +38,16 @@ public class MessageIdSerializationTest {
MessageId id = new MessageIdImpl(1, 2, -1);
byte[] serializedId = id.toByteArray();
assertEquals(MessageId.fromByteArray(serializedId), id);
+ assertEquals(MessageId.fromByteArrayWithTopic(serializedId,
"my-topic"), id);
+ }
+
+ @Test
+ public void testBatchSizeNotSet() throws Exception {
+ MessageId id = new BatchMessageIdImpl(1L, 2L, 3, 4, -1,
+ BatchMessageAckerDisabled.INSTANCE);
+ byte[] serialized = id.toByteArray();
+ assertEquals(MessageId.fromByteArray(serialized), id);
+ assertEquals(MessageId.fromByteArrayWithTopic(serialized, "my-topic"),
id);
}
@Test(expectedExceptions = NullPointerException.class)