dferstay opened a new pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535


   The partitionConsumer maintains a few internal go-routines, two of which
   access the underlying internal.Connection.  The main runEventsLoop()
   go-routine reads the conn field, while a separate go-routine detects
   connection loss, initiates reconnection, and sets the conn field.
   
   Previously, access to the conn field was not synchronized.
   
   Now, the conn field is read and written atomically, which avoids the
   data race.
   
   Signed-off-by: Daniel Ferstay <[email protected]>
   
   ### Motivation
   
   While attempting to submit a separate PR
   (https://github.com/apache/pulsar-client-go/pull/534), I found that
   `pulsar/reader_test` consistently failed with the following data race.
   The change in this PR is an attempt to fix it.
   
   ```
   2021-06-07T22:33:43.1825587Z ==================
   2021-06-07T22:33:43.1825992Z WARNING: DATA RACE
   2021-06-07T22:33:43.1826513Z Read at 0x00c0003de4a8 by goroutine 463:
   2021-06-07T22:33:43.1828038Z   github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).requestGetLastMessageID()
   2021-06-07T22:33:43.1829418Z       /pulsar-client-go/pulsar/consumer_partition.go:279 +0x27e
   2021-06-07T22:33:43.1830873Z   github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).internalGetLastMessageID()
   2021-06-07T22:33:43.1832427Z       /pulsar-client-go/pulsar/consumer_partition.go:270 +0xea
   2021-06-07T22:33:43.1833799Z   github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).runEventsLoop()
   2021-06-07T22:33:43.1835074Z       /pulsar-client-go/pulsar/consumer_partition.go:806 +0x2cb
   2021-06-07T22:33:43.1835559Z 
   2021-06-07T22:33:43.1836251Z Previous write at 0x00c0003de4a8 by goroutine 294:
   2021-06-07T22:33:43.1837949Z time="2021-06-07T22:33:41Z" level=info msg="[Connected consumer]" consumerID=2 name= subscription=reader-kcnmq topic=my-topic-971826719
   2021-06-07T22:33:43.1839441Z   github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).grabConn()
   2021-06-07T22:33:43.1841513Z       /pulsar-client-go/pulsar/consumer_partition.go:974 +0x1875
   2021-06-07T22:33:43.1842783Z   github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).reconnectToBroker()
   2021-06-07T22:33:43.1844507Z       /pulsar-client-go/pulsar/consumer_partition.go:887 +0x2db
   2021-06-07T22:33:43.1845873Z   github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).runEventsLoop.func2()
   2021-06-07T22:33:43.1847183Z       /pulsar-client-go/pulsar/consumer_partition.go:791 +0xbe
   2021-06-07T22:33:43.1847678Z 
   2021-06-07T22:33:43.1848159Z Goroutine 463 (running) created at:
   2021-06-07T22:33:43.1849312Z   github.com/apache/pulsar-client-go/pulsar.newPartitionConsumer()
   2021-06-07T22:33:43.1850983Z       /pulsar-client-go/pulsar/consumer_partition.go:208 +0xf46
   2021-06-07T22:33:43.1852092Z   github.com/apache/pulsar-client-go/pulsar.newReader()
   2021-06-07T22:33:43.1853173Z       /pulsar-client-go/pulsar/reader_impl.go:105 +0x8ab
   2021-06-07T22:33:43.1854294Z   github.com/apache/pulsar-client-go/pulsar.(*client).CreateReader()
   2021-06-07T22:33:43.1855391Z       /pulsar-client-go/pulsar/client_impl.go:170 +0xcb
   2021-06-07T22:33:43.1857422Z   github.com/apache/pulsar-client-go/pulsar.TestReaderLatestInclusiveHasNext()
   2021-06-07T22:33:43.1858855Z       /pulsar-client-go/pulsar/reader_test.go:587 +0x946
   2021-06-07T22:33:43.1859491Z   testing.tRunner()
   2021-06-07T22:33:43.1860118Z       /usr/local/go/src/testing/testing.go:909 +0x199
   2021-06-07T22:33:43.1860498Z 
   2021-06-07T22:33:43.1860966Z Goroutine 294 (running) created at:
   2021-06-07T22:33:43.1862630Z   github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).runEventsLoop()
   2021-06-07T22:33:43.1864091Z       /pulsar-client-go/pulsar/consumer_partition.go:784 +0x174
   2021-06-07T22:33:43.1865119Z ==================
   ```
   ### Modifications
   
   Store the internal.Connection managed for the partitionConsumer in an
   `atomic.Value`: https://golang.org/pkg/sync/atomic/#Value
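
For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of one goroutine writing a connection while another reads it through `atomic.Value`. It is not the code from this PR: `Connection`, `fakeConn`, `connBox`, `consumer`, `setConn`, and `getConn` are hypothetical names invented for illustration. The `connBox` wrapper is an assumption of this sketch; it is there because `atomic.Value` panics if successive `Store` calls use different concrete types.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Connection is a hypothetical stand-in for internal.Connection.
type Connection interface {
	ID() int
}

// fakeConn is a hypothetical implementation used only in this sketch.
type fakeConn struct{ id int }

func (c *fakeConn) ID() int { return c.id }

// connBox wraps the interface so every Store uses the same concrete type;
// atomic.Value panics on nil or on a change of concrete type.
type connBox struct{ conn Connection }

// consumer is a hypothetical stand-in for partitionConsumer.
type consumer struct {
	conn atomic.Value // always holds a connBox
}

// setConn atomically publishes a new connection.
func (c *consumer) setConn(conn Connection) { c.conn.Store(connBox{conn}) }

// getConn atomically reads the current connection, or nil if none is set.
func (c *consumer) getConn() Connection {
	box, ok := c.conn.Load().(connBox)
	if !ok {
		return nil // nothing stored yet
	}
	return box.conn
}

func main() {
	c := &consumer{}
	c.setConn(&fakeConn{id: 1})

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // plays the role of the reconnect goroutine (writer)
		defer wg.Done()
		for i := 2; i <= 100; i++ {
			c.setConn(&fakeConn{id: i})
		}
	}()
	go func() { // plays the role of the events loop (reader)
		defer wg.Done()
		for i := 0; i < 100; i++ {
			_ = c.getConn().ID()
		}
	}()
	wg.Wait()
	fmt.Println(c.getConn().ID()) // prints 100
}
```

Running a program like this with `go run -race` should report no data race, whereas replacing the `atomic.Value` with a plain field would be expected to trigger a report similar to the one above.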
   
   ### Verifying this change
   
   This change is already covered by existing tests that use
   `partitionConsumer` instances, such as `pulsar/reader_test`.
   

