liangyepianzhou opened a new pull request, #17836:
URL: https://github.com/apache/pulsar/pull/17836
### Motivation
When the producer sends messages in multiple threads, the message with the
smaller sequence Id can be pushed later than the message with the bigger
sequence Id. Then there will be an exception thrown at
persistentTopic::publishTxnMessage
### Modification
Move getting sequenceId in the sync code.
### reproduce
This test can reproduce this problem, but this is not guaranteed to recur.
>
@Test
public void testUpdateSequenceIdInSyncCodeSegment() throws Exception {
String topic = NAMESPACE1 + "/sequenceId";
int totalMessage = 10;
int threadSize = 30;
String topicName = "subscription";
ExecutorService executorService =
Executors.newFixedThreadPool(threadSize);
//build producer/consumer
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.producerName("producer")
.sendTimeout(0, TimeUnit.SECONDS)
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName(topicName)
.subscribe();
//store the send/ack result futures
ArrayList<CompletableFuture<MessageId>> sendFutures = new
ArrayList<>();
ArrayList<CompletableFuture<Void>> ackFutures = new ArrayList<>();
//send and ack messages with transaction
Transaction transaction = pulsarClient.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS)
.build()
.get();
for (int i = 0; i < totalMessage * threadSize; i++) {
producer.newMessage().send();
}
CountDownLatch countDownLatch = new CountDownLatch(threadSize);
new Thread(() -> {
for (int i = 0; i < threadSize; i++) {
executorService.submit(() -> {
try {
for (int j = 0; j < totalMessage; j++) {
CompletableFuture<MessageId> sendFuture =
producer.newMessage(transaction).sendAsync();
sendFutures.add(sendFuture);
Message<byte[]> message = consumer.receive();
CompletableFuture<Void> ackFuture =
consumer.acknowledgeAsync(message.getMessageId(),
transaction);
ackFutures.add(ackFuture);
}
countDownLatch.countDown();
} catch (Exception e) {
log.error("Failed to send/ack messages with
transaction.", e);
}
});
}
}).start();
//wait the all send/ack op is excuted and store its futures in the
arraylist.
countDownLatch.await(10, TimeUnit.SECONDS);
transaction.commit().get();
for (int i = 0; i < threadSize * totalMessage; i++) {
consumer.receive();
}
}
### Documentation
- [x] `doc-not-needed`
(Please explain why)
### Matching PR in forked repository
PR in forked repository: https://github.com/liangyepianzhou/pulsar/pull/6
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]