Shoothzj opened a new issue #9613:
URL: https://github.com/apache/pulsar/issues/9613


   **Describe the bug**
   When you create a consumer, cost time more than you config, It may be create 
success in server side, but client side not works.
   
   Server side can see the consumer
   ```
   [root@2636a6ef967c bin]# ./pulsar-admin topics stats 
testf2cf9ffe-2991-4c4a-94ce-d954cc1542eb
   Warning: Nashorn engine is planned to be removed from a future JDK release
   {
     "msgRateIn" : 0.0,
     "msgThroughputIn" : 0.0,
     "msgRateOut" : 0.0,
     "msgThroughputOut" : 0.0,
     "bytesInCounter" : 57,
     "msgInCounter" : 1,
     "bytesOutCounter" : 0,
     "msgOutCounter" : 0,
     "averageMsgSize" : 0.0,
     "msgChunkPublished" : false,
     "storageSize" : 57,
     "backlogSize" : 57,
     "publishers" : [ ],
     "subscriptions" : {
       "74d86b5f-cca2-4ca6-adec-e69702a27eb9" : {
         "msgRateOut" : 0.0,
         "msgThroughputOut" : 0.0,
         "bytesOutCounter" : 0,
         "msgOutCounter" : 0,
         "msgRateRedeliver" : 0.0,
         "chuckedMessageRate" : 0,
         "msgBacklog" : 1,
         "msgBacklogNoDelayed" : 1,
         "blockedSubscriptionOnUnackedMsgs" : false,
         "msgDelayed" : 0,
         "unackedMessages" : 0,
         "type" : "Exclusive",
         "activeConsumerName" : "2d166",
         "msgRateExpired" : 0.0,
         "lastExpireTimestamp" : 0,
         "lastConsumedFlowTimestamp" : 0,
         "lastConsumedTimestamp" : 0,
         "lastAckedTimestamp" : 0,
         "consumers" : [ {
           "msgRateOut" : 0.0,
           "msgThroughputOut" : 0.0,
           "bytesOutCounter" : 0,
           "msgOutCounter" : 0,
           "msgRateRedeliver" : 0.0,
           "chuckedMessageRate" : 0.0,
           "consumerName" : "2d166",
           "availablePermits" : 0,
           "unackedMessages" : 0,
           "avgMessagesPerEntry" : 1000,
           "blockedConsumerOnUnackedMsgs" : false,
           "lastAckedTimestamp" : 0,
           "lastConsumedTimestamp" : 0,
           "metadata" : { },
           "address" : "/172.17.0.1:60256",
           "connectedSince" : "2021-02-18T09:08:44.609629Z",
           "clientVersion" : "2.7.0"
         } ],
         "isDurable" : true,
         "isReplicated" : false,
         "consumersAfterMarkDeletePosition" : { }
       }
     },
     "replication" : { },
     "deduplicationStatus" : "Disabled"
   }
   ```
   But the client can not receive any message.
   
   **To Reproduce**
   create consumer in 1 seconds delay
   ```java
    PulsarClient pulsarClient = PulsarClient.builder()
                   .operationTimeout(1, TimeUnit.SECONDS)
                   .serviceUrl("http://127.0.0.1:8080";)
                   .build();
           final ConsumerBuilder<byte[]> consumerBuilder = 
pulsarClient.newConsumer();
           Consumer<byte[]> consumer = null;
           try {
               consumer = 
consumerBuilder.topic(topic).subscriptionName(UUID.randomUUID().toString()).messageListener(new
 MessageListener<byte[]>() {
                   @Override
                   public void received(Consumer<byte[]> consumer, 
Message<byte[]> message) {
                       log.info("message is [{}]", message);
                   }
               }).subscribe();
           } catch (Throwable e) {
               log.error("exception ", e);
           }
   ```
   Inject three seconds delay in pulsar server with java-agent
   ```java
   package com.github.shoothzj.demo.agent.interceptor;
   
   import com.github.shoothzj.demo.agent.util.AgentUtil;
   import net.bytebuddy.asm.Advice;
   
   import java.util.concurrent.TimeUnit;
   
   /**
    * @author hezhangjian
    */
   public class PulsarServerCnxInterceptor {
   
       /**
        * #t Class名 ex: org.apache.pulsar.broker.service.ServerCnx
        * #m Method名 ex: handleSubscribe
        *
        * @param signature
        */
       @Advice.OnMethodEnter
       public static void enter(@Advice.Origin("#t #m") String signature) {
           try {
               TimeUnit.SECONDS.sleep(3);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           AgentUtil.info("[{}]", signature);
       }
   
   }
   
   ```
   start pulsar with agent
   ```bash
   java javaagent:demo-agent-0.0.1.SNAPSHOT.jar
   ```
   **Expected behavior**
   Consumer created both failed, or consuemr created both success.
   
   **Desktop (please complete the following information):**
    Linux, Apache Pulsar version 2.7.0 
   client version 2.7.0
   
   **Additional context**
   I put the reproducer agent in additional context
   
[demo-agent-0.0.1.SNAPSHOT.jar.zip](https://github.com/apache/pulsar/files/6001610/demo-agent-0.0.1.SNAPSHOT.jar.zip)
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to