fracasula commented on issue #7682: URL: https://github.com/apache/pulsar/issues/7682#issuecomment-683914492
After further investigation I came across two scenarios, which may be related.

## Silent broker scenario

* all consumers are connected, with PING and PONG responses travelling over the wire
* the `stats` tool reports several messages in the `msgBacklog` for the given topic/subscription and shows all the consumers as connected, but with a `msgRateOut` of 0
* by looking at the [Pulsar binary protocol](https://pulsar.apache.org/docs/en/develop-binary-protocol/) I can say that everything goes smoothly for all consumers, even if restarted, up to the `flow` stage, and then they hang forever waiting for a MESSAGE command that never comes; in other words they connect to the broker, send the 1000 permits and then just get pings from [here](https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/connection.go#L353), no messages whatsoever
* killing the Pulsar proxy or restarting the consumers *does not help*
* restarting the brokers fixes the issue
* when trying with the official latest Python 2.6.1 client, it just dies as shown below

```
2020-08-31 08:37:07.728 INFO  [139696078108480] Client:88 | Subscribing on Topic :SpaceEvents
2020-08-31 08:37:07.728 INFO  [139696078108480] ConnectionPool:85 | Created connection for pulsar://localhost:6650
2020-08-31 08:37:07.729 INFO  [139696048822016] ClientConnection:343 | [[::1]:57112 -> [::1]:6650] Connected to broker
2020-08-31 08:37:08.286 INFO  [139696048822016] HandlerBase:53 | [persistent://public/default/SpaceEvents, cloud-pulsar-tester, 0] Getting connection from pool
2020-08-31 08:37:08.426 INFO  [139696048822016] ConnectionPool:85 | Created connection for pulsar://10.56.3.23:6650
2020-08-31 08:37:08.427 INFO  [139696048822016] ClientConnection:345 | [[::1]:57114 -> [::1]:6650] Connected to broker through proxy. Logical broker: pulsar://10.56.3.23:6650
2020-08-31 08:37:08.968 WARN  [139696048822016] ClientConnection:947 | [[::1]:57114 -> [::1]:6650] Received error response from server: UnknownError -- req_id: 0
2020-08-31 08:37:08.968 ERROR [139696048822016] ConsumerImpl:242 | [persistent://public/default/SpaceEvents, cloud-pulsar-tester, 0] Failed to create consumer: UnknownError
Traceback (most recent call last):
  File "main.py", line 4, in <module>
    consumer = client.subscribe('SpaceEvents', 'cloud-pulsar-tester')
  File "/home/francesco/.local/lib/python3.8/site-packages/pulsar/__init__.py", line 655, in subscribe
    c._consumer = self._client.subscribe(topic, subscription_name, conf)
Exception: Pulsar error: UnknownError
2020-08-31 08:37:08.974 INFO  [139696078108480] ClientConnection:1387 | [[::1]:57114 -> [::1]:6650] Connection closed
2020-08-31 08:37:08.974 INFO  [139696078108480] ClientConnection:1387 | [[::1]:57112 -> [::1]:6650] Connection closed
2020-08-31 08:37:08.974 INFO  [139696078108480] ClientConnection:238 | [[::1]:57114 -> [::1]:6650] Destroyed connection
2020-08-31 08:37:08.974 INFO  [139696078108480] ClientConnection:238 | [[::1]:57112 -> [::1]:6650] Destroyed connection
```

This is the Python code I used:

```python
import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('SpaceEvents', 'cloud-pulsar-tester')


def consume():
    while True:
        msg = consumer.receive()
        try:
            print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
            # Acknowledge successful processing of the message
            consumer.acknowledge(msg)
        except:
            # Message failed to be processed
            consumer.negative_acknowledge(msg)


if __name__ == '__main__':
    consume()
    client.close()
```

And the `requirements.txt`:

```
pulsar-client==2.6.1
apache-bookkeeper-client==4.11.0
grpcio<1.26.0
```

## Possible acking deadlock (Golang client)

* all consumers report PING and PONG responses in their logs (verbosity set to `trace`)
* the `stats` tool reports no consumers at all, even though all clients are successfully logging the PING/PONG exchange
* the `stats` tool reports a `msgBacklog` greater than zero, so there are messages waiting to be processed
* by getting a full goroutine stack dump I was able to determine that all consumers are stuck [here](https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_partition.go#L741) (a sketch of how such a dump can be captured follows the code changes below)
* to be 100% sure that it wasn't just a case of the service happening to try an ack every time I captured the goroutine stack dump, I changed the Golang client by adding a ticker that prints `Trying to ack` in the logs whenever an ack takes more than 150ms

The last bullet point means that I changed this code:

```go
func (pc *partitionConsumer) runEventsLoop() {
	defer func() {
		pc.log.Debug("exiting events loop")
	}()
	for {
		select {
		case <-pc.closeCh:
			return

		case i := <-pc.eventsCh:
			switch v := i.(type) {
			case *ackRequest:
				pc.internalAck(v)
```

Like this:

```go
func (pc *partitionConsumer) runEventsLoop() {
	defer func() {
		pc.log.Debug("exiting events loop")
	}()
	for {
		select {
		case <-pc.closeCh:
			return

		case i := <-pc.eventsCh:
			switch v := i.(type) {
			case *ackRequest:
				ctx, cancel := context.WithCancel(context.Background())
				go func(v *ackRequest) {
					for {
						select {
						case <-ctx.Done():
							return
						default:
							pc.log.Infof("Trying to ack %+v (%d - %d)", v.msgID, len(pc.eventsCh), cap(pc.eventsCh))
							time.Sleep(150 * time.Millisecond)
						}
					}
				}(v)
				pc.internalAck(v)
				cancel()
```

If the ack happens within 150ms we don't see any logs.
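As an aside, since the goroutine stack dumps are mentioned above: this is roughly how such a dump can be captured. It is only a minimal sketch assuming the standard `net/http/pprof` endpoint is wired into the consumer binary; the port and setup here are made up, and sending `SIGQUIT` to the process would produce a similar dump on stderr.

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof/* handlers on the default mux
)

func main() {
	// Expose the pprof endpoints on a local port next to the consumer code.
	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	// ... start the Pulsar consumer here ...
	select {}
}
```

A full goroutine dump can then be fetched with `curl 'http://localhost:6060/debug/pprof/goroutine?debug=2'`.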
The problem is that now I have all consumers stuck in a never-ending loop, just printing:

```
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:11+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:11+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:11+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
```

This has been going on for several hours and the message ID in the log entries stays the same until I kill the consumer.
Could it be that, given that `eventsCh` is a buffered channel with a capacity of 3, when a connection gets closed due to a [message frame size that is too big](https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/connection_reader.go#L78), the `runEventsLoop()` never gets to process the `*connectionClosed` event because of at least 3 in-flight ack requests? Meaning: 3 ack requests are already keeping the channel full, the connection gets closed, the acks can't be processed because the connection is gone, and we cannot [reconnect to the broker](https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_partition.go#L753) because, with the channel full, the `*connectionClosed` event can never be pushed into `eventsCh` and therefore never gets processed: a deadlock? (A standalone sketch of this suspected interaction is at the end of this comment.)

In support of this theory, right before the never-ending `Trying to ack` loop starts I can see this in the logs:

```
{"error":"write tcp [::1]:35530-\u003e[::1]:6650: use of closed network connection","level":"warn","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Failed to write on connection","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"WARNING","time":"2020-08-31T17:47:23+02:00"}
{"level":"debug","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Write data: 25","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"DEBUG","time":"2020-08-31T17:47:23+02:00"}
{"error":"write tcp [::1]:35530-\u003e[::1]:6650: use of closed network connection","level":"warn","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Failed to write on connection","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"WARNING","time":"2020-08-31T17:47:23+02:00"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:23+02:00","topic":"persistent://public/default/SpaceEvents"}
{"level":"info","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Connection closed","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"INFO","time":"2020-08-31T17:47:23+02:00"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:23+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:24+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:24+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:24+02:00","topic":"persistent://public/default/SpaceEvents"}
```

Also, by analyzing the stack dump I can see that one of the 21 running goroutines is stuck waiting [here](https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_partition.go#L567).

```
1 @ 0x449d3b 0x41229f 0x412095 0xc5b3f5 0xc46394 0xc4ceb9 0xc41f5e 0xc4ccc6 0x47d571
#	0xc5b3f4	github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).ConnectionClosed+0x64	/home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/consumer_partition.go:567
#	0xc46393	github.com/apache/pulsar-client-go/pulsar/internal.(*connection).Close+0x583	/home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:751
#	0xc4ceb8	github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run.func1+0x168	/home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:363
#	0xc41f5d	github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run+0x2bd	/home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:369
#	0xc4ccc5	github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1+0x85	/home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:231
```

Killing the consumer in this case helps, but it eventually gets stuck again trying to ack some other message.

We could potentially try to look into the second scenario, but we have no clue whatsoever about the first one and we have no Java expertise. Can someone please look a bit more into this and tell us whether you need more information? Thanks.