This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a7adcb9 Golang fix data races (#428)
3a7adcb9 is described below

commit 3a7adcb942b9e9410a12fe475aa11ae758636077
Author: Paweł Biegun <[email protected]>
AuthorDate: Thu Mar 30 10:15:08 2023 +0200

    Golang fix data races (#428)
    
    * fix memory safety in rpc client
    
    * use atomic bool instead of bool in DefaultClientMeter
    
    * use atomic bool as validateMessageType in producer options
    
    * use atomic int for maxBodySizeBytes in producerOptions
    
    * fix data race for seconds in message id codec
    
    * fix type mismatch in publishing_message.go
    
    * verify that there is no data race in defaultClientMeterProvider.reset
    
    * add license to metric_test.go
---
 golang/message_id_codec.go   | 10 ++++++----
 golang/metric.go             | 14 ++++++++------
 golang/metric_test.go        | 36 ++++++++++++++++++++++++++++++++++++
 golang/producer.go           |  6 ++++--
 golang/producer_options.go   | 12 +++++++-----
 golang/publishing_message.go |  2 +-
 golang/rpc_client.go         | 36 ++++++++++++++++++++++++++++++++----
 7 files changed, 94 insertions(+), 22 deletions(-)

diff --git a/golang/message_id_codec.go b/golang/message_id_codec.go
index 579b5132..99b04447 100644
--- a/golang/message_id_codec.go
+++ b/golang/message_id_codec.go
@@ -27,6 +27,8 @@ import (
        "sync/atomic"
        "time"
 
+       uberatomic "go.uber.org/atomic"
+
        "github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
 )
 
@@ -83,7 +85,7 @@ var (
        processFixedStringV1    string
        secondsSinceCustomEpoch int64
        secondsStartTimestamp   int64
-       seconds                 int64
+       seconds                 uberatomic.Int64
        sequence                int32
 )
 
@@ -113,7 +115,7 @@ func init() {
        secondsSinceCustomEpoch = time.Now().Unix() - time.Date(2021, 1, 1, 0, 
0, 0, 0, time.UTC).Unix()
        // TODO Implement System.nanoTime() in golang, see 
https://github.com/golang/go/issues/16658
        secondsStartTimestamp = time.Now().Unix()
-       seconds = deltaSeconds()
+       seconds.Store(deltaSeconds())
 
        sequence = -1
 
@@ -135,8 +137,8 @@ func (mic *messageIdCodec) NextMessageId() MessageId {
        var buffer bytes.Buffer
 
        deltaSeconds := deltaSeconds()
-       if seconds != deltaSeconds {
-               seconds = deltaSeconds
+       if seconds.Load() != deltaSeconds {
+               seconds.Store(deltaSeconds)
        }
 
        if err := binary.Write(&buffer, binary.BigEndian, 
uint32(deltaSeconds)); err != nil {
diff --git a/golang/metric.go b/golang/metric.go
index 1be579fc..e7f4a156 100644
--- a/golang/metric.go
+++ b/golang/metric.go
@@ -22,6 +22,8 @@ import (
        "sync"
        "time"
 
+       "go.uber.org/atomic"
+
        "contrib.go.opencensus.io/exporter/ocagent"
        "github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
        v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
@@ -63,14 +65,14 @@ func init() {
 }
 
 type defaultClientMeter struct {
-       enabled     bool
+       enabled     atomic.Bool
        endpoints   *v2.Endpoints
        ocaExporter view.Exporter
        mutex       sync.Mutex
 }
 
 func (dcm *defaultClientMeter) shutdown() {
-       if !dcm.enabled {
+       if !dcm.enabled.Load() {
                return
        }
        dcm.mutex.Lock()
@@ -88,7 +90,7 @@ func (dcm *defaultClientMeter) shutdown() {
 }
 
 func (dcm *defaultClientMeter) start() {
-       if !dcm.enabled {
+       if !dcm.enabled.Load() {
                return
        }
        view.RegisterExporter(dcm.ocaExporter)
@@ -96,7 +98,7 @@ func (dcm *defaultClientMeter) start() {
 
 var NewDefaultClientMeter = func(exporter view.Exporter, on bool, endpoints 
*v2.Endpoints, clientID string) *defaultClientMeter {
        return &defaultClientMeter{
-               enabled:     on,
+               enabled:     *atomic.NewBool(on),
                endpoints:   endpoints,
                ocaExporter: exporter,
        }
@@ -163,7 +165,7 @@ func (dmmi *defaultMessageMeterInterceptor) 
doAfter(messageHookPoints MessageHoo
        return nil
 }
 func (dcmp *defaultClientMeterProvider) isEnabled() bool {
-       return dcmp.clientMeter.enabled
+       return dcmp.clientMeter.enabled.Load()
 }
 func (dcmp *defaultClientMeterProvider) getClientID() string {
        return dcmp.client.GetClientID()
@@ -172,7 +174,7 @@ func (dcmp *defaultClientMeterProvider) Reset(metric 
*v2.Metric) {
        dcmp.globalMutex.Lock()
        defer dcmp.globalMutex.Unlock()
        endpoints := metric.GetEndpoints()
-       if dcmp.clientMeter.enabled && metric.GetOn() && 
utils.CompareEndpoints(dcmp.clientMeter.endpoints, endpoints) {
+       if dcmp.clientMeter.enabled.Load() && metric.GetOn() && 
utils.CompareEndpoints(dcmp.clientMeter.endpoints, endpoints) {
                sugarBaseLogger.Infof("metric settings is satisfied by the 
current message meter, clientId=%s", dcmp.client.GetClientID())
                return
        }
diff --git a/golang/metric_test.go b/golang/metric_test.go
new file mode 100644
index 00000000..6dd0b7a3
--- /dev/null
+++ b/golang/metric_test.go
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package golang
+
+import (
+       "testing"
+
+       v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
+)
+
+// This test is designed to verify there is no data race in dcmp.Reset
+func TestDefaultClientMeterProviderResetNoDataRace(t *testing.T) {
+       cli := BuildCLient(t)
+       metric := &v2.Metric{On: false, Endpoints: cli.accessPoint}
+
+       for i := 0; i < 5; i++ {
+               go func() {
+                       cli.clientMeterProvider.Reset(metric)
+               }()
+       }
+}
diff --git a/golang/producer.go b/golang/producer.go
index 54e9412f..f2fbf590 100644
--- a/golang/producer.go
+++ b/golang/producer.go
@@ -24,6 +24,8 @@ import (
        "sync"
        "time"
 
+       "go.uber.org/atomic"
+
        "github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
        v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
        "google.golang.org/protobuf/types/known/durationpb"
@@ -148,8 +150,8 @@ var NewProducer = func(config *Config, opts 
...ProducerOption) (Producer, error)
                        },
                },
                requestTimeout:      p.cli.opts.timeout,
-               validateMessageType: true,
-               maxBodySizeBytes:    4 * 1024 * 1024,
+               validateMessageType: *atomic.NewBool(true),
+               maxBodySizeBytes:    *atomic.NewInt32(4 * 1024 * 1024),
        }
        for _, topic := range po.topics {
                topicResource := &v2.Resource{
diff --git a/golang/producer_options.go b/golang/producer_options.go
index fd9ba29a..640f1e30 100644
--- a/golang/producer_options.go
+++ b/golang/producer_options.go
@@ -22,6 +22,8 @@ import (
        "sync"
        "time"
 
+       "go.uber.org/atomic"
+
        v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
        "google.golang.org/protobuf/types/known/durationpb"
 )
@@ -94,8 +96,8 @@ type producerSettings struct {
        clientType          v2.ClientType
        retryPolicy         *v2.RetryPolicy
        requestTimeout      time.Duration
-       validateMessageType bool
-       maxBodySizeBytes    int
+       validateMessageType atomic.Bool
+       maxBodySizeBytes    atomic.Int32
 }
 
 func (ps *producerSettings) GetClientID() string {
@@ -114,7 +116,7 @@ func (ps *producerSettings) GetRequestTimeout() 
time.Duration {
        return ps.requestTimeout
 }
 func (ps *producerSettings) IsValidateMessageType() bool {
-       return ps.validateMessageType
+       return ps.validateMessageType.Load()
 }
 
 func (ps *producerSettings) toProtobuf() *v2.Settings {
@@ -160,8 +162,8 @@ func (ps *producerSettings) applySettingsCommand(settings 
*v2.Settings) error {
                        }
                }
        }
-       ps.validateMessageType = v.Publishing.GetValidateMessageType()
-       ps.maxBodySizeBytes = int(v.Publishing.GetMaxBodySize())
+       ps.validateMessageType.Store(v.Publishing.GetValidateMessageType())
+       ps.maxBodySizeBytes.Store(v.Publishing.GetMaxBodySize())
 
        return nil
 }
diff --git a/golang/publishing_message.go b/golang/publishing_message.go
index dbf8d2fe..456202ab 100644
--- a/golang/publishing_message.go
+++ b/golang/publishing_message.go
@@ -41,7 +41,7 @@ var NewPublishingMessage = func(msg *Message, settings 
*producerSettings, txEnab
                msg: msg,
        }
 
-       maxBodySizeBytes := settings.maxBodySizeBytes
+       maxBodySizeBytes := int(settings.maxBodySizeBytes.Load())
 
        length := len(msg.Body)
        if length > maxBodySizeBytes {
diff --git a/golang/rpc_client.go b/golang/rpc_client.go
index 6a593b37..12a78e4f 100644
--- a/golang/rpc_client.go
+++ b/golang/rpc_client.go
@@ -22,7 +22,6 @@ import (
        "errors"
        "fmt"
        "sync"
-       "sync/atomic"
        "time"
 
        v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
@@ -52,7 +51,6 @@ var _ = RpcClient(&rpcClient{})
 
 type rpcClient struct {
        opts             rpcClientOptions
-       queue            atomic.Value
        mux              sync.Mutex
        conn             ClientConn
        msc              v2.MessagingServiceClient
@@ -60,6 +58,14 @@ type rpcClient struct {
        activityNanoTime time.Time
 }
 
+/*
+ * Memory safety:
+ * - target, opts, msc are never mutated after creation so it can be read 
concurrently without locking the mutex
+ * - activityNanoTime is read and written to across threads so the mutex must 
be locked before read or write operations
+ * - conn is autogenerated and designed to be accessed concurrently but 
closing the connection is not atomic (I think so I haven't been able to confirm 
it)
+ *   so let's assume to access or mutate it locking the mutex is required.
+ */
+
 var NewRpcClient = func(target string, opts ...RpcClientOption) (RpcClient, 
error) {
        rc := &rpcClient{
                target: target,
@@ -84,25 +90,35 @@ func (rc *rpcClient) GetTarget() string {
 }
 
 func (rc *rpcClient) idleDuration() time.Duration {
-       return time.Since(rc.activityNanoTime)
+       rc.mux.Lock()
+       duration := time.Since(rc.activityNanoTime)
+       rc.mux.Unlock()
+       return duration
 }
 
 func (rc *rpcClient) Close() {}
 
 func (rc *rpcClient) GracefulStop() error {
+       rc.mux.Lock()
        sugarBaseLogger.Warnf("close rpc client, target=%s", rc.target)
-       return rc.conn.Close()
+       closeResult := rc.conn.Close()
+       rc.mux.Lock()
+       return closeResult
 }
 
 func (rc *rpcClient) QueryRoute(ctx context.Context, request 
*v2.QueryRouteRequest) (*v2.QueryRouteResponse, error) {
+       rc.mux.Lock()
        rc.activityNanoTime = time.Now()
+       rc.mux.Unlock()
        resp, err := rc.msc.QueryRoute(ctx, request)
        sugarBaseLogger.Debugf("queryRoute request: %v, response: %v, err: %v", 
request, resp, err)
        return resp, err
 }
 
 func (rc *rpcClient) SendMessage(ctx context.Context, request 
*v2.SendMessageRequest) (*v2.SendMessageResponse, error) {
+       rc.mux.Lock()
        rc.activityNanoTime = time.Now()
+       rc.mux.Unlock()
        resp, err := rc.msc.SendMessage(ctx, request)
        sugarBaseLogger.Debugf("sendMessage request: %v, response: %v, err: 
%v", request, resp, err)
        return resp, err
@@ -113,42 +129,54 @@ func (rc *rpcClient) Telemetry(ctx context.Context) 
(v2.MessagingService_Telemet
 }
 
 func (rc *rpcClient) EndTransaction(ctx context.Context, request 
*v2.EndTransactionRequest) (*v2.EndTransactionResponse, error) {
+       rc.mux.Lock()
        rc.activityNanoTime = time.Now()
+       rc.mux.Unlock()
        resp, err := rc.msc.EndTransaction(ctx, request)
        sugarBaseLogger.Debugf("endTransaction request: %v, response: %v, err: 
%v", request, resp, err)
        return resp, err
 }
 
 func (rc *rpcClient) HeartBeat(ctx context.Context, request 
*v2.HeartbeatRequest) (*v2.HeartbeatResponse, error) {
+       rc.mux.Lock()
        rc.activityNanoTime = time.Now()
+       rc.mux.Unlock()
        resp, err := rc.msc.Heartbeat(ctx, request)
        sugarBaseLogger.Debugf("heartBeat request: %v, response: %v, err: %v", 
request, resp, err)
        return resp, err
 }
 
 func (rc *rpcClient) NotifyClientTermination(ctx context.Context, request 
*v2.NotifyClientTerminationRequest) (*v2.NotifyClientTerminationResponse, 
error) {
+       rc.mux.Lock()
        rc.activityNanoTime = time.Now()
+       rc.mux.Unlock()
        resp, err := rc.msc.NotifyClientTermination(ctx, request)
        sugarBaseLogger.Debugf("notifyClientTermination request: %v, response: 
%v, err: %v", request, resp, err)
        return resp, err
 }
 
 func (rc *rpcClient) ReceiveMessage(ctx context.Context, request 
*v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error) {
+       rc.mux.Lock()
        rc.activityNanoTime = time.Now()
+       rc.mux.Unlock()
        resp, err := rc.msc.ReceiveMessage(ctx, request)
        sugarBaseLogger.Debugf("receiveMessage request: %v, err: %v", request, 
err)
        return resp, err
 }
 
 func (rc *rpcClient) AckMessage(ctx context.Context, request 
*v2.AckMessageRequest) (*v2.AckMessageResponse, error) {
+       rc.mux.Lock()
        rc.activityNanoTime = time.Now()
+       rc.mux.Unlock()
        resp, err := rc.msc.AckMessage(ctx, request)
        sugarBaseLogger.Debugf("ackMessage request: %v, response: %v, err: %v", 
request, resp, err)
        return resp, err
 }
 
 func (rc *rpcClient) ChangeInvisibleDuration(ctx context.Context, request 
*v2.ChangeInvisibleDurationRequest) (*v2.ChangeInvisibleDurationResponse, 
error) {
+       rc.mux.Lock()
        rc.activityNanoTime = time.Now()
+       rc.mux.Unlock()
        resp, err := rc.msc.ChangeInvisibleDuration(ctx, request)
        sugarBaseLogger.Debugf("changeInvisibleDuration request: %v, response: 
%v, err: %v", request, resp, err)
        return resp, err

Reply via email to