milos-matijasevic opened a new issue #389:
URL: https://github.com/apache/pulsar-client-go/issues/389


   With regard to #376 and the deadlock fix, I think a new problem occurs.
   My tests focused on the **consumer** and **out-of-order acknowledgments**.
   While the consumer's client tries to reconnect ([pc.reconnectToBroker()](https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_partition.go#L752)), the `for` loop below it, which receives requests from the `eventsCh` channel, is not blocked. It keeps receiving requests and tries to process them.
   Here ([internalAck RPC call](https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_partition.go#L454)), the loop makes an RPC call that returns an error if the connection is closed, and that error is not handled at all. The ack call therefore returns as if it had succeeded (even when it has not), and the loop goes on processing other requests.
   
   **_This applies generally to other requests and RPC calls that work the same way._**
   
   Here are some tests related to consumer ack.
   
   #### Steps to reproduce
   
   First, produce some messages:
   ```bash
   ./pulsar-client produce test -n 1000 -m test
   ```
   A simple program used for testing: a consumer that acks every 2nd message.
   ```go
   package main
   
   import (
        "fmt"
        "log"
        "time"
   
        "github.com/apache/pulsar-client-go/pulsar"
   )
   
   func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{
                URL:               "pulsar://localhost:6650",
                OperationTimeout:  30 * time.Second,
                ConnectionTimeout: 30 * time.Second,
        })
        if err != nil {
                log.Fatal(err)
        }
        defer client.Close()
   
        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
                Topic:                       "test",
                SubscriptionName:            "test-sub",
                SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
                Type:                        pulsar.Failover,
                // ReceiverQueueSize:           1,
                // MessageChannel:              make(chan pulsar.ConsumerMessage),
        })
        if err != nil {
                log.Fatal(err)
        }
        defer consumer.Close()
   
        i := 0
        for msg := range consumer.Chan() {
                fmt.Printf("%v\n", msg.ID())
                time.Sleep(1 * time.Second)
   
                if i%2 == 0 {
                        consumer.AckID(msg.ID())
                }
                i++
        }
   }
   
   ```
   ### Scenario 1
   **Start the program, kill the broker, then kill the program**
   When I kill the broker, the program keeps running:
   ```
   INFO[0000] [Connecting to broker]                       remote_addr="pulsar://localhost:6650"
   INFO[0000] [TCP connection established]                 local_addr="127.0.0.1:62462" remote_addr="pulsar://localhost:6650"
   INFO[0000] [Connection is ready]                        local_addr="127.0.0.1:62462" remote_addr="pulsar://localhost:6650"
   INFO[0000] [Connected consumer]                         consumerID=1 name=htiei subscription=test-sub topic="persistent://public/default/test"
   INFO[0000] [Created consumer]                           consumerID=1 name=htiei subscription=test-sub topic="persistent://public/default/test"
   2443:0:0
   2443:1:0
   2443:2:0
   2443:3:0
   INFO[0004] Broker notification of Closed consumer: [1]  local_addr="127.0.0.1:62462" remote_addr="pulsar://localhost:6650"
   INFO[0004] [Reconnecting to broker in  100ms]           consumerID=1 name=htiei subscription=test-sub topic="persistent://public/default/test"
   INFO[0004] [Error reading from connection]              error="Short read when reading frame size: EOF" local_addr="127.0.0.1:62462" remote_addr="pulsar://localhost:6650"
   INFO[0004] [Connection closed]                          local_addr="127.0.0.1:62462" remote_addr="pulsar://localhost:6650"
   INFO[0004] [Connecting to broker]                       remote_addr="pulsar://localhost:6650"
   WARN[0004] [Failed to connect to broker.]               error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
   INFO[0004] [Connection closed]                          remote_addr="pulsar://localhost:6650"
   2443:4:0
   2443:5:0
   INFO[0006] [Connecting to broker]                       remote_addr="pulsar://localhost:6650"
   WARN[0006] [Failed to connect to broker.]               error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
   INFO[0006] [Connection closed]                          remote_addr="pulsar://localhost:6650"
   2443:6:0
   2443:7:0
   2443:8:0
   2443:9:0
   INFO[0010] [Connecting to broker]                       remote_addr="pulsar://localhost:6650"
   WARN[0010] [Failed to connect to broker.]               error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
   INFO[0010] [Connection closed]                          remote_addr="pulsar://localhost:6650"
   2443:10:0
   2443:11:0
   2443:12:0
   2443:13:0
   2443:14:0
   2443:15:0
   2443:16:0
   2443:17:0
   INFO[0018] [Connecting to broker]                       remote_addr="pulsar://localhost:6650"
   WARN[0018] [Failed to connect to broker.]               error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
   INFO[0018] [Connection closed]                          remote_addr="pulsar://localhost:6650"
   2443:18:0
   ```
   and the cursor stats are:
   ```
   "test-sub" : {
         "markDeletePosition" : "2443:0",
         "readPosition" : "2443:1",
         "waitingReadOp" : false,
         "pendingReadOps" : 0,
         "messagesConsumedCounter" : -2998,
         "cursorLedger" : -1,
         "cursorLedgerLastEntry" : -1,
         "individuallyDeletedMessages" : "[(2443:1..2443:2]]",
         "lastLedgerSwitchTimestamp" : "2020-11-12T17:03:08.648+01:00",
         "state" : "NoLedger",
         "numberOfEntriesSinceFirstNotAckedMessage" : 1,
         "totalNonContiguousDeletedMessagesRange" : 1,
         "properties" : { }
       }
   ```
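
One way to read these stats uses a simplified single-ledger model. This is our assumption, not Pulsar's actual cursor code. In this model, `markDeletePosition` is the highest entry such that every entry up to it is acked. `individuallyDeletedMessages` lists the remaining acked entries as open-closed ranges, so `(2443:1..2443:2]` covers only entry 2. The hypothetical `cursorState` function below computes both values from a set of acked entry IDs; the separator between multiple ranges is our own choice. Feeding it entries 0 and 2 reproduces the stats above, which is consistent with only the acks for those two entries having reached the broker:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// cursorState is a simplified, hypothetical model of a subscription cursor
// on a single ledger. It returns the mark-delete position (the highest entry
// such that every entry from 0 up to it is acked, or -1 if entry 0 is not)
// and the remaining acked entries as open-closed ranges "(a..b]".
func cursorState(ledger int64, acked []int64) (string, string) {
	set := make(map[int64]bool, len(acked))
	for _, e := range acked {
		set[e] = true
	}

	md := int64(-1)
	for set[md+1] {
		md++ // advance while the next entry is acked
	}

	// Acked entries beyond the mark-delete position, in ascending order.
	var rest []int64
	for e := range set {
		if e > md {
			rest = append(rest, e)
		}
	}
	sort.Slice(rest, func(i, j int) bool { return rest[i] < rest[j] })

	// Group consecutive entries; (a..b] covers entries a+1 through b.
	var parts []string
	for i := 0; i < len(rest); {
		j := i
		for j+1 < len(rest) && rest[j+1] == rest[j]+1 {
			j++
		}
		parts = append(parts, fmt.Sprintf("(%d:%d..%d:%d]", ledger, rest[i]-1, ledger, rest[j]))
		i = j + 1
	}
	return fmt.Sprintf("%d:%d", ledger, md), "[" + strings.Join(parts, ",") + "]"
}

func main() {
	// Scenario 1: assume only entries 0 and 2 were acked on the broker.
	md, ranges := cursorState(2443, []int64{0, 2})
	fmt.Println(md, ranges) // 2443:0 [(2443:1..2443:2]]
}
```
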
   ### Scenario 2
   **Start the program, kill the broker, start the broker again so the consumer reconnects, then kill the program**
   Something even stranger happens if I let the consumer reconnect to the broker.
   After resetting everything and running again:
   ```
   INFO[0000] [Connecting to broker]                       remote_addr="pulsar://localhost:6650"
   INFO[0000] [TCP connection established]                 local_addr="127.0.0.1:62721" remote_addr="pulsar://localhost:6650"
   INFO[0000] [Connection is ready]                        local_addr="127.0.0.1:62721" remote_addr="pulsar://localhost:6650"
   INFO[0000] [Connected consumer]                         consumerID=1 name=wmlxt subscription=test-sub topic="persistent://public/default/test"
   INFO[0000] [Created consumer]                           consumerID=1 name=wmlxt subscription=test-sub topic="persistent://public/default/test"
   2443:0:0
   2443:1:0
   2443:2:0
   INFO[0003] Broker notification of Closed consumer: [1]  local_addr="127.0.0.1:62721" remote_addr="pulsar://localhost:6650"
   INFO[0003] [Reconnecting to broker in  100ms]           consumerID=1 name=wmlxt subscription=test-sub topic="persistent://public/default/test"
   INFO[0003] [Error reading from connection]              error="Short read when reading frame size: EOF" local_addr="127.0.0.1:62721" remote_addr="pulsar://localhost:6650"
   INFO[0003] [Connection closed]                          local_addr="127.0.0.1:62721" remote_addr="pulsar://localhost:6650"
   INFO[0003] [Connecting to broker]                       remote_addr="pulsar://localhost:6650"
   WARN[0003] [Failed to connect to broker.]               error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
   INFO[0003] [Connection closed]                          remote_addr="pulsar://localhost:6650"
   2443:3:0
   2443:4:0
   INFO[0005] [Connecting to broker]                       remote_addr="pulsar://localhost:6650"
   WARN[0005] [Failed to connect to broker.]               error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
   INFO[0005] [Connection closed]                          remote_addr="pulsar://localhost:6650"
   2443:5:0
   2443:6:0
   2443:7:0
   2443:8:0
   INFO[0009] [Connecting to broker]                       remote_addr="pulsar://localhost:6650"
   WARN[0009] [Failed to connect to broker.]               error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
   INFO[0009] [Connection closed]                          remote_addr="pulsar://localhost:6650"
   2443:9:0
   2443:10:0
   2443:11:0
   2443:12:0
   2443:13:0
   2443:14:0
   2443:15:0
   2443:16:0
   INFO[0017] [Connecting to broker]                       remote_addr="pulsar://localhost:6650"
   WARN[0017] [Failed to connect to broker.]               error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
   INFO[0017] [Connection closed]                          remote_addr="pulsar://localhost:6650"
   2443:17:0
   2443:18:0
   2443:19:0
   2443:20:0
   2443:21:0
   2443:22:0
   2443:23:0
   2443:24:0
   2443:25:0
   2443:26:0
   2443:27:0
   2443:28:0
   2443:29:0
   2443:30:0
   2443:31:0
   2443:32:0
   INFO[0033] [Connecting to broker]                       remote_addr="pulsar://localhost:6650"
   INFO[0033] [TCP connection established]                 local_addr="127.0.0.1:62772" remote_addr="pulsar://localhost:6650"
   INFO[0033] [Connection is ready]                        local_addr="127.0.0.1:62772" remote_addr="pulsar://localhost:6650"
   INFO[0033] [Connected consumer]                         consumerID=1 name=wmlxt subscription=test-sub topic="persistent://public/default/test"
   INFO[0033] [Reconnected consumer to broker]             consumerID=1 name=wmlxt subscription=test-sub topic="persistent://public/default/test"
   2443:33:0
   2443:34:0
   2443:35:0
   2443:36:0
   ^Csignal: interrupt
   ```
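   The cursor stats report 2443:43 as the mark-delete position, as if that message had been acked, even though the program never reached it (the last message it printed was 2443:36). All messages before it are also marked as acked, even though the program acks only every 2nd message: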
   ```
   "test-sub" : {
         "markDeletePosition" : "2443:43",
         "readPosition" : "2443:1002",
         "waitingReadOp" : false,
         "pendingReadOps" : 0,
         "messagesConsumedCounter" : -2956,
         "cursorLedger" : 2559,
         "cursorLedgerLastEntry" : 1,
         "individuallyDeletedMessages" : "[]",
         "lastLedgerSwitchTimestamp" : "2020-11-12T17:07:39.571+01:00",
         "state" : "Open",
         "numberOfEntriesSinceFirstNotAckedMessage" : 959,
         "totalNonContiguousDeletedMessagesRange" : 0,
         "properties" : { }
       }
     }
   ```
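
This uses the same simplified reading of `markDeletePosition`: every entry up to and including it is acked. Under that reading, the reported position can be checked against what the test program actually acked. `firstUnackedBelow` is a hypothetical helper. It assumes that entry IDs on ledger 2443 line up with the loop counter `i`, which holds here because the program printed 2443:0 through 2443:36 in order. Entry 1 was never passed to `AckID`, so the program's own acks cannot explain a mark-delete position of 2443:43:

```go
package main

import "fmt"

// firstUnackedBelow returns the lowest entry in [0, markDelete] that the
// client never acknowledged, or -1 if every entry up to markDelete was acked.
// clientAcked holds the entry IDs the test program passed to AckID.
func firstUnackedBelow(markDelete int64, clientAcked map[int64]bool) int64 {
	for e := int64(0); e <= markDelete; e++ {
		if !clientAcked[e] {
			return e
		}
	}
	return -1
}

func main() {
	// The test program acks only when i%2 == 0; it printed entries 0..36
	// before it was interrupted, so at most the even entries were acked.
	clientAcked := map[int64]bool{}
	for i := int64(0); i <= 36; i++ {
		if i%2 == 0 {
			clientAcked[i] = true
		}
	}
	// The broker reported markDeletePosition 2443:43.
	fmt.Println(firstUnackedBelow(43, clientAcked)) // 1
}
```
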
   
   If you run the same scenarios with the two commented-out ConsumerOptions (`ReceiverQueueSize` and `MessageChannel`) enabled, strange things happen as well: the consumer starts reading messages that were already consumed and not acked.
   
   #### System configuration
   **Pulsar version**: v0.3.0-candidate-1
   

