zymap commented on issue #9557:
URL: https://github.com/apache/pulsar/issues/9557#issuecomment-789602819


   @Lanayx I am trying to reproduce this issue with the following test case, 
but everything looks good on my side. Could you please take a look at the 
test case?
   
   ```
       /**
        * Checks that seeking to the id of a message that was sent as part of a batch
        * yields the same next message whether the id is taken directly from the
        * received message or has first been round-tripped through
        * {@code toByteArray()} and {@code MessageId.fromByteArray()}.
        */
       @Test
       public void testSeekBatchMessageWithSerializedMessageId()
               throws PulsarClientException, InterruptedException {
           final String topic = "public/default/seek-with-batch-message-" + UUID.randomUUID();
           final String messagePrefix = "message";
           final int messageCount = 10;
           @Cleanup
           PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
           @Cleanup
           Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
               .topic(topic)
               .enableBatching(true)
               .batchingMaxMessages(20)
               .create();
           @Cleanup
           Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
               .topic(topic)
               .subscriptionName("seek-batch")
               .subscribe();
   
           ArrayList<String> messages = new ArrayList<>();
           // Queue the messages asynchronously so the producer is free to batch them
           // (messageCount stays below batchingMaxMessages).
           for (int i = 0; i < messageCount; i++) {
               String msg = String.format("%s-%d", messagePrefix, i);
               producer.sendAsync(msg);
               messages.add(msg);
           }
           // flush() is the synchronization point for the pending async sends; a latch
           // counted down inside the loop above would never block and is not needed.
           producer.flush();
   
           Message<String> msg1 = consumer.receive();
           assertEquals(msg1.getValue(), messages.get(0));
           Message<String> msg2 = consumer.receive();
           assertEquals(msg2.getValue(), messages.get(1));
           Message<String> msg3 = consumer.receive();
           assertEquals(msg3.getValue(), messages.get(2));
           Message<String> msg4 = consumer.receive();
           assertEquals(msg4.getValue(), messages.get(3));
   
           // Seek using the in-memory message id of the third message; the test
           // expects the fourth message to be delivered next.
           consumer.seek(msg3.getMessageId());
           Message<String> seekMsg4 = consumer.receive();
           assertEquals(seekMsg4.getValue(), messages.get(3));
   
           // Repeat the seek with an id rebuilt from its serialized form; the
           // outcome is expected to be the same as above.
           byte[] msg3IdBytes = msg3.getMessageId().toByteArray();
           MessageId msg3Id = null;
           try {
               msg3Id = MessageId.fromByteArray(msg3IdBytes);
           } catch (IOException e) {
               // Keep the cause so the test report shows why deserialization failed.
               fail("Failed parse message id from byte array", e);
           }
           consumer.seek(msg3Id);
           seekMsg4 = consumer.receive();
           assertEquals(seekMsg4.getValue(), messages.get(3));
       }
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to