milos-matijasevic opened a new issue #389: URL: https://github.com/apache/pulsar-client-go/issues/389
Following #376 and the deadlock fix, I think a new problem has appeared. My tests focused on the **consumer** and **out-of-order acknowledgments**.

While the consumer's client tries to reconnect ([pc.reconnectToBroker()](https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_partition.go#L752)), the for loop below it, which receives requests from the `eventsCh` channel, is not blocked, so it keeps receiving requests and tries to process them. Here ([internalAck rpc call](https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_partition.go#L454)) an RPC call is made that returns an error if the connection is closed. That error is not handled at all, so the ack call returns as if it had succeeded (even when it did not), and the loop continues processing other requests. **_This generally applies to other requests and RPC calls that work the same way._**

Here are some tests related to consumer ack.

#### Steps to reproduce

First produce some messages:

```bash
./pulsar-client produce test -n 1000 -m test
```

A simple program used for testing, a consumer that acks every 2nd message:

```go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "pulsar://localhost:6650",
		OperationTimeout:  30 * time.Second,
		ConnectionTimeout: 30 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:                       "test",
		SubscriptionName:            "test-sub",
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
		Type:                        pulsar.Failover,
		// ReceiverQueueSize: 1,
		// MessageChannel: make(chan pulsar.ConsumerMessage),
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Print every message ID, but ack only every 2nd message.
	i := 0
	for msg := range consumer.Chan() {
		fmt.Printf("%v\n", msg.ID())
		time.Sleep(1 * time.Second)
		if i%2 == 0 {
			consumer.AckID(msg.ID())
		}
		i++
	}
}
```

### Scenario 1

**Start the program, kill the broker, then kill the program**

When I kill the broker, this
program continues working:

```
INFO[0000] [Connecting to broker] remote_addr="pulsar://localhost:6650"
INFO[0000] [TCP connection established] local_addr="127.0.0.1:62462" remote_addr="pulsar://localhost:6650"
INFO[0000] [Connection is ready] local_addr="127.0.0.1:62462" remote_addr="pulsar://localhost:6650"
INFO[0000] [Connected consumer] consumerID=1 name=htiei subscription=test-sub topic="persistent://public/default/test"
INFO[0000] [Created consumer] consumerID=1 name=htiei subscription=test-sub topic="persistent://public/default/test"
2443:0:0
2443:1:0
2443:2:0
2443:3:0
INFO[0004] Broker notification of Closed consumer: [1] local_addr="127.0.0.1:62462" remote_addr="pulsar://localhost:6650"
INFO[0004] [Reconnecting to broker in 100ms] consumerID=1 name=htiei subscription=test-sub topic="persistent://public/default/test"
INFO[0004] [Error reading from connection] error="Short read when reading frame size: EOF" local_addr="127.0.0.1:62462" remote_addr="pulsar://localhost:6650"
INFO[0004] [Connection closed] local_addr="127.0.0.1:62462" remote_addr="pulsar://localhost:6650"
INFO[0004] [Connecting to broker] remote_addr="pulsar://localhost:6650"
WARN[0004] [Failed to connect to broker.] error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
INFO[0004] [Connection closed] remote_addr="pulsar://localhost:6650"
2443:4:0
2443:5:0
INFO[0006] [Connecting to broker] remote_addr="pulsar://localhost:6650"
WARN[0006] [Failed to connect to broker.] error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
INFO[0006] [Connection closed] remote_addr="pulsar://localhost:6650"
2443:6:0
2443:7:0
2443:8:0
2443:9:0
INFO[0010] [Connecting to broker] remote_addr="pulsar://localhost:6650"
WARN[0010] [Failed to connect to broker.] error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
INFO[0010] [Connection closed] remote_addr="pulsar://localhost:6650"
2443:10:0
2443:11:0
2443:12:0
2443:13:0
2443:14:0
2443:15:0
2443:16:0
2443:17:0
INFO[0018] [Connecting to broker] remote_addr="pulsar://localhost:6650"
WARN[0018] [Failed to connect to broker.] error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
INFO[0018] [Connection closed] remote_addr="pulsar://localhost:6650"
2443:18:0
```

and the cursor stats are:

```
"test-sub" : {
  "markDeletePosition" : "2443:0",
  "readPosition" : "2443:1",
  "waitingReadOp" : false,
  "pendingReadOps" : 0,
  "messagesConsumedCounter" : -2998,
  "cursorLedger" : -1,
  "cursorLedgerLastEntry" : -1,
  "individuallyDeletedMessages" : "[(2443:1..2443:2]]",
  "lastLedgerSwitchTimestamp" : "2020-11-12T17:03:08.648+01:00",
  "state" : "NoLedger",
  "numberOfEntriesSinceFirstNotAckedMessage" : 1,
  "totalNonContiguousDeletedMessagesRange" : 1,
  "properties" : { }
}
```

### Scenario 2

**Start the program, kill the broker, start the broker so the consumer reconnects, then kill the program**

Something stranger happens if I let the consumer reconnect to the broker. Resetting everything and running again:

```
INFO[0000] [Connecting to broker] remote_addr="pulsar://localhost:6650"
INFO[0000] [TCP connection established] local_addr="127.0.0.1:62721" remote_addr="pulsar://localhost:6650"
INFO[0000] [Connection is ready] local_addr="127.0.0.1:62721" remote_addr="pulsar://localhost:6650"
INFO[0000] [Connected consumer] consumerID=1 name=wmlxt subscription=test-sub topic="persistent://public/default/test"
INFO[0000] [Created consumer] consumerID=1 name=wmlxt subscription=test-sub topic="persistent://public/default/test"
2443:0:0
2443:1:0
2443:2:0
INFO[0003] Broker notification of Closed consumer: [1] local_addr="127.0.0.1:62721" remote_addr="pulsar://localhost:6650"
INFO[0003] [Reconnecting to broker in 100ms] consumerID=1 name=wmlxt subscription=test-sub topic="persistent://public/default/test"
INFO[0003] [Error reading from connection] error="Short read when reading frame size: EOF" local_addr="127.0.0.1:62721" remote_addr="pulsar://localhost:6650"
INFO[0003] [Connection closed] local_addr="127.0.0.1:62721" remote_addr="pulsar://localhost:6650"
INFO[0003] [Connecting to broker] remote_addr="pulsar://localhost:6650"
WARN[0003] [Failed to connect to broker.] error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
INFO[0003] [Connection closed] remote_addr="pulsar://localhost:6650"
2443:3:0
2443:4:0
INFO[0005] [Connecting to broker] remote_addr="pulsar://localhost:6650"
WARN[0005] [Failed to connect to broker.] error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
INFO[0005] [Connection closed] remote_addr="pulsar://localhost:6650"
2443:5:0
2443:6:0
2443:7:0
2443:8:0
INFO[0009] [Connecting to broker] remote_addr="pulsar://localhost:6650"
WARN[0009] [Failed to connect to broker.] error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
INFO[0009] [Connection closed] remote_addr="pulsar://localhost:6650"
2443:9:0
2443:10:0
2443:11:0
2443:12:0
2443:13:0
2443:14:0
2443:15:0
2443:16:0
INFO[0017] [Connecting to broker] remote_addr="pulsar://localhost:6650"
WARN[0017] [Failed to connect to broker.] error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
INFO[0017] [Connection closed] remote_addr="pulsar://localhost:6650"
2443:17:0
2443:18:0
2443:19:0
2443:20:0
2443:21:0
2443:22:0
2443:23:0
2443:24:0
2443:25:0
2443:26:0
2443:27:0
2443:28:0
2443:29:0
2443:30:0
2443:31:0
2443:32:0
INFO[0033] [Connecting to broker] remote_addr="pulsar://localhost:6650"
INFO[0033] [TCP connection established] local_addr="127.0.0.1:62772" remote_addr="pulsar://localhost:6650"
INFO[0033] [Connection is ready] local_addr="127.0.0.1:62772" remote_addr="pulsar://localhost:6650"
INFO[0033] [Connected consumer] consumerID=1 name=wmlxt subscription=test-sub topic="persistent://public/default/test"
INFO[0033] [Reconnected consumer to broker] consumerID=1 name=wmlxt subscription=test-sub topic="persistent://public/default/test"
2443:33:0
2443:34:0
2443:35:0
2443:36:0
^Csignal: interrupt
```

The cursor stats show the 43rd message as acked, even though the program never reached that message. All earlier messages are also marked as acked, even though every 2nd one was never acknowledged:

```
"test-sub" : {
  "markDeletePosition" : "2443:43",
  "readPosition" : "2443:1002",
  "waitingReadOp" : false,
  "pendingReadOps" : 0,
  "messagesConsumedCounter" : -2956,
  "cursorLedger" : 2559,
  "cursorLedgerLastEntry" : 1,
  "individuallyDeletedMessages" : "[]",
  "lastLedgerSwitchTimestamp" : "2020-11-12T17:07:39.571+01:00",
  "state" : "Open",
  "numberOfEntriesSinceFirstNotAckedMessage" : 959,
  "totalNonContiguousDeletedMessagesRange" : 0,
  "properties" : { }
}
}
```

If you uncomment the `ReceiverQueueSize` and `MessageChannel` lines in `ConsumerOptions` and repeat the same scenarios, other strange things happen: the consumer starts re-reading messages that were already consumed but not acked.

#### System configuration

**Pulsar version**: v0.3.0-candidate-1
