flowchartsman commented on issue #448:
URL: https://github.com/apache/pulsar-client-go/issues/448#issuecomment-766529628


   Preliminary testing with `go.uber.org/atomic` indicates that this indeed 
eliminates the race in the referenced test.
   
   ```diff
   diff --git a/go.mod b/go.mod
   index bf0b627..2bb071f 100644
   --- a/go.mod
   +++ b/go.mod
   @@ -23,6 +23,7 @@ require (
        github.com/spf13/pflag v1.0.3 // indirect
        github.com/stretchr/testify v1.4.0
        github.com/yahoo/athenz v1.8.55
   +    go.uber.org/atomic v1.7.0
    )
    
    replace github.com/apache/pulsar-client-go/oauth2 => ./oauth2
   diff --git a/go.sum b/go.sum
   index eaf7163..637ce54 100644
   --- a/go.sum
   +++ b/go.sum
   @@ -154,6 +154,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
    github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
    github.com/yahoo/athenz v1.8.55 h1:xGhxN3yLq334APyn0Zvcc+aqu78Q7BBhYJevM3EtTW0=
    github.com/yahoo/athenz v1.8.55/go.mod h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0=
   +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
   +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
    golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
    golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
    golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
   diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
   index 285cf29..54d2c5a 100644
   --- a/pulsar/consumer_partition.go
   +++ b/pulsar/consumer_partition.go
   @@ -24,6 +24,7 @@ import (
        "time"
    
        "github.com/gogo/protobuf/proto"
   +    "go.uber.org/atomic"
    
        "github.com/apache/pulsar-client-go/pulsar/internal"
        "github.com/apache/pulsar-client-go/pulsar/internal/compression"
   @@ -31,9 +32,7 @@ import (
        "github.com/apache/pulsar-client-go/pulsar/log"
    )
    
   -var (
   -    lastestMessageID = LatestMessageID()
   -)
   +var lastestMessageID = LatestMessageID()
    
    type consumerState int
    
   @@ -86,7 +85,7 @@ type partitionConsumer struct {
    
        // this is needed for sending ConsumerMessage on the messageCh
        parentConsumer Consumer
   -    state          consumerState
   +    state          atomic.Int32
        options        *partitionConsumerOpts
    
        conn internal.Connection
   @@ -127,7 +126,6 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
        messageCh chan ConsumerMessage, dlq *dlqRouter,
        metrics *internal.TopicMetrics) (*partitionConsumer, error) {
        pc := &partitionConsumer{
   -            state:                consumerInit,
                parentConsumer:       parent,
                client:               client,
                options:              options,
   @@ -148,6 +146,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
                dlq:                  dlq,
                metrics:              metrics,
        }
   +    pc.setState(consumerInit)
        pc.log = client.log.SubLogger(log.Fields{
                "name":         pc.name,
                "topic":        options.topic,
   @@ -159,14 +158,16 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
        err := pc.grabConn()
        if err != nil {
                pc.log.WithError(err).Error("Failed to create consumer")
   +            pc.nackTracker.Close()
                return nil, err
        }
        pc.log.Info("Created consumer")
   -    pc.state = consumerReady
   +    pc.setState(consumerReady)
    
        if pc.options.startMessageIDInclusive && pc.startMessageID == lastestMessageID {
                msgID, err := pc.requestGetLastMessageID()
                if err != nil {
   +                    pc.nackTracker.Close()
                        return nil, err
                }
                if msgID.entryID != noMessageEntry {
   @@ -174,6 +175,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
    
                        err = pc.requestSeek(msgID.messageID)
                        if err != nil {
   +                            pc.nackTracker.Close()
                                return nil, err
                        }
                }
   @@ -198,12 +200,13 @@ func (pc *partitionConsumer) Unsubscribe() error {
    func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
        defer close(unsub.doneCh)
    
   -    if pc.state == consumerClosed || pc.state == consumerClosing {
   +    pstate := pc.getState()
   +    if pstate == consumerClosed || pstate == consumerClosing {
            pc.log.Error("Failed to unsubscribe consumer, the consumer is closing or consumer has been closed")
                return
        }
    
   -    pc.state = consumerClosing
   +    pc.setState(consumerClosing)
        requestID := pc.client.rpcClient.NewRequestID()
        cmdUnsubscribe := &pb.CommandUnsubscribe{
                RequestId:  proto.Uint64(requestID),
   @@ -214,7 +217,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
                pc.log.WithError(err).Error("Failed to unsubscribe consumer")
                unsub.err = err
                // Set the state to ready for closing the consumer
   -            pc.state = consumerReady
   +            pc.setState(consumerReady)
                // Should'nt remove the consumer handler
                return
        }
   @@ -224,7 +227,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
                pc.nackTracker.Close()
        }
        pc.log.Infof("The consumer[%d] successfully unsubscribed", pc.consumerID)
   -    pc.state = consumerClosed
   +    pc.setState(consumerClosed)
    }
    
    func (pc *partitionConsumer) getLastMessageID() (trackingMessageID, error) {
   @@ -305,7 +308,7 @@ func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
    }
    
    func (pc *partitionConsumer) Close() {
   -    if pc.state != consumerReady {
   +    if pc.getState() != consumerReady {
                return
        }
    
   @@ -334,7 +337,8 @@ func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
    }
    
    func (pc *partitionConsumer) requestSeek(msgID messageID) error {
   -    if pc.state == consumerClosing || pc.state == consumerClosed {
   +    pstate := pc.getState()
   +    if pstate == consumerClosing || pstate == consumerClosed {
                pc.log.Error("Consumer was already closed")
                return nil
        }
   @@ -376,7 +380,8 @@ func (pc *partitionConsumer) SeekByTime(time time.Time) error {
    func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
        defer close(seek.doneCh)
    
   -    if pc.state == consumerClosing || pc.state == consumerClosed {
   +    pstate := pc.getState()
   +    if pstate == consumerClosing || pstate == consumerClosed {
                pc.log.Error("Consumer was already closed")
                return
        }
   @@ -738,11 +743,16 @@ func (pc *partitionConsumer) runEventsLoop() {
    
    func (pc *partitionConsumer) internalClose(req *closeRequest) {
        defer close(req.doneCh)
   -    if pc.state != consumerReady {
   +    pstate := pc.getState()
   +    if pstate != consumerReady {
   +            // this might be redundant but to ensure nack tracker is closed
   +            if pc.nackTracker != nil {
   +                    pc.nackTracker.Close()
   +            }
                return
        }
    
   -    if pc.state == consumerClosed || pc.state == consumerClosing {
   +    if pstate == consumerClosed || pstate == consumerClosing {
                pc.log.Error("The consumer is closing or has been closed")
                if pc.nackTracker != nil {
                        pc.nackTracker.Close()
   @@ -750,7 +760,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
                return
        }
    
   -    pc.state = consumerClosing
   +    pc.setState(consumerClosing)
        pc.log.Infof("Closing consumer=%d", pc.consumerID)
    
        requestID := pc.client.rpcClient.NewRequestID()
   @@ -769,7 +779,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
                provider.Close()
        }
    
   -    pc.state = consumerClosed
   +    pc.setState(consumerClosed)
        pc.conn.DeleteConsumeHandler(pc.consumerID)
        if pc.nackTracker != nil {
                pc.nackTracker.Close()
   @@ -790,7 +800,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
        }
    
        for maxRetry != 0 {
   -            if pc.state != consumerReady {
   +            if pc.getState() != consumerReady {
                        // Consumer is already closing
                        return
                }
   @@ -876,7 +886,6 @@ func (pc *partitionConsumer) grabConn() error {
    
        res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
                pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
   -
        if err != nil {
                pc.log.WithError(err).Error("Failed to create consumer")
                return err
   @@ -908,7 +917,7 @@ func (pc *partitionConsumer) grabConn() error {
    }
    
    func (pc *partitionConsumer) clearQueueAndGetNextMessage() trackingMessageID {
   -    if pc.state != consumerReady {
   +    if pc.getState() != consumerReady {
                return trackingMessageID{}
        }
        wg := &sync.WaitGroup{}
   @@ -1028,6 +1037,14 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
                })
    }
    
   +func (pc *partitionConsumer) getState() consumerState {
   +    return consumerState(pc.state.Load())
   +}
   +
   +func (pc *partitionConsumer) setState(state consumerState) {
   +    pc.state.Store(int32(state))
   +}
   +
    func convertToMessageIDData(msgID trackingMessageID) *pb.MessageIdData {
        if msgID.Undefined() {
                return nil
   ```
   
   Note: the diff is merely illustrative of the approach of using this particular package for the fix. Other changes, like the nackTracker change, have entered the main branch since the changes I made for #432 and are not germane to the issue.
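   Condensed, the pattern the diff applies is just wrapping the `state` field in `go.uber.org/atomic`'s `Int32` and going through small accessors. Here is a standalone sketch of that pattern (illustrative only, not the actual client code; the type and constant names just mirror the client's):
   
   ```go
   // Minimal illustration of guarding a state field with go.uber.org/atomic
   // so concurrent readers and writers don't race.
   package main
   
   import (
       "fmt"
       "sync"
   
       "go.uber.org/atomic"
   )
   
   type consumerState int32
   
   const (
       consumerInit consumerState = iota
       consumerReady
       consumerClosing
       consumerClosed
   )
   
   type consumer struct {
       state atomic.Int32 // zero value corresponds to consumerInit (0)
   }
   
   func (c *consumer) getState() consumerState  { return consumerState(c.state.Load()) }
   func (c *consumer) setState(s consumerState) { c.state.Store(int32(s)) }
   
   func main() {
       c := &consumer{}
       c.setState(consumerReady)
   
       var wg sync.WaitGroup
       wg.Add(2)
       // One goroutine closes the consumer while another checks its state;
       // with a plain int field this is exactly the kind of concurrent access
       // the race detector flags.
       go func() { defer wg.Done(); c.setState(consumerClosed) }()
       go func() { defer wg.Done(); _ = c.getState() == consumerReady }()
       wg.Wait()
   
       fmt.Println("final state:", c.getState())
   }
   ```
   
   With a plain `int` field the two goroutines above would be reported as a data race under `-race`; the atomic wrapper makes the loads and stores safe for concurrent use.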
   
   I haven't determined whether this problem is specific to the regex consumer or a deeper issue with the module, though it seems more likely to be the former, since all tests seem to be run with the race detector on and only the referenced test has tripped it.
   
   If this is the only member accessed like this, using `go.uber.org/atomic` may be an acceptable fix, since it provides a convenient interface and the change is relatively straightforward. However, while writing the tests that triggered this, I noticed that there's a lack of coordination between the regex consumer and its constituent subconsumers. For the most part, it simply spins them up and only removes them if their topics go away. There's no way for the regex consumer to know that one of its subconsumers has died, much less that it died for a non-retryable reason (such as its topic going away, as in this case). A potential fix might involve an event channel between the subconsumers and the regex consumer, and probably some fixes to error introspection like I mentioned [here](https://github.com/apache/pulsar-client-go/issues/432#issuecomment-754093889), since currently the only way I can see to do it is to inspect the error string itself for the condition, and that smells.
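   To make that concrete, here's a rough sketch of how subconsumers could report terminal failures over an event channel and how the regex consumer could introspect them with `errors.Is` instead of string matching. The names here (`consumerEvent`, `subConsumer`, `ErrTopicNotFound`) are hypothetical and not existing client APIs; a real fix would need to hook into the actual consumer plumbing:
   
   ```go
   // Sketch of an event channel between subconsumers and their parent regex
   // consumer, using a sentinel error instead of error-string inspection.
   package main
   
   import (
       "errors"
       "fmt"
   )
   
   // ErrTopicNotFound is a hypothetical sentinel error a subconsumer could
   // wrap when its topic has been deleted, so callers can use errors.Is.
   var ErrTopicNotFound = errors.New("topic not found")
   
   // consumerEvent is what a subconsumer would send to its parent when it
   // terminates; err is nil for a clean shutdown.
   type consumerEvent struct {
       topic string
       err   error
   }
   
   // subConsumer stands in for a per-topic consumer owned by the regex consumer.
   type subConsumer struct {
       topic  string
       events chan<- consumerEvent
   }
   
   // fail reports a terminal error to the parent via the shared event channel.
   func (s *subConsumer) fail(err error) {
       s.events <- consumerEvent{topic: s.topic, err: err}
   }
   
   func main() {
       events := make(chan consumerEvent, 1)
       sc := &subConsumer{topic: "persistent://public/default/foo-1", events: events}
   
       // Simulate the subconsumer dying because its topic went away.
       sc.fail(fmt.Errorf("closing consumer: %w", ErrTopicNotFound))
   
       // The regex consumer's event loop can now distinguish retryable from
       // non-retryable failures without parsing error strings.
       ev := <-events
       if errors.Is(ev.err, ErrTopicNotFound) {
           fmt.Printf("dropping %s: topic no longer exists\n", ev.topic)
       } else if ev.err != nil {
           fmt.Printf("restarting %s after error: %v\n", ev.topic, ev.err)
       }
   }
   ```
   
   With something along these lines, the regex consumer could decide whether to drop or recreate a subconsumer based on the error's type rather than its text.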

