fracasula commented on issue #7682:
URL: https://github.com/apache/pulsar/issues/7682#issuecomment-683914492


   After further investigation I was able to identify two scenarios, which may be related.
   
   ## Silent broker scenario
   
   * all consumers are connected, with PING and PONG responses travelling over the wire
   * the `stats` tool reports several messages in the `msgBacklog` for the given topic/subscription, and it shows all the consumers as connected but with a `msgRateOut` of 0
   * by looking at the [Pulsar binary protocol](https://pulsar.apache.org/docs/en/develop-binary-protocol/) I can say that everything goes smoothly for all consumers, even after restarting them, up to the `flow` stage, and then they hang forever waiting for a MESSAGE command that never comes; in other words, they connect to the broker, send the 1000 permits and then just get pings from [here](https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/connection.go#L353), no messages whatsoever (a minimal Go-client sketch of such a consumer is included at the end of this section)
   * killing the Pulsar proxy or restarting the consumers *does not help*
   * restarting the brokers fixes the issue
   * when trying with the latest official Python client (2.6.1), it just dies as shown below
   
   ```
   2020-08-31 08:37:07.728 INFO  [139696078108480] Client:88 | Subscribing on Topic :SpaceEvents
   2020-08-31 08:37:07.728 INFO  [139696078108480] ConnectionPool:85 | Created connection for pulsar://localhost:6650
   2020-08-31 08:37:07.729 INFO  [139696048822016] ClientConnection:343 | [[::1]:57112 -> [::1]:6650] Connected to broker
   2020-08-31 08:37:08.286 INFO  [139696048822016] HandlerBase:53 | [persistent://public/default/SpaceEvents, cloud-pulsar-tester, 0] Getting connection from pool
   2020-08-31 08:37:08.426 INFO  [139696048822016] ConnectionPool:85 | Created connection for pulsar://10.56.3.23:6650
   2020-08-31 08:37:08.427 INFO  [139696048822016] ClientConnection:345 | [[::1]:57114 -> [::1]:6650] Connected to broker through proxy. Logical broker: pulsar://10.56.3.23:6650
   2020-08-31 08:37:08.968 WARN  [139696048822016] ClientConnection:947 | [[::1]:57114 -> [::1]:6650] Received error response from server: UnknownError -- req_id: 0
   2020-08-31 08:37:08.968 ERROR [139696048822016] ConsumerImpl:242 | [persistent://public/default/SpaceEvents, cloud-pulsar-tester, 0] Failed to create consumer: UnknownError
   Traceback (most recent call last):
     File "main.py", line 4, in <module>
       consumer = client.subscribe('SpaceEvents', 'cloud-pulsar-tester')
     File "/home/francesco/.local/lib/python3.8/site-packages/pulsar/__init__.py", line 655, in subscribe
       c._consumer = self._client.subscribe(topic, subscription_name, conf)
   Exception: Pulsar error: UnknownError
   2020-08-31 08:37:08.974 INFO  [139696078108480] ClientConnection:1387 | [[::1]:57114 -> [::1]:6650] Connection closed
   2020-08-31 08:37:08.974 INFO  [139696078108480] ClientConnection:1387 | [[::1]:57112 -> [::1]:6650] Connection closed
   2020-08-31 08:37:08.974 INFO  [139696078108480] ClientConnection:238 | [[::1]:57114 -> [::1]:6650] Destroyed connection
   2020-08-31 08:37:08.974 INFO  [139696078108480] ClientConnection:238 | [[::1]:57112 -> [::1]:6650] Destroyed connection
   ```
   
   This is the Python code I used:
   
   ```python
   import pulsar
   
   client = pulsar.Client('pulsar://localhost:6650')
   consumer = client.subscribe('SpaceEvents', 'cloud-pulsar-tester')
   
   
   def consume():
       while True:
           msg = consumer.receive()
           try:
               print("Received message '{}' id='{}'".format(msg.data(), 
msg.message_id()))
               # Acknowledge successful processing of the message
               consumer.acknowledge(msg)
           except:
               # Message failed to be processed
               consumer.negative_acknowledge(msg)
   
   
   if __name__ == '__main__':
       consume()
       client.close()
   ```
   
   And the `requirements.txt`:
   
   ```
   pulsar-client==2.6.1
   apache-bookkeeper-client==4.11.0
   grpcio<1.26.0
   ```
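
   For reference, our Go consumers that hang after the `flow` stage are doing essentially the Go-client equivalent of the Python snippet above. Below is only a minimal sketch using the pulsar-client-go API (the subscription type and options are assumptions, not the exact options our service uses); with the default `ReceiverQueueSize` of 1000 the client should send the 1000 permits mentioned above and then block in `Receive()`:

   ```go
   package main

   import (
   	"context"
   	"log"

   	"github.com/apache/pulsar-client-go/pulsar"
   )

   func main() {
   	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
   	if err != nil {
   		log.Fatal(err)
   	}
   	defer client.Close()

   	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
   		Topic:             "SpaceEvents",
   		SubscriptionName:  "cloud-pulsar-tester",
   		Type:              pulsar.Shared, // assumption: the real subscription type isn't shown here
   		ReceiverQueueSize: 1000,          // should correspond to the 1000 permits sent in the FLOW command
   	})
   	if err != nil {
   		log.Fatal(err)
   	}
   	defer consumer.Close()

   	for {
   		// In the "silent broker" scenario this call never returns, even though
   		// msgBacklog > 0 and the consumer shows up as connected in the stats.
   		msg, err := consumer.Receive(context.Background())
   		if err != nil {
   			log.Fatal(err)
   		}
   		log.Printf("received message %q id=%v", msg.Payload(), msg.ID())
   		consumer.Ack(msg)
   	}
   }
   ```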
   
   ## Possible acking deadlock (Golang client)
   
   * all consumers report PING and PONG responses in their logs (verbosity set to `trace`)
   * the `stats` tool reports no consumers at all, despite all clients successfully printing the PING/PONG in their logs
   * the `stats` tool reports a `msgBacklog` greater than zero, so there are messages waiting to be processed
   * by getting a full goroutine stack dump (one way to capture it is sketched right after this list) I was able to determine that all consumers are stuck [here](https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_partition.go#L741)
   * to be 100% sure that it wasn't just a case of the service happening to be in the middle of an ack every time I captured the goroutine stack dump, I changed the Golang client so that it keeps printing `Trying to ack` to the logs every 150ms while an ack is still in flight
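
   For reference, one way to capture such a full goroutine stack dump is to expose `net/http/pprof` in the consuming service and request the goroutine profile in debug mode; the following is just a minimal sketch of that approach, not necessarily the only way to do it:

   ```go
   package main

   import (
   	"log"
   	"net/http"
   	_ "net/http/pprof" // registers the /debug/pprof/* handlers on http.DefaultServeMux
   )

   func main() {
   	// Serve pprof on localhost only; the full goroutine stack dump is then
   	// available with:
   	//   curl 'http://localhost:6060/debug/pprof/goroutine?debug=2'
   	go func() {
   		log.Println(http.ListenAndServe("localhost:6060", nil))
   	}()

   	// ... start the Pulsar consumers here ...
   	select {} // keep the process alive for this sketch
   }
   ```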
   
   The last bullet point means that I changed this code:
   
   ```go
   func (pc *partitionConsumer) runEventsLoop() {
           defer func() {
                   pc.log.Debug("exiting events loop")
           }()
           for {
                   select {
                   case <-pc.closeCh:
                           return
                   case i := <-pc.eventsCh:
                           switch v := i.(type) {
                           case *ackRequest:
                                   pc.internalAck(v)
   ```
   
   Like this:
   
   ```go
   func (pc *partitionConsumer) runEventsLoop() {
           defer func() {
                   pc.log.Debug("exiting events loop")
           }()
           for {
                   select {
                   case <-pc.closeCh:
                           return
                   case i := <-pc.eventsCh:
                           switch v := i.(type) {
                           case *ackRequest:
                                   ctx, cancel := context.WithCancel(context.Background())
                                   go func(v *ackRequest) {
                                           for {
                                                   select {
                                                   case <-ctx.Done():
                                                           return
                                                   default:
                                                           pc.log.Infof("Trying to ack %+v (%d - %d)", v.msgID, len(pc.eventsCh), cap(pc.eventsCh))
                                                           time.Sleep(150 * time.Millisecond)
                                                   }
                                           }
                                   }(v)

                                   pc.internalAck(v)
                                   cancel()
   ```
   
   If the ack happens within 150ms we don't see any logs. The problem is that now all consumers are stuck in a never-ending loop, printing only:
   
   ```
   {"consumerID":1,"level":"info","message":"Trying to ack 
{messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} 
tracker:\u003cnil\u003e consumer:0xc000336000 
receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 
3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:11+02:00","topic":"persistent://public/default/SpaceEvents"}
   {"consumerID":1,"level":"info","message":"Trying to ack 
{messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} 
tracker:\u003cnil\u003e consumer:0xc000336000 
receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 
3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:11+02:00","topic":"persistent://public/default/SpaceEvents"}
   {"consumerID":1,"level":"info","message":"Trying to ack 
{messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} 
tracker:\u003cnil\u003e consumer:0xc000336000 
receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 
3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:11+02:00","topic":"persistent://public/default/SpaceEvents"}
   {"consumerID":1,"level":"info","message":"Trying to ack 
{messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} 
tracker:\u003cnil\u003e consumer:0xc000336000 
receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 
3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
   {"consumerID":1,"level":"info","message":"Trying to ack 
{messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} 
tracker:\u003cnil\u003e consumer:0xc000336000 
receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 
3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
   {"consumerID":1,"level":"info","message":"Trying to ack 
{messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} 
tracker:\u003cnil\u003e consumer:0xc000336000 
receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 
3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
   {"consumerID":1,"level":"info","message":"Trying to ack 
{messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} 
tracker:\u003cnil\u003e consumer:0xc000336000 
receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 
3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
   {"consumerID":1,"level":"info","message":"Trying to ack 
{messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} 
tracker:\u003cnil\u003e consumer:0xc000336000 
receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 
3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
   ```
   
   This has been going on for several hours, and the message ID in the log entries is always the same, until I kill the consumer.
   
   Could it be that, given that `eventsCh` is a buffered channel with a capacity of 3, when a connection gets closed due to a [message frame that is too big](https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/connection_reader.go#L78), `runEventsLoop()` never gets to process the `*connectionClosed` event because there are at least 3 in-flight ack requests?
   
   Meaning: we could have 3 ack requests already keeping the channel full, then the connection gets closed; the acks can't be processed because the connection is gone, and we cannot [reconnect to the broker](https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_partition.go#L753) because, with the channel full, the `*connectionClosed` event can't be pushed into `eventsCh` and therefore never gets processed = deadlock?
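
   To make the suspected interaction concrete, here is a minimal, self-contained toy model of it. This is *not* the actual client code: only the channel capacity of 3 and the event names are borrowed from pulsar-client-go, everything else is made up for illustration.

   ```go
   package main

   import (
   	"fmt"
   	"time"
   )

   type ackRequest struct{ id int }
   type connectionClosed struct{}

   func main() {
   	eventsCh := make(chan interface{}, 3) // same capacity as the client's eventsCh

   	// Toy event loop: handling an ack blocks forever, standing in for
   	// internalAck() hanging on a connection that is already dead.
   	go func() {
   		for e := range eventsCh {
   			switch e.(type) {
   			case *ackRequest:
   				select {} // blocked "acking" on the dead connection
   			case *connectionClosed:
   				fmt.Println("would reconnect to the broker here") // never reached
   			}
   		}
   	}()

   	// Application goroutines push ack requests: one is picked up by the loop,
   	// three fill the buffer, and any further sender blocks forever (like the
   	// consumers stuck at consumer_partition.go:741).
   	for i := 0; i < 4; i++ {
   		go func(i int) { eventsCh <- &ackRequest{id: i} }(i)
   	}
   	time.Sleep(100 * time.Millisecond)

   	// Now the connection reader tries to report the closed connection, but the
   	// buffer is full and the event loop is stuck on an ack, so nobody ever
   	// processes *connectionClosed and no reconnect ever happens.
   	select {
   	case eventsCh <- &connectionClosed{}:
   		fmt.Println("connectionClosed delivered")
   	case <-time.After(time.Second):
   		fmt.Println("deadlock: eventsCh is full and the event loop is stuck acking")
   	}
   }
   ```

   Running this prints the "deadlock" branch after one second: the `*connectionClosed` event can never be delivered, so the reconnect logic never runs, which is the shape of what we seem to be hitting.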
   
   In support of this theory, I can see this in the logs right before the never-ending `Trying to ack` loop starts:
   
   ```
   {"error":"write tcp [::1]:35530-\u003e[::1]:6650: use of closed network 
connection","level":"warn","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Failed
 to write on 
connection","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"WARNING","time":"2020-08-31T17:47:23+02:00"}
   
{"level":"debug","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Write
 data: 
25","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"DEBUG","time":"2020-08-31T17:47:23+02:00"}
   {"error":"write tcp [::1]:35530-\u003e[::1]:6650: use of closed network 
connection","level":"warn","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Failed
 to write on 
connection","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"WARNING","time":"2020-08-31T17:47:23+02:00"}
   {"consumerID":1,"level":"info","message":"Trying to ack 
{messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} 
tracker:\u003cnil\u003e consumer:0xc000336000 
receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 
3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:23+02:00","topic":"persistent://public/default/SpaceEvents"}
   
{"level":"info","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Connection
 
closed","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"INFO","time":"2020-08-31T17:47:23+02:00"}
   {"consumerID":1,"level":"info","message":"Trying to ack 
{messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} 
tracker:\u003cnil\u003e consumer:0xc000336000 
receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 
3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:23+02:00","topic":"persistent://public/default/SpaceEvents"}
   {"consumerID":1,"level":"info","message":"Trying to ack 
{messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} 
tracker:\u003cnil\u003e consumer:0xc000336000 
receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 
3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:24+02:00","topic":"persistent://public/default/SpaceEvents"}
   {"consumerID":1,"level":"info","message":"Trying to ack 
{messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} 
tracker:\u003cnil\u003e consumer:0xc000336000 
receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 
3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:24+02:00","topic":"persistent://public/default/SpaceEvents"}
   {"consumerID":1,"level":"info","message":"Trying to ack 
{messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} 
tracker:\u003cnil\u003e consumer:0xc000336000 
receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 
3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:24+02:00","topic":"persistent://public/default/SpaceEvents"}
   ```
   
   Also, by analyzing the stack dump I can see that one of the 21 running goroutines is stuck waiting [here](https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_partition.go#L567), i.e. the connection's close path is presumably blocked trying to push the `*connectionClosed` event into the already-full `eventsCh`, which would fit the theory above.
   
   ```
   1 @ 0x449d3b 0x41229f 0x412095 0xc5b3f5 0xc46394 0xc4ceb9 0xc41f5e 0xc4ccc6 0x47d571
   #    0xc5b3f4    github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).ConnectionClosed+0x64    /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/consumer_partition.go:567
   #    0xc46393    github.com/apache/pulsar-client-go/pulsar/internal.(*connection).Close+0x583    /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:751
   #    0xc4ceb8    github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run.func1+0x168    /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:363
   #    0xc41f5d    github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run+0x2bd    /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:369
   #    0xc4ccc5    github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1+0x85    /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:231
   ```
   
   Killing the consumer in this case helps but it eventually gets stuck again 
trying to ack some other message.
   
   We could potentially try to look into the second scenario, but we have no clue whatsoever about the first one, and we have no Java expertise. Can someone please look into this a bit more and let us know whether you need more information? Thanks.

