#### Expected behavior

The reader should read all messages in the topic and then continue to follow the topic.

#### Actual behavior

The reader reads approximately 230K messages, and then the next call to
`Next` blocks forever.

#### Steps to reproduce

I have a topic configured with infinite retention, into which I've loaded
approximately 6M messages of about 60 bytes each. A producer feeds the topic a
new test message every second.

I've used the following Python code, which gives me the result I expect:
```python
import pulsar, time

client = pulsar.Client('pulsar://localhost:6650')
reader = client.create_reader('positions', pulsar.MessageId.earliest)

i = 0

while True:
    reader.read_next()
    i+=1
    print(f"{time.time()}: {i}")
```
It outputs the following: the entire backlog is read, followed by new messages
as they come in:
```
<snip>
1536138110.039305: 6332371
1536138110.0393229: 6332372
1536138110.0393403: 6332373
1536138110.0393577: 6332374
1536138110.039375: 6332375
1536138110.0393927: 6332376
1536138110.114043: 6332377
1536138111.1111479: 6332378
1536138112.1161785: 6332379
1536138113.1167316: 6332380
1536138114.1186855: 6332381
1536138115.1202247: 6332382
```

The problematic Go code is:
```golang
package main

import (
        "log"

        "github.com/apache/incubator-pulsar/pulsar-client-go/pulsar"
)

func main() {
        log.SetFlags(log.LstdFlags | log.Lmicroseconds)
        client, err := pulsar.NewClient(pulsar.ClientOptions{
                URL: "pulsar://localhost:6650",
                OperationTimeoutSeconds: 120,
        })

        if err != nil {
                log.Fatalf("Could not instantiate Pulsar client: %v", err)
        }

        msgChannel := make(chan pulsar.ReaderMessage)

        reader, err := client.CreateReader(pulsar.ReaderOptions{
                Topic:          "positions",
                StartMessageID: pulsar.EarliestMessage,
                Name:           "go_reader",
                MessageChannel: msgChannel,
        })

        if err != nil {
                log.Fatalf("Could not create Pulsar reader: %v", err)
        }

        defer reader.Close()

        i := 0
        for cm := range msgChannel {
                i++
                _ = cm.Message

                log.Printf("%d\n", i)
        }
}
```

It outputs the following. Note that most of the messages are never received and
the process hangs:
```
<snip>
2018/09/05 10:08:04.674907 153036
2018/09/05 10:08:04.674931 153037
2018/09/05 10:08:04.674936 153038
2018/09/05 10:08:04.675011 153039
2018/09/05 10:08:04.675016 153040
2018/09/05 10:08:04.675044 153041
2018/09/05 10:08:04.675048 153042
2018/09/05 10:08:04.675070 153043
2018/09/05 10:08:04.675079 153044
2018/09/05 10:08:04.675102 153045
2018/09/05 10:08:04.675110 153046
2018/09/05 10:08:04.675137 153047
2018/09/05 10:08:04.675141 153048
<hangs here>
```
The number of messages received before the hang seems to depend on wall-clock
time: if I remove the log calls, I get about 250K.

When running the Go code, I eventually get the following in my Pulsar broker log:
```
10:09:01.340 [pulsar-io-48-11] WARN  org.apache.pulsar.common.api.PulsarHandler - [[id: 0xcb26b3aa, L:/127.0.0.1:6650 - R:/127.0.0.1:35682]] Forcing connection to close after keep-alive timeout
10:09:01.341 [pulsar-io-48-11] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:35682
10:09:01.342 [pulsar-io-48-12] WARN  org.apache.pulsar.common.api.PulsarHandler - [[id: 0x894e4202, L:/127.0.0.1:6650 - R:/127.0.0.1:35684]] Forcing connection to close after keep-alive timeout
10:09:01.343 [pulsar-io-48-12] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:35684
10:09:01.343 [pulsar-io-48-12] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/positions, name=reader-4a7656}, consumerId=0, consumerName=go_reader, address=/127.0.0.1:35684}
```

I hope all this is helpful. I'll keep looking into it on my end, but I'm
unfamiliar with how Go wraps C++, so I will be slow.

#### System configuration
**Pulsar version**: 2.1.0-incubating w/ standard standalone config and operating mode
**OS**: Ubuntu 18.04 w/ Oracle Java 8


[ Full content available at: 
https://github.com/apache/incubator-pulsar/issues/2521 ]