codelipenghui opened a new issue #13792: URL: https://github.com/apache/pulsar/issues/13792
**Describe the bug**

Client-side error log:

```
22:35:10.765 [pulsar-client-internal-4-1:org.apache.pulsar.client.impl.TransactionMetaStoreHandler@511] ERROR org.apache.pulsar.client.impl.TransactionMetaStoreHandler - Got END_TXN response for request 2036419640951504857 error UnknownError
```

Server-side error log:

```
22:35:07.055 [pulsar-transaction-timer-43-1] ERROR org.apache.pulsar.broker.service.ServerCnx - Send response error for END_TXN request 2036419640951505362.
org.apache.pulsar.client.api.transaction.TransactionBufferClientException$RequestTimeoutException: Transaction buffer request timeout.
	at org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl.run(TransactionBufferHandlerImpl.java:240) [io.streamnative-pulsar-broker-2.9.1.2.jar:2.9.1.2]
	at io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
	at java.lang.Thread.run(Thread.java:829) [?:?]
```

**To Reproduce**

Steps to reproduce the behavior:

1. Run the following test code
2. Unload the topic `public/test/s_topic` during the test (possibly several times)
3. See error

Test code:

```java
package io.streamnative.test;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TransactionTest {

    private static final Logger log = LoggerFactory.getLogger(TransactionTest.class);

    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        final String sourceTopic = "public/test/s_topic";
        final String targetTopic = "public/test/t_topic";

        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://127.0.0.1:6650")
                .enableTransaction(true)
                .build();

        // Transactional producer writing to the target topic.
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(targetTopic)
                .blockIfQueueFull(true)
                .sendTimeout(0, TimeUnit.SECONDS)
                .create();

        // Consumer reading from the source topic (the one that gets unloaded).
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic(sourceTopic)
                .subscriptionName("sub")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        final int totalTransactions = 1000000;
        CountDownLatch latch = new CountDownLatch(totalTransactions);

        log.info("Prepare data!");
        Producer<String> sourceProducer = client.newProducer(Schema.STRING)
                .topic(sourceTopic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .maxPendingMessages(10)
                .blockIfQueueFull(true)
                .create();

        // Fill the source topic in the background.
        new Thread(() -> {
            for (int i = 0; i < totalTransactions; i++) {
                sourceProducer.newMessage().value(i + "").sendAsync();
            }
        }).start();

        log.info("Test started!");
        // One transaction per message: produce to the target topic, ack the source message, commit.
        new Thread(() -> {
            for (int i = 0; i < totalTransactions; i++) {
                try {
                    Message<String> msg = consumer.receive();
                    log.debug("Received message!");
                    CompletableFuture<Transaction> txn = client.newTransaction().build();
                    txn.thenComposeAsync(transaction -> {
                        log.debug("Transaction opened!");
                        return producer.newMessage(transaction).value(msg.getValue()).sendAsync()
                                .thenComposeAsync(messageId -> {
                                    return consumer.acknowledgeAsync(msg.getMessageId(), transaction);
                                }).thenAcceptAsync(__ -> {
                                    transaction.commit().thenRun(() -> {
                                        log.debug("Transaction committed!");
                                        latch.countDown();
                                    }).exceptionally(e -> {
                                        latch.countDown();
                                        return null;
                                    });
                                }).exceptionally(e -> {
                                    // Send or ack failed: abort the transaction.
                                    transaction.abort().thenRun(() -> {
                                        log.debug("Transaction aborted!");
                                        latch.countDown();
                                    }).exceptionally(ex -> {
                                        latch.countDown();
                                        return null;
                                    });
                                    return null;
                                });
                    }).exceptionally(ex -> {
                        log.error("", ex);
                        return null;
                    });
                } catch (PulsarClientException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        latch.await();
        log.info("Test Done!");
    }
}
```

**Expected behavior**

- The client side should not print the `UnknownError`

**Additional context**

broker logs:

- [pulsar-broker-lipenghuideMacBook-Pro-2.local.log](https://github.com/apache/pulsar/files/7882610/pulsar-broker-lipenghuideMacBook-Pro-2.local.log)
- [pulsar-broker-lipenghuideMacBook-Pro-2.local.log](https://github.com/apache/pulsar/files/7882616/pulsar-broker-lipenghuideMacBook-Pro-2.local.log)
- [pulsar-broker-lipenghuideMacBook-Pro-2.local.log](https://github.com/apache/pulsar/files/7882618/pulsar-broker-lipenghuideMacBook-Pro-2.local.log)
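
A note on the test code's bookkeeping: the outermost `exceptionally` only logs, so an iteration in which opening the transaction fails (or in which `consumer.receive()` throws) never reaches `latch.countDown()`, and `latch.await()` can then block indefinitely. Below is a minimal sketch of one way to count every iteration exactly once. It uses only `java.util.concurrent`, with stand-in futures in place of the Pulsar calls; `LatchAccountingSketch`, `track`, and the two counters are hypothetical names for illustration and are not part of the Pulsar client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchAccountingSketch {

    // Outcome counters (hypothetical, for illustration only).
    static final AtomicInteger committed = new AtomicInteger();
    static final AtomicInteger failed = new AtomicInteger();

    /**
     * Attaches a single terminal handler to a per-iteration pipeline so the
     * latch is released exactly once, whichever stage succeeded or failed.
     */
    static <T> CompletableFuture<T> track(CompletableFuture<T> pipeline, CountDownLatch latch) {
        return pipeline.whenComplete((result, error) -> {
            if (error == null) {
                committed.incrementAndGet();
            } else {
                failed.incrementAndGet();
            }
            latch.countDown();
        });
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);

        // Stand-in for a transaction that opens and commits.
        track(CompletableFuture.completedFuture("ok"), latch);

        // Stand-in for a transaction that fails to open.
        CompletableFuture<String> openFailed = new CompletableFuture<>();
        openFailed.completeExceptionally(new IllegalStateException("open failed"));
        track(openFailed, latch);

        // Stand-in for a commit that fails after a successful send.
        CompletableFuture<String> commitFailed = CompletableFuture.completedFuture("sent")
                .thenCompose(v -> {
                    CompletableFuture<String> f = new CompletableFuture<>();
                    f.completeExceptionally(new RuntimeException("commit failed"));
                    return f;
                });
        track(commitFailed, latch);

        boolean done = latch.await(1, TimeUnit.SECONDS);
        // prints "done=true committed=1 failed=2"
        System.out.println("done=" + done + " committed=" + committed.get() + " failed=" + failed.get());
    }
}
```

In the reproduction above, the equivalent change would be to make the per-iteration chain return the commit (or abort) future and attach one handler like `track` at the end. This only affects whether the test terminates; it does not change the `UnknownError` behavior being reported.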