blutack opened a new issue #2521: Go client reader hang when processing backlog of ~6M messages
URL: https://github.com/apache/incubator-pulsar/issues/2521

#### Expected behavior

The reader should read all topic messages and then continue to follow the topic.

#### Actual behavior

The reader reads approximately 230K messages, and then the following call to `Next` blocks forever.

#### Steps to reproduce

I have a topic configured with infinite retention, into which I've loaded approximately 6M messages of about 60 bytes each. A producer also publishes a new test message to this topic every second.

The following Python code gives me the result I would expect:

```python
import time

import pulsar

client = pulsar.Client('pulsar://localhost:6650')
reader = client.create_reader('positions', pulsar.MessageId.earliest)

i = 0
while True:
    reader.read_next()
    i += 1
    print(f"{time.time()}: {i}")
```

It produces the following output. All of the backlog is read, and then new messages are read as they arrive:

```
<snip>
1536138110.039305: 6332371
1536138110.0393229: 6332372
1536138110.0393403: 6332373
1536138110.0393577: 6332374
1536138110.039375: 6332375
1536138110.0393927: 6332376
1536138110.114043: 6332377
1536138111.1111479: 6332378
1536138112.1161785: 6332379
1536138113.1167316: 6332380
1536138114.1186855: 6332381
1536138115.1202247: 6332382
```

The problematic Go code is:

```golang
package main

import (
	"log"

	"github.com/apache/incubator-pulsar/pulsar-client-go/pulsar"
)

func main() {
	log.SetFlags(log.LstdFlags | log.Lmicroseconds)

	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:                     "pulsar://localhost:6650",
		OperationTimeoutSeconds: 120,
	})
	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	}

	msgChannel := make(chan pulsar.ReaderMessage)

	reader, err := client.CreateReader(pulsar.ReaderOptions{
		Topic:          "positions",
		StartMessageID: pulsar.EarliestMessage,
		Name:           "go_reader",
		MessageChannel: msgChannel,
	})
	if err != nil {
		log.Fatalf("Could not create Pulsar reader: %v", err)
	}
	defer reader.Close()

	i := 0
	for cm := range msgChannel {
		i++
		_ = cm.Message
		log.Printf("%d", i)
	}
}
```

It produces the following output. Note that most of the messages are never received and the process hangs:

```
<snip>
2018/09/05 10:08:04.674907 153036
2018/09/05 10:08:04.674931 153037
2018/09/05 10:08:04.674936 153038
2018/09/05 10:08:04.675011 153039
2018/09/05 10:08:04.675016 153040
2018/09/05 10:08:04.675044 153041
2018/09/05 10:08:04.675048 153042
2018/09/05 10:08:04.675070 153043
2018/09/05 10:08:04.675079 153044
2018/09/05 10:08:04.675102 153045
2018/09/05 10:08:04.675110 153046
2018/09/05 10:08:04.675137 153047
2018/09/05 10:08:04.675141 153048
<hangs here>
```

The number of messages received before the hang seems to depend on wall time: if I remove the log calls, I get about 250K.

When running the Go code, I eventually get the following in my Pulsar log:

```
10:09:01.340 [pulsar-io-48-11] WARN org.apache.pulsar.common.api.PulsarHandler - [[id: 0xcb26b3aa, L:/127.0.0.1:6650 - R:/127.0.0.1:35682]] Forcing connection to close after keep-alive timeout
10:09:01.341 [pulsar-io-48-11] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:35682
10:09:01.342 [pulsar-io-48-12] WARN org.apache.pulsar.common.api.PulsarHandler - [[id: 0x894e4202, L:/127.0.0.1:6650 - R:/127.0.0.1:35684]] Forcing connection to close after keep-alive timeout
10:09:01.343 [pulsar-io-48-12] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:35684
10:09:01.343 [pulsar-io-48-12] INFO org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/positions, name=reader-4a7656}, consumerId=0, consumerName=go_reader, address=/127.0.0.1:35684}
```

Hope all this is helpful. I'll keep looking into it on my end, but I'm unfamiliar with Go wrapping C++, so it will be slow.
#### System configuration

**Pulsar version**: 2.1.0-incubating with the standard standalone config and operating mode

**OS**: Ubuntu 18.04 with Oracle Java 8
