dferstay opened a new pull request #535: URL: https://github.com/apache/pulsar-client-go/pull/535
The partitionConsumer maintains a few internal goroutines, two of which access the underlying internal.Connection. The main runEventsLoop() goroutine reads the connection field, while a separate goroutine detects connection loss, initiates reconnection, and sets the connection. Previously, access to the conn field was not synchronized. Now the conn field is read and written atomically, which avoids the data race.

Signed-off-by: Daniel Ferstay <[email protected]>

### Motivation

While attempting to submit a separate PR (https://github.com/apache/pulsar-client-go/pull/534), I found that `pulsar/reader_test` consistently failed with the following data race. The change in this PR is an attempt to fix it.

```
2021-06-07T22:33:43.1825587Z ==================
2021-06-07T22:33:43.1825992Z WARNING: DATA RACE
2021-06-07T22:33:43.1826513Z Read at 0x00c0003de4a8 by goroutine 463:
2021-06-07T22:33:43.1828038Z github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).requestGetLastMessageID()
2021-06-07T22:33:43.1829418Z /pulsar-client-go/pulsar/consumer_partition.go:279 +0x27e
2021-06-07T22:33:43.1830873Z github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).internalGetLastMessageID()
2021-06-07T22:33:43.1832427Z /pulsar-client-go/pulsar/consumer_partition.go:270 +0xea
2021-06-07T22:33:43.1833799Z github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).runEventsLoop()
2021-06-07T22:33:43.1835074Z /pulsar-client-go/pulsar/consumer_partition.go:806 +0x2cb
2021-06-07T22:33:43.1835559Z
2021-06-07T22:33:43.1836251Z Previous write at 0x00c0003de4a8 by goroutine 294:
2021-06-07T22:33:43.1837949Z time="2021-06-07T22:33:41Z" level=info msg="[Connected consumer]" consumerID=2 name= subscription=reader-kcnmq topic=my-topic-971826719
2021-06-07T22:33:43.1839441Z github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).grabConn()
2021-06-07T22:33:43.1841513Z /pulsar-client-go/pulsar/consumer_partition.go:974 +0x1875
2021-06-07T22:33:43.1842783Z github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).reconnectToBroker()
2021-06-07T22:33:43.1844507Z /pulsar-client-go/pulsar/consumer_partition.go:887 +0x2db
2021-06-07T22:33:43.1845873Z github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).runEventsLoop.func2()
2021-06-07T22:33:43.1847183Z /pulsar-client-go/pulsar/consumer_partition.go:791 +0xbe
2021-06-07T22:33:43.1847678Z
2021-06-07T22:33:43.1848159Z Goroutine 463 (running) created at:
2021-06-07T22:33:43.1849312Z github.com/apache/pulsar-client-go/pulsar.newPartitionConsumer()
2021-06-07T22:33:43.1850983Z /pulsar-client-go/pulsar/consumer_partition.go:208 +0xf46
2021-06-07T22:33:43.1852092Z github.com/apache/pulsar-client-go/pulsar.newReader()
2021-06-07T22:33:43.1853173Z /pulsar-client-go/pulsar/reader_impl.go:105 +0x8ab
2021-06-07T22:33:43.1854294Z github.com/apache/pulsar-client-go/pulsar.(*client).CreateReader()
2021-06-07T22:33:43.1855391Z /pulsar-client-go/pulsar/client_impl.go:170 +0xcb
2021-06-07T22:33:43.1857422Z github.com/apache/pulsar-client-go/pulsar.TestReaderLatestInclusiveHasNext()
2021-06-07T22:33:43.1858855Z /pulsar-client-go/pulsar/reader_test.go:587 +0x946
2021-06-07T22:33:43.1859491Z testing.tRunner()
2021-06-07T22:33:43.1860118Z /usr/local/go/src/testing/testing.go:909 +0x199
2021-06-07T22:33:43.1860498Z
2021-06-07T22:33:43.1860966Z Goroutine 294 (running) created at:
2021-06-07T22:33:43.1862630Z github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).runEventsLoop()
2021-06-07T22:33:43.1864091Z /pulsar-client-go/pulsar/consumer_partition.go:784 +0x174
2021-06-07T22:33:43.1865119Z ==================
```

### Modifications

Store the internal.Connection managed by the partitionConsumer in an `atomic.Value`: https://golang.org/pkg/sync/atomic/#Value

### Verifying this change

This change is already covered by existing tests that use `partitionConsumer` instances, such as `pulsar/reader_test`.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
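
For readers unfamiliar with `atomic.Value`, the approach described under Modifications can be illustrated with a minimal, self-contained sketch. It is not the code from this pull request: `Connection`, `fakeConn`, `consumer`, `connHolder`, `setConn`, and `getConn` are hypothetical names standing in for `internal.Connection` and `partitionConsumer`. The `connHolder` wrapper is an assumption of this sketch; it keeps the concrete type passed to `Store` constant, since `atomic.Value` panics when values of inconsistent concrete types are stored.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Connection is a hypothetical stand-in for internal.Connection.
type Connection interface {
	ID() string
}

// fakeConn is a hypothetical Connection used only for this illustration.
type fakeConn struct{ id string }

func (c *fakeConn) ID() string { return c.id }

// connHolder wraps the interface so that every Store call uses the same
// concrete type (an assumption of this sketch, not taken from the PR).
type connHolder struct{ c Connection }

// consumer is a hypothetical, stripped-down stand-in for partitionConsumer.
type consumer struct {
	conn atomic.Value // always holds a connHolder
}

// setConn atomically publishes a new connection.
func (c *consumer) setConn(conn Connection) {
	c.conn.Store(connHolder{c: conn})
}

// getConn atomically loads the current connection, or nil if none is set.
func (c *consumer) getConn() Connection {
	h, ok := c.conn.Load().(connHolder)
	if !ok {
		return nil
	}
	return h.c
}

func main() {
	c := &consumer{}
	c.setConn(&fakeConn{id: "conn-0"})

	var wg sync.WaitGroup
	wg.Add(2)

	// Plays the role of the reconnect goroutine: it replaces the connection.
	go func() {
		defer wg.Done()
		for i := 1; i <= 100; i++ {
			c.setConn(&fakeConn{id: fmt.Sprintf("conn-%d", i)})
		}
	}()

	// Plays the role of the events loop: it only reads the connection.
	go func() {
		defer wg.Done()
		for i := 0; i < 100; i++ {
			_ = c.getConn().ID()
		}
	}()

	wg.Wait()
	fmt.Println(c.getConn().ID()) // prints "conn-100"
}
```

Running the sketch with `go run -race` should report no race on the connection field, because every read and write goes through `Load` and `Store`. A `sync.RWMutex` around a plain field would be an alternative design; `atomic.Value` keeps the read path lock-free.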
