geniusjoe opened a new issue, #1236:
URL: https://github.com/apache/pulsar-client-go/issues/1236

   #### Expected behavior
   The `failTimeoutMessages()` timer in `pulsar/producer_partition.go` should periodically fail and remove messages in `p.pendingQueue` that are older than `p.options.SendTimeout`.
   
   #### Actual behavior
   Messages in `p.pendingQueue` that are older than `p.options.SendTimeout` are never removed. If a message consistently fails to send, it stays in `pendingQueue` forever and the producer keeps reconnecting in an infinite loop.
   
   #### Helpful information
   I think this bug may be related to the fix in #551.
   #551 aimed to solve a race condition between the `grabCnx()` and `failTimeoutMessages()` functions. The producer can hit this race when `failTimeoutMessages()` first deletes outdated messages and `grabCnx()` then reconnects successfully and resends those already-deleted pending messages.
   The #551 solution is to refresh the `sentAt` field of every message in `p.pendingQueue` whenever `grabCnx()` reconnects successfully, so that `failTimeoutMessages()` does not act on these messages. As a result, a message that fails on every attempt has its timestamp reset on every reconnect, is never older than `SendTimeout` when the timer checks it, and is therefore never removed. [Code reference](https://github.com/apache/pulsar-client-go/blob/a029f2d7e392fa37511ac44c3276832705ded08b/pulsar/producer_partition.go#L353) below:
   ```go
   for i := 0; i < viewSize; i++ {
        item := p.pendingQueue.Poll()
        if item == nil {
                continue
        }
        pi := item.(*pendingItem)
        // when resending pending batches, we update the sendAt timestamp and put to the back of queue
        // to avoid pending item been removed by failTimeoutMessages and cause race condition
        pi.Lock()
        pi.sentAt = time.Now()
        pi.Unlock()
        p.pendingQueue.Put(pi)
        p._getConn().WriteData(pi.buffer)
        if pi == lastViewItem {
                break
        }
   }    
   ```
   By comparison, the Java client does not appear to have this race condition. Java's `failPendingMessages()` first checks whether the current connection is closed; if it is not, it schedules the failure handling on the connection's event loop (`cnx.ctx().channel().eventLoop().execute(...)`), the same thread that writes messages to the socket, which avoids the race. [Code reference](https://github.com/apache/pulsar/blob/f70e52a700d3348dbdb2615495a2beb16c790f23/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2169) below:
   ```java
   // If we have a connection, we schedule the callback and recycle on the event loop thread to avoid any
   // race condition since we also write the message on the socket from this thread
   cnx.ctx().channel().eventLoop().execute(() -> {
       synchronized (ProducerImpl.this) {
           failPendingMessages(null, ex);
       }
   });
   ```
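For Go readers, the idea behind the Java approach can be sketched as a single goroutine that runs submitted tasks one at a time. This is a hypothetical illustration (`eventLoop`, `execute`, and `run` are invented names, not pulsar-client-go APIs): both the "write/resend" path and the "fail pending messages" path submit closures to the same loop, so they can never interleave, and the shared slice needs no mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// eventLoop is a hypothetical, minimal Go analog of a Netty channel event loop:
// submitted tasks run one at a time, in submission order, on a single goroutine.
type eventLoop struct {
	tasks chan func()
	done  chan struct{}
}

func newEventLoop() *eventLoop {
	el := &eventLoop{tasks: make(chan func(), 64), done: make(chan struct{})}
	go func() {
		defer close(el.done)
		for task := range el.tasks {
			task()
		}
	}()
	return el
}

// execute schedules task to run on the loop goroutine.
func (el *eventLoop) execute(task func()) { el.tasks <- task }

// close stops accepting tasks and waits until all queued tasks have run.
func (el *eventLoop) close() {
	close(el.tasks)
	<-el.done
}

// run has n goroutines "write" a message and n goroutines "fail" the oldest
// pending message. All state changes happen on the loop goroutine, so pending
// and failed need no mutex, and remaining+failed always equals n.
func run(n int) (remaining, failed int) {
	el := newEventLoop()
	var pending []int
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(2)
		go func(id int) { // write/resend path
			defer wg.Done()
			el.execute(func() { pending = append(pending, id) })
		}(i)
		go func() { // fail-pending-messages path
			defer wg.Done()
			el.execute(func() {
				if len(pending) > 0 {
					pending = pending[1:]
					failed++
				}
			})
		}()
	}
	wg.Wait()
	el.close()
	return len(pending), failed
}

func main() {
	remaining, failed := run(100)
	fmt.Println(remaining + failed) // 100
}
```

The split between `remaining` and `failed` varies from run to run, but every message ends up in exactly one of the two, because no two tasks ever touch the slice at the same time.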
   I think we can add `Lock()` and `Unlock()` as member methods of `p.pendingQueue` and remove the locking inside each of its implementation methods, such as `Put()` and `Poll()`. "Iterate over every item and delete some of them" and "iterate over every item and modify some of them" should each be treated as a single atomic operation, and we should call `p.pendingQueue.Lock()` and `p.pendingQueue.Unlock()` around every operation on `p.pendingQueue`.
   
   #### Steps to reproduce
   1. Modify the broker's `handleSend()` function so that it always returns a transient error when it receives a specific message.
   ```java
   @Override
   protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
       ...
       this.ctx().channel().eventLoop().execute(() -> {
        this.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.UnknownError, "UnknownError"));
        this.completedSendOperation(producer.isNonPersistentTopic(), 0);
       });
   }
   ```
   2. Write a demo that sends this specific message; the demo will reconnect infinitely.
   ```go
   package main

   import (
       "context"
       "log"

       "github.com/apache/pulsar-client-go/pulsar"
   )

   func main() {
       client, err := pulsar.NewClient(pulsar.ClientOptions{
           URL:            "<url>:6650",
           Authentication: pulsar.NewAuthenticationToken("<token>"),
       })
       if err != nil {
           log.Fatal(err)
       }
       defer client.Close()

       producer, err := client.CreateProducer(pulsar.ProducerOptions{
           Topic: "<tenant>/<namespace>/<topic>",
       })
       if err != nil {
           log.Fatal(err)
       }
       defer producer.Close()

       ctx := context.Background()
       // The broker rejects this payload every time, so Send never succeeds.
       if msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{
           Payload: []byte("specificMessage"),
       }); err != nil {
           log.Fatal(err)
       } else {
           log.Println("Published message: ", msgID)
       }
   }
   ```
   The log looks something like the following (I use `UnknownError` as the transient error):
   ```
   INFO[0000] Reconnected producer to broker                cnx="<local_addr>:37696 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
   WARN[0000] Received send error from server: [UnknownError] : [UnknownError]  local_addr="<local_addr>:37696" remote_addr="pulsar://<pulsar_addr>:10003"
   WARN[0000] Connection was closed                         cnx="<local_addr>:37696 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
   WARN[0000] Connection was closed                         cnx="<local_addr>:37696 -> <pulsar_addr>:10003" producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
   INFO[0000] runEventsLoop will reconnect in producer      producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
   INFO[0000] Reconnecting to broker                        assignedBrokerURL= delayReconnectTime=115.401948ms producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
   INFO[0000] runEventsLoop will reconnect in producer      producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
   INFO[0000] Reconnecting to broker                        assignedBrokerURL= delayReconnectTime=103.170835ms producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
   INFO[0000] Connecting to broker                          remote_addr="pulsar://<pulsar_addr>:10003"
   INFO[0000] TCP connection established                    local_addr="<local_addr>:37708" remote_addr="pulsar://<pulsar_addr>:10003"
   INFO[0000] Connection is ready                           local_addr="<local_addr>:37708" remote_addr="pulsar://<pulsar_addr>:10003"
   INFO[0000] Connected producer                            cnx="<local_addr>:37708 -> <pulsar_addr>:10003" epoch=3 producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
   INFO[0000] Reconnected producer to broker                cnx="<local_addr>:37708 -> <pulsar_addr>:10003" producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
   INFO[0000] Connected producer                            cnx="<local_addr>:37708 -> <pulsar_addr>:10003" epoch=3 producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
   INFO[0000] Resending 1 pending batches                   producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
   INFO[0000] Reconnected producer to broker                cnx="<local_addr>:37708 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
   WARN[0000] Received send error from server: [UnknownError] : [UnknownError]  local_addr="<local_addr>:37708" remote_addr="pulsar://<pulsar_addr>:10003"
   WARN[0000] Connection was closed                         cnx="<local_addr>:37708 -> <pulsar_addr>:10003" producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
   WARN[0000] Connection was closed                         cnx="<local_addr>:37708 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
   INFO[0000] runEventsLoop will reconnect in producer      producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
   INFO[0000] Reconnecting to broker                        assignedBrokerURL= delayReconnectTime=112.652459ms producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
   INFO[0000] runEventsLoop will reconnect in producer      producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
   INFO[0000] Reconnecting to broker                        assignedBrokerURL= delayReconnectTime=104.869345ms producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
   INFO[0000] Connecting to broker                          remote_addr="pulsar://<pulsar_addr>:10003"
   INFO[0000] TCP connection established                    local_addr="<local_addr>:37712" remote_addr="pulsar://<pulsar_addr>:10003"
   INFO[0000] Connection is ready                           local_addr="<local_addr>:37712" remote_addr="pulsar://<pulsar_addr>:10003"
   INFO[0000] Connected producer                            cnx="<local_addr>:37712 -> <pulsar_addr>:10003" epoch=4 producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
   INFO[0000] Resending 1 pending batches                   producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
   INFO[0000] Reconnected producer to broker                cnx="<local_addr>:37712 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
   WARN[0000] Received send error from server: [UnknownError] : [UnknownError]  local_addr="<local_addr>:37712" remote_addr="pulsar://<pulsar_addr>:10003"
   ...
   ```
   
   #### System configuration
   **Client version**: 0.12.1
   **Broker version**: 2.9
   

