This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d5255e21ced0828a84d40f1f96a52a0883e80274 Author: Yunze Xu <[email protected]> AuthorDate: Mon Nov 14 17:27:10 2022 +0800 [fix][client] Fixes batch_size not checked in MessageId#fromByteArrayWithTopic (#18405) Fixes https://github.com/apache/pulsar/issues/18395 ### Motivation The old version Pulsar clients might not set the `batch_size` field in a batched message id, it will cause `MessageId#fromByteArrayWithTopic`, which only checks the `batch_index` field, fail with IllegalStateException. ### Modifications Check if the `batch_size` field exists in `fromByteArrayWithTopic`. If it doesn't exist, create the `BatchMessageIdImpl` instance with the default batch size (0) and the acker (disabled). Move `MessageIdSerializationTest` to the `pulsar-client` module and add the `testBatchSizeNotSet` to verify the change works. (cherry picked from commit 8246e3bdd2173541b15dc1a26738bf59639949eb) --- .../java/org/apache/pulsar/client/impl/MessageIdImpl.java | 10 ++++++++-- .../pulsar/client/impl}/MessageIdSerializationTest.java | 15 ++++++++++++--- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 2d571852919..ac37d1d5eb2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -154,8 +154,14 @@ public class MessageIdImpl implements MessageId { MessageId messageId; if (idData.hasBatchIndex()) { - messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), - idData.getBatchIndex(), idData.getBatchSize(), BatchMessageAcker.newAcker(idData.getBatchSize())); + if (idData.hasBatchSize()) { + messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), + idData.getBatchIndex(), idData.getBatchSize(), + BatchMessageAcker.newAcker(idData.getBatchSize())); + } else { + messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), + idData.getBatchIndex(), 0, BatchMessageAckerDisabled.INSTANCE); + } } else { messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageIdSerializationTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java similarity index 75% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageIdSerializationTest.java rename to pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java index 295c7803372..b1cfad15128 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageIdSerializationTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java @@ -16,15 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.service; +package org.apache.pulsar.client.impl; import static org.testng.Assert.assertEquals; import java.io.IOException; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.impl.MessageIdImpl; import org.testng.annotations.Test; -@Test(groups = "broker") public class MessageIdSerializationTest { @Test @@ -32,6 +30,7 @@ public class MessageIdSerializationTest { MessageId id = new MessageIdImpl(1, 2, 3); byte[] serializedId = id.toByteArray(); assertEquals(MessageId.fromByteArray(serializedId), id); + assertEquals(MessageId.fromByteArrayWithTopic(serializedId, "my-topic"), id); } @Test @@ -39,6 +38,16 @@ public class MessageIdSerializationTest { MessageId id = new MessageIdImpl(1, 2, -1); byte[] serializedId = id.toByteArray(); assertEquals(MessageId.fromByteArray(serializedId), id); + assertEquals(MessageId.fromByteArrayWithTopic(serializedId, "my-topic"), id); + } + + @Test + public void testBatchSizeNotSet() throws Exception { + MessageId id = new BatchMessageIdImpl(1L, 2L, 3, 4, -1, + BatchMessageAckerDisabled.INSTANCE); + byte[] serialized = id.toByteArray(); + assertEquals(MessageId.fromByteArray(serialized), id); + assertEquals(MessageId.fromByteArrayWithTopic(serialized, "my-topic"), id); } @Test(expectedExceptions = NullPointerException.class)
