This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new b0a8c9fe17d [fix][client] Fixes batch_size not checked in 
MessageId#fromByteArrayWithTopic (#18405)
b0a8c9fe17d is described below

commit b0a8c9fe17dfb30587fb330599f80726b2cd8ac2
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Nov 14 17:27:10 2022 +0800

    [fix][client] Fixes batch_size not checked in 
MessageId#fromByteArrayWithTopic (#18405)
    
    Fixes https://github.com/apache/pulsar/issues/18395
    
    ### Motivation
    
    Older Pulsar clients might not set the `batch_size` field in a batched
    message id. This causes `MessageId#fromByteArrayWithTopic`, which only
    checks the `batch_index` field, to fail with an
    IllegalStateException.
    
    ### Modifications
    
    Check whether the `batch_size` field exists in `fromByteArrayWithTopic`.
    If it doesn't, create the `BatchMessageIdImpl` instance with the default
    batch size (0) and a disabled acker (`BatchMessageAckerDisabled.INSTANCE`).
    
    Move `MessageIdSerializationTest` to the `pulsar-client` module and add
    `testBatchSizeNotSet` to verify that the change works.
    
    (cherry picked from commit 8246e3bdd2173541b15dc1a26738bf59639949eb)
---
 .../java/org/apache/pulsar/client/impl/MessageIdImpl.java | 10 ++++++++--
 .../pulsar/client/impl}/MessageIdSerializationTest.java   | 15 ++++++++++++---
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
index 2d571852919..ac37d1d5eb2 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
@@ -154,8 +154,14 @@ public class MessageIdImpl implements MessageId {
 
         MessageId messageId;
         if (idData.hasBatchIndex()) {
-            messageId = new BatchMessageIdImpl(idData.getLedgerId(), 
idData.getEntryId(), idData.getPartition(),
-                idData.getBatchIndex(), idData.getBatchSize(), 
BatchMessageAcker.newAcker(idData.getBatchSize()));
+            if (idData.hasBatchSize()) {
+                messageId = new BatchMessageIdImpl(idData.getLedgerId(), 
idData.getEntryId(), idData.getPartition(),
+                        idData.getBatchIndex(), idData.getBatchSize(),
+                        BatchMessageAcker.newAcker(idData.getBatchSize()));
+            } else {
+                messageId = new BatchMessageIdImpl(idData.getLedgerId(), 
idData.getEntryId(), idData.getPartition(),
+                        idData.getBatchIndex(), 0, 
BatchMessageAckerDisabled.INSTANCE);
+            }
         } else {
             messageId = new MessageIdImpl(idData.getLedgerId(), 
idData.getEntryId(), idData.getPartition());
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageIdSerializationTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java
similarity index 75%
rename from 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageIdSerializationTest.java
rename to 
pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java
index 295c7803372..b1cfad15128 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageIdSerializationTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java
@@ -16,15 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.service;
+package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
 import java.io.IOException;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.testng.annotations.Test;
 
-@Test(groups = "broker")
 public class MessageIdSerializationTest {
 
     @Test
@@ -32,6 +30,7 @@ public class MessageIdSerializationTest {
         MessageId id = new MessageIdImpl(1, 2, 3);
         byte[] serializedId = id.toByteArray();
         assertEquals(MessageId.fromByteArray(serializedId), id);
+        assertEquals(MessageId.fromByteArrayWithTopic(serializedId, 
"my-topic"), id);
     }
 
     @Test
@@ -39,6 +38,16 @@ public class MessageIdSerializationTest {
         MessageId id = new MessageIdImpl(1, 2, -1);
         byte[] serializedId = id.toByteArray();
         assertEquals(MessageId.fromByteArray(serializedId), id);
+        assertEquals(MessageId.fromByteArrayWithTopic(serializedId, 
"my-topic"), id);
+    }
+
+    @Test
+    public void testBatchSizeNotSet() throws Exception {
+        MessageId id = new BatchMessageIdImpl(1L, 2L, 3, 4, -1,
+                BatchMessageAckerDisabled.INSTANCE);
+        byte[] serialized = id.toByteArray();
+        assertEquals(MessageId.fromByteArray(serialized), id);
+        assertEquals(MessageId.fromByteArrayWithTopic(serialized, "my-topic"), 
id);
     }
 
     @Test(expectedExceptions = NullPointerException.class)

Reply via email to