blutack opened a new issue #2521: Go client reader hang when processing backlog of ~6M messages
URL: https://github.com/apache/incubator-pulsar/issues/2521
 
 
   #### Expected behavior
   
   The reader should read all messages on the topic and then continue to follow the topic.
   
   #### Actual behavior
   
   The reader reads approximately 230K messages, and then the next call to `Next` blocks forever.
   
   #### Steps to reproduce
   
   I have a topic configured with infinite retention, into which I've loaded approximately 6M messages of about 60 bytes each. A producer also publishes a new test message to this topic every second.
   
   I've used the following Python code, which gives me the result I expect:
   ```python
   import pulsar, time
   
   client = pulsar.Client('pulsar://localhost:6650')
   reader = client.create_reader('positions', pulsar.MessageId.earliest)
   
   i = 0
   
   while True:
       reader.read_next()
       i+=1
       print(f"{time.time()}: {i}")
   ```
   This reads the whole backlog and then new messages as they come in:
   ```
   <snip>
   1536138110.039305: 6332371
   1536138110.0393229: 6332372
   1536138110.0393403: 6332373
   1536138110.0393577: 6332374
   1536138110.039375: 6332375
   1536138110.0393927: 6332376
   1536138110.114043: 6332377
   1536138111.1111479: 6332378
   1536138112.1161785: 6332379
   1536138113.1167316: 6332380
   1536138114.1186855: 6332381
   1536138115.1202247: 6332382
   ```
   
   The problematic Go code is:
   ```golang
   package main
   
   import (
        "log"
   
        "github.com/apache/incubator-pulsar/pulsar-client-go/pulsar"
   )
   
   func main() {
        log.SetFlags(log.LstdFlags | log.Lmicroseconds)
        client, err := pulsar.NewClient(pulsar.ClientOptions{
                URL: "pulsar://localhost:6650",
                OperationTimeoutSeconds: 120,
        })
   
        if err != nil {
                log.Fatalf("Could not instantiate Pulsar client: %v", err)
         }

         defer client.Close()
   
        msgChannel := make(chan pulsar.ReaderMessage)
   
        reader, err := client.CreateReader(pulsar.ReaderOptions{
                Topic:          "positions",
                StartMessageID: pulsar.EarliestMessage,
                Name:           "go_reader",
                MessageChannel: msgChannel,
        })
   
        if err != nil {
                 log.Fatalf("Could not create Pulsar reader: %v", err)
        }
   
        defer reader.Close()
   
        i := 0
        for cm := range msgChannel {
                i++
                _ = cm.Message
   
                log.Printf("%d\n", i)
        }
   }
   ```
   
   The Go code outputs the following. Note that most of the messages are never received and the process hangs:
   ```
   <snip>
   2018/09/05 10:08:04.674907 153036
   2018/09/05 10:08:04.674931 153037
   2018/09/05 10:08:04.674936 153038
   2018/09/05 10:08:04.675011 153039
   2018/09/05 10:08:04.675016 153040
   2018/09/05 10:08:04.675044 153041
   2018/09/05 10:08:04.675048 153042
   2018/09/05 10:08:04.675070 153043
   2018/09/05 10:08:04.675079 153044
   2018/09/05 10:08:04.675102 153045
   2018/09/05 10:08:04.675110 153046
   2018/09/05 10:08:04.675137 153047
   2018/09/05 10:08:04.675141 153048
   <hangs here>
   ```
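   One way to narrow down where delivery stops is to have the consuming loop report a stall instead of blocking forever. The following is a minimal sketch, not part of the Pulsar Go client: `consumeUntilIdle` is a hypothetical helper, and a plain `chan string` stands in for the `chan pulsar.ReaderMessage` used in the reproducer. In real use the idle limit is assumed to be comfortably longer than the one-second gap between live test messages.

   ```golang
   package main

   import (
       "fmt"
       "time"
   )

   // consumeUntilIdle counts messages received on ch. It returns when ch is
   // closed (stalled == false) or when no message has arrived for at least
   // idle (stalled == true). idle must be positive. Hypothetical helper: the
   // string element type stands in for pulsar.ReaderMessage.
   func consumeUntilIdle(ch <-chan string, idle time.Duration) (count int, stalled bool) {
       // Check for inactivity a few times per idle period.
       interval := idle / 4
       if interval <= 0 {
           interval = idle
       }
       ticker := time.NewTicker(interval)
       defer ticker.Stop()

       last := time.Now()
       for {
           select {
           case _, ok := <-ch:
               if !ok {
                   return count, false // channel closed: stream ended normally
               }
               count++
               last = time.Now()
           case <-ticker.C:
               if time.Since(last) >= idle {
                   return count, true // nothing arrived within idle: suspected stall
               }
           }
       }
   }

   func main() {
       // Simulate a reader that delivers three messages and then goes
       // silent without closing its channel.
       ch := make(chan string, 3)
       ch <- "a"
       ch <- "b"
       ch <- "c"

       n, stalled := consumeUntilIdle(ch, 100*time.Millisecond)
       fmt.Printf("received %d messages, stalled=%v\n", n, stalled)
       // prints: received 3 messages, stalled=true
   }
   ```

   Because the channel in `main` is never closed, the helper returns after the idle limit with all three buffered messages counted; a closed channel instead ends the loop with `stalled == false`.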
   The number of messages received before hanging seems to depend on wall-clock time: if I remove the log calls, I get about 250K.
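   Since per-message logging changes how far the reader gets, a lower-overhead way to watch progress is to log only every N messages. A minimal sketch, assuming a hypothetical `countWithProgress` helper and a `chan struct{}` in place of the reader's message channel:

   ```golang
   package main

   import (
       "fmt"
       "log"
   )

   // countWithProgress drains ch until it is closed and calls logf once every
   // `every` messages instead of once per message. It returns the total count.
   // Hypothetical helper: struct{} stands in for pulsar.ReaderMessage.
   func countWithProgress(ch <-chan struct{}, every int, logf func(format string, args ...interface{})) int {
       n := 0
       for range ch {
           n++
           if every > 0 && n%every == 0 {
               logf("received %d messages", n)
           }
       }
       return n
   }

   func main() {
       ch := make(chan struct{})
       go func() {
           defer close(ch)
           for i := 0; i < 25000; i++ {
               ch <- struct{}{}
           }
       }()

       // log.Printf matches the logf parameter; this logs at 10000 and 20000.
       total := countWithProgress(ch, 10000, log.Printf)
       fmt.Println("total:", total) // prints: total: 25000
   }
   ```

   `log.Printf` already has the `func(string, ...interface{})` shape the helper expects, so it can be passed directly; a closure that records lines works equally well in a test.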
   
   While the Go code is running, I eventually see the following in my Pulsar log:
   ```
   10:09:01.340 [pulsar-io-48-11] WARN  org.apache.pulsar.common.api.PulsarHandler - [[id: 0xcb26b3aa, L:/127.0.0.1:6650 - R:/127.0.0.1:35682]] Forcing connection to close after keep-alive timeout
   10:09:01.341 [pulsar-io-48-11] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:35682
   10:09:01.342 [pulsar-io-48-12] WARN  org.apache.pulsar.common.api.PulsarHandler - [[id: 0x894e4202, L:/127.0.0.1:6650 - R:/127.0.0.1:35684]] Forcing connection to close after keep-alive timeout
   10:09:01.343 [pulsar-io-48-12] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:35684
   10:09:01.343 [pulsar-io-48-12] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/positions, name=reader-4a7656}, consumerId=0, consumerName=go_reader, address=/127.0.0.1:35684}
   ```
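   The `Forcing connection to close after keep-alive timeout` warnings appear at 10:09:01, roughly a minute after the last message logged by the Go reader at 10:08:04. A common keep-alive scheme, assumed here for illustration rather than taken from the Pulsar broker source, sends a ping on each interval tick and closes the connection if the previous ping is still unanswered at the next tick. The hypothetical `keepAlive` type below models that logic; under this scheme, a peer that stops answering is dropped within at most two tick intervals.

   ```golang
   package main

   import "fmt"

   // keepAlive models one side of a ping/pong keep-alive check.
   // Hypothetical illustration; not Pulsar's implementation.
   type keepAlive struct {
       waitingForPong bool
   }

   // tick is called once per keep-alive interval. It returns "close" if the
   // previous ping was never answered, otherwise it sends a new ping and
   // returns "ping".
   func (k *keepAlive) tick() string {
       if k.waitingForPong {
           return "close"
       }
       k.waitingForPong = true
       return "ping"
   }

   // onPong records that the peer answered the outstanding ping.
   func (k *keepAlive) onPong() {
       k.waitingForPong = false
   }

   func main() {
       var k keepAlive
       fmt.Println(k.tick()) // prints: ping
       k.onPong()            // healthy peer answers
       fmt.Println(k.tick()) // prints: ping
       // The peer stops answering, so the next tick gives up on it.
       fmt.Println(k.tick()) // prints: close
   }
   ```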
   
   I hope this is helpful. I'll keep looking into it on my end, but I'm unfamiliar with Go wrapping C++, so it will be slow going.
   
   #### System configuration
   **Pulsar version**: 2.1.0-incubating with the standard standalone configuration and operating mode
   **OS**: Ubuntu 18.04 with Oracle Java 8
   
