codelipenghui opened a new issue #13792:
URL: https://github.com/apache/pulsar/issues/13792


   **Describe the bug**
   
   Client-side error log:
   ```
   22:35:10.765 [pulsar-client-internal-4-1:org.apache.pulsar.client.impl.TransactionMetaStoreHandler@511] ERROR org.apache.pulsar.client.impl.TransactionMetaStoreHandler - Got END_TXN response for request 2036419640951504857 error UnknownError
   ```
   
   Server-side error log:
   
   ```
   22:35:07.055 [pulsar-transaction-timer-43-1] ERROR org.apache.pulsar.broker.service.ServerCnx - Send response error for END_TXN request 2036419640951505362.
   org.apache.pulsar.client.api.transaction.TransactionBufferClientException$RequestTimeoutException: Transaction buffer request timeout.
       at org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl.run(TransactionBufferHandlerImpl.java:240) [io.streamnative-pulsar-broker-2.9.1.2.jar:2.9.1.2]
       at io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
       at io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
       at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
       at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
       at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
       at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
       at java.lang.Thread.run(Thread.java:829) [?:?]
   ```
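According to the stack trace, the broker fails the `END_TXN` request because a transaction buffer request timed out. That timeout fires from a Netty `HashedWheelTimer` inside `TransactionBufferHandlerImpl.run`. The general pattern can be modeled roughly as follows: the handler tracks pending requests by ID, and a timer fails any request that is still pending when it fires. This is a simplified, hypothetical sketch and not Pulsar's implementation. `PendingRequestTracker` is an illustrative name, and a `ScheduledExecutorService` stands in for `HashedWheelTimer`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Hypothetical, simplified model of per-request timeouts (not Pulsar's
 * TransactionBufferHandlerImpl). A ScheduledExecutorService stands in for Netty's HashedWheelTimer.
 */
public class PendingRequestTracker implements AutoCloseable {
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final long timeoutMillis;

    public PendingRequestTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Registers a request and arms a timer that fails it if no response arrives in time. */
    public CompletableFuture<String> send(long requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        timer.schedule(() -> {
            // remove(key, value) succeeds only if this request is still pending
            if (pending.remove(requestId, future)) {
                future.completeExceptionally(
                        new TimeoutException("Request " + requestId + " timed out"));
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        return future;
    }

    /** Completes a pending request; returns false if it already timed out or is unknown. */
    public boolean onResponse(long requestId, String response) {
        CompletableFuture<String> future = pending.remove(requestId);
        return future != null && future.complete(response);
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

In this model, a response that arrives after the timer has fired is dropped (`onResponse` returns `false`), so the caller only ever sees the timeout.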
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Run the following test code
   2. Unload the topic `public/test/s_topic` during the test (possibly multiple times)
   3. See the error
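You can script step 2 instead of running it by hand. The sketch below is a hypothetical helper (`RepeatedUnloader` and `unloadRepeatedly` are illustrative names) that calls an unload action a fixed number of times, pausing between calls. The action is passed in as a parameter, so the loop runs without a cluster.

```java
import java.util.function.Consumer;

/** Hypothetical helper that runs an "unload" action against a topic several times. */
public final class RepeatedUnloader {
    private RepeatedUnloader() {}

    /**
     * Calls unloadAction.accept(topic) up to `times` times, sleeping `intervalMillis`
     * between calls. A failed call is reported and the loop continues; an interrupt
     * stops the loop early. Returns the number of calls that did not throw.
     */
    public static int unloadRepeatedly(String topic, int times, long intervalMillis,
                                       Consumer<String> unloadAction) {
        int succeeded = 0;
        for (int i = 0; i < times; i++) {
            try {
                unloadAction.accept(topic);
                succeeded++;
            } catch (RuntimeException e) {
                System.err.println("Unload #" + (i + 1) + " of " + topic + " failed: " + e);
            }
            if (i < times - 1) {
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag and stop early
                    break;
                }
            }
        }
        return succeeded;
    }
}
```

Against a real broker, the action could shell out to `bin/pulsar-admin topics unload persistent://public/test/s_topic`. Alternatively, it could call `admin.topics().unload(topic)` on a `PulsarAdmin` instance, wrapping the checked `PulsarAdminException`.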
   
   Test code:
   ```java
   package io.streamnative.test;
   
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.api.Schema;
   import org.apache.pulsar.client.api.SubscriptionInitialPosition;
   import org.apache.pulsar.client.api.SubscriptionType;
   import org.apache.pulsar.client.api.transaction.Transaction;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   
   public class TransactionTest {
       private static final Logger log = LoggerFactory.getLogger(TransactionTest.class);
   
       public static void main(String[] args) throws PulsarClientException, InterruptedException {
           final String sourceTopic = "public/test/s_topic";
           final String targetTopic = "public/test/t_topic";
           PulsarClient client = PulsarClient.builder()
                   .serviceUrl("pulsar://127.0.0.1:6650")
                   .enableTransaction(true)
                   .build();
           // Transactional producer for the target topic
           Producer<String> producer = client.newProducer(Schema.STRING)
                   .topic(targetTopic)
                   .blockIfQueueFull(true)
                   .sendTimeout(0, TimeUnit.SECONDS)
                   .create();
           // Consumer on the source topic; its acks are part of each transaction
           Consumer<String> consumer = client.newConsumer(Schema.STRING)
                   .topic(sourceTopic)
                   .subscriptionName("sub")
                   .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                   .subscriptionType(SubscriptionType.Shared)
                   .subscribe();
   
           final int totalTransactions = 1000000;
           // One count per loop iteration, decremented when that iteration's work ends
           CountDownLatch latch = new CountDownLatch(totalTransactions);
   
           log.info("Prepare data!");
   
           Producer<String> sourceProducer = client.newProducer(Schema.STRING)
                   .topic(sourceTopic)
                   .sendTimeout(0, TimeUnit.SECONDS)
                   .maxPendingMessages(10)
                   .blockIfQueueFull(true)
                   .create();
   
           // Fill the source topic in the background
           new Thread(() -> {
               for (int i = 0; i < totalTransactions; i++) {
                   sourceProducer.newMessage().value(String.valueOf(i)).sendAsync();
               }
           }).start();
           log.info("Test started!");
           // Consume-transform-produce loop: one transaction per received message
           new Thread(() -> {
               for (int i = 0; i < totalTransactions; i++) {
                   try {
                       Message<String> msg = consumer.receive();
                       log.debug("Received message!");
                       CompletableFuture<Transaction> txn = client.newTransaction().build();
                       txn.thenComposeAsync(transaction -> {
                           log.debug("Transaction opened!");
                           // Produce to the target topic and ack the source message in the same transaction
                           return producer.newMessage(transaction).value(msg.getValue()).sendAsync()
                                   .thenComposeAsync(messageId ->
                                           consumer.acknowledgeAsync(msg.getMessageId(), transaction))
                                   .thenAcceptAsync(__ -> {
                                       // commit() and abort() both send an END_TXN request to the coordinator
                                       transaction.commit().thenRun(() -> {
                                           log.debug("Transaction committed!");
                                           latch.countDown();
                                       }).exceptionally(e -> {
                                           latch.countDown();
                                           return null;
                                       });
                                   }).exceptionally(e -> {
                                       // Send or ack failed: abort the transaction
                                       transaction.abort().thenRun(() -> {
                                           log.debug("Transaction aborted!");
                                           latch.countDown();
                                       }).exceptionally(ex -> {
                                           latch.countDown();
                                           return null;
                                       });
                                       return null;
                                   });
                       }).exceptionally(ex -> {
                           // Opening the transaction failed: count down so latch.await() can return
                           log.error("", ex);
                           latch.countDown();
                           return null;
                       });
                   } catch (PulsarClientException e) {
                       log.error("Failed to receive message", e);
                       latch.countDown();
                   }
               }
           }).start();
   
           latch.await();
           log.info("Test Done!");
           client.close();
       }
   }
   ```
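The commit-or-abort chain in the test code could be factored into a helper whose future always completes normally. With that, every message ends in exactly one outcome, and a failed `END_TXN` stays distinguishable from an ordinary abort. This is a hypothetical sketch (`TxnFlow`, `commitOrAbort` and `Outcome` are illustrative names). The Pulsar `Transaction` calls are replaced with plain `Supplier`s so the code runs without a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical helper mirroring the test's commit-or-abort chain without a broker. */
public final class TxnFlow {
    public enum Outcome { COMMITTED, ABORTED, END_FAILED }

    private TxnFlow() {}

    /**
     * Runs work; on success runs commit, on failure runs abort. The returned future
     * never completes exceptionally: a failing commit or abort yields END_FAILED.
     * (If work.get() itself throws synchronously, the exception propagates to the caller.)
     */
    public static CompletableFuture<Outcome> commitOrAbort(
            Supplier<CompletableFuture<Void>> work,
            Supplier<CompletableFuture<Void>> commit,
            Supplier<CompletableFuture<Void>> abort) {
        return work.get()
                .handle((ignored, workError) -> workError == null)
                .thenCompose(workOk -> {
                    CompletableFuture<Outcome> end = workOk
                            ? commit.get().thenApply(v -> Outcome.COMMITTED)
                            : abort.get().thenApply(v -> Outcome.ABORTED);
                    return end;
                })
                .exceptionally(endError -> Outcome.END_FAILED);
    }
}
```

In the test, `work` would wrap the `sendAsync`/`acknowledgeAsync` chain, and `commit`/`abort` would call `transaction.commit()`/`transaction.abort()`. Chaining `.thenRun(latch::countDown)` onto the result would then count down once per message, whatever the outcome.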
   
   **Expected behavior**
   - The client side should not log an `UnknownError` for the `END_TXN` request
   
   **Additional context**
   Broker logs:
   
   - [pulsar-broker-lipenghuideMacBook-Pro-2.local.log](https://github.com/apache/pulsar/files/7882610/pulsar-broker-lipenghuideMacBook-Pro-2.local.log)
   - [pulsar-broker-lipenghuideMacBook-Pro-2.local.log](https://github.com/apache/pulsar/files/7882616/pulsar-broker-lipenghuideMacBook-Pro-2.local.log)
   - [pulsar-broker-lipenghuideMacBook-Pro-2.local.log](https://github.com/apache/pulsar/files/7882618/pulsar-broker-lipenghuideMacBook-Pro-2.local.log)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
