liangyepianzhou opened a new pull request, #17836:
URL: https://github.com/apache/pulsar/pull/17836

   ### Motivation
   When the producer sends messages in multiple threads, the message with the 
smaller sequence Id can be pushed later than the message with the bigger 
sequence Id. Then there will be an exception thrown at 
persistentTopic::publishTxnMessage 
   ### Modification
   Move getting sequenceId in the sync code.
   
   ### reproduce
   This test can reproduce this problem, but this is not guaranteed to recur.
   
   > 
       @Test
       public void testUpdateSequenceIdInSyncCodeSegment() throws Exception {
           String topic = NAMESPACE1 + "/sequenceId";
           int totalMessage = 10;
           int threadSize = 30;
           String topicName = "subscription";
           ExecutorService executorService = 
Executors.newFixedThreadPool(threadSize);
   
           //build producer/consumer
           Producer<byte[]> producer = pulsarClient.newProducer()
                   .topic(topic)
                   .producerName("producer")
                   .sendTimeout(0, TimeUnit.SECONDS)
                   .create();
   
           Consumer<byte[]> consumer = pulsarClient.newConsumer()
                   .topic(topic)
                   .subscriptionType(SubscriptionType.Exclusive)
                   .subscriptionName(topicName)
                   .subscribe();
           //store the send/ack result  futures
           ArrayList<CompletableFuture<MessageId>> sendFutures = new 
ArrayList<>();
           ArrayList<CompletableFuture<Void>> ackFutures = new ArrayList<>();
   
           //send and ack messages with transaction
           Transaction transaction = pulsarClient.newTransaction()
                   .withTransactionTimeout(10, TimeUnit.SECONDS)
                   .build()
                   .get();
   
           for (int i = 0; i < totalMessage * threadSize; i++) {
               producer.newMessage().send();
           }
   
           CountDownLatch countDownLatch = new CountDownLatch(threadSize);
           new Thread(() -> {
               for (int i = 0; i < threadSize; i++) {
                   executorService.submit(() -> {
                       try {
                           for (int j = 0; j < totalMessage; j++) {
                               CompletableFuture<MessageId> sendFuture = 
producer.newMessage(transaction).sendAsync();
                              sendFutures.add(sendFuture);
                               Message<byte[]> message = consumer.receive();
                               CompletableFuture<Void> ackFuture = 
consumer.acknowledgeAsync(message.getMessageId(),
                                       transaction);
                             ackFutures.add(ackFuture);
                           }
                           countDownLatch.countDown();
                       } catch (Exception e) {
                           log.error("Failed to send/ack messages with 
transaction.", e);
                       }
                   });
               }
           }).start();
           //wait the all send/ack op is excuted and store its futures in the 
arraylist.
           countDownLatch.await(10, TimeUnit.SECONDS);
           transaction.commit().get();
   
           for (int i = 0; i < threadSize * totalMessage; i++) {
               consumer.receive();
           }
   
       }
   
   ### Documentation
   
   - [x] `doc-not-needed` 
   (Please explain why)
   
   ### Matching PR in forked repository
   
   PR in forked repository: https://github.com/liangyepianzhou/pulsar/pull/6


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to