This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e9d3a4  change 'consumerGroup' to 'groupName' (#161)
7e9d3a4 is described below

commit 7e9d3a4e219712e24e7958b7d03b437086bccbe0
Author: guyinyou <[email protected]>
AuthorDate: Thu Aug 25 10:36:18 2022 +0800

    change 'consumerGroup' to 'groupName' (#161)
    
    * change 'consumerGroup' to 'groupName'
    
    * fix toProtobuf in publishingMessage
    
    * onPrintThreadStackTraceCommand
    
    * get the code returned by grpc & telemeter before starting client
    
    Co-authored-by: guyinyou <[email protected]>
---
 golang/client.go                                | 47 ++++++++++++++++++-------
 golang/client_manager.go                        |  5 +--
 golang/conn.go                                  |  1 +
 golang/consumer.go                              |  2 +-
 golang/{consumer.go => error.go}                | 39 +++++++++++++++++---
 golang/example/consumer/simple_consumer/main.go | 14 ++++----
 golang/example/producer/async/main.go           | 14 ++++----
 golang/example/producer/delay/main.go           | 14 ++++----
 golang/example/producer/fifo/main.go            | 14 ++++----
 golang/example/producer/normal/main.go          | 14 ++++----
 golang/example/producer/transaction/main.go     | 14 ++++----
 golang/log.go                                   | 19 +++++-----
 golang/message.go                               |  2 +-
 golang/metric.go                                |  3 +-
 golang/pkg/utils/utils.go                       |  7 ++++
 golang/producer.go                              | 17 +++++----
 golang/publishing_message.go                    |  2 +-
 golang/rpc_client.go                            |  2 +-
 golang/simple_consumer.go                       | 39 +++++++++++---------
 golang/simple_consumer_options.go               |  4 +--
 golang/transaction.go                           |  4 +--
 21 files changed, 174 insertions(+), 103 deletions(-)

diff --git a/golang/client.go b/golang/client.go
index 6c5ee5b..48f00f9 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -87,12 +87,12 @@ func (cs *defaultClientSession) startUp() {
                        if err != nil {
                                cs.release()
 
-                               cs.cli.log.Errorf("telemetryCommand recv err = 
%v", err)
+                               cs.cli.log.Errorf("telemetryCommand recv 
err=%w", err)
                                continue
                        }
                        err = cs.handleTelemetryCommand(response)
                        if err != nil {
-                               cs.cli.log.Errorf("telemetryCommand recv err = 
%v", err)
+                               cs.cli.log.Errorf("telemetryCommand recv 
err=%w", err)
                        }
                }
        }()
@@ -275,7 +275,10 @@ func (cli *defaultClient) queryRoute(ctx context.Context, 
topic string, duration
                return nil, err
        }
        if response.GetStatus().GetCode() != v2.Code_OK {
-               return nil, fmt.Errorf("query route err = %s", 
response.String())
+               return nil, &ErrRpcStatus{
+                       Code:    int32(response.Status.GetCode()),
+                       Message: response.GetStatus().GetMessage(),
+               }
        }
 
        if len(response.GetMessageQueues()) == 0 {
@@ -337,9 +340,13 @@ func (cli *defaultClient) doHeartbeat(target string, 
request *v2.HeartbeatReques
        }
        if resp.Status.GetCode() != v2.Code_OK {
                if resp.Status.GetCode() == v2.Code_UNRECOGNIZED_CLIENT_TYPE {
-                       go cli.syncSettings()
+                       go cli.trySyncSettings()
+               }
+               cli.log.Errorf("failed to send heartbeat, code=%v, status 
message=[%s], endpoints=%v", resp.Status.GetCode(), resp.Status.GetMessage(), 
endpoints)
+               return &ErrRpcStatus{
+                       Code:    int32(resp.Status.GetCode()),
+                       Message: resp.GetStatus().GetMessage(),
                }
-               return fmt.Errorf("failed to send heartbeat, code=%v, status 
message=[%s], endpoints=%v", resp.Status.GetCode(), resp.Status.GetMessage(), 
endpoints)
        }
        cli.log.Infof("send heartbeat successfully, endpoints=%v", endpoints)
        switch p := cli.clientImpl.(type) {
@@ -364,7 +371,7 @@ func (cli *defaultClient) Heartbeat() {
        }
 }
 
-func (cli *defaultClient) syncSettings() {
+func (cli *defaultClient) trySyncSettings() {
        cli.log.Info("start syncSetting")
        command := cli.getSettingsCommand()
        targets := cli.getTotalTargets()
@@ -373,25 +380,37 @@ func (cli *defaultClient) syncSettings() {
        }
 }
 
-func (cli *defaultClient) telemeter(target string, command 
*v2.TelemetryCommand) {
+// mustSyncSettings sends the current settings command to each target from
+// getTotalTargets, in order, via telemeter. Unlike trySyncSettings it reports
+// failure: the first telemeter error stops the loop and is returned as-is.
+// It returns nil when no telemeter call failed.
+func (cli *defaultClient) mustSyncSettings() error {
+       cli.log.Info("start syncSetting")
+       settingsCmd := cli.getSettingsCommand()
+       for _, endpoint := range cli.getTotalTargets() {
+               err := cli.telemeter(endpoint, settingsCmd)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+func (cli *defaultClient) telemeter(target string, command 
*v2.TelemetryCommand) error {
        cs, err := cli.getDefaultClientSession(target)
        if err != nil {
                cli.log.Error("getDefaultClientSession failed, err=%v", target, 
err)
-               return
+               return err
        }
        ctx := cli.Sign(context.Background())
        err = cs.publish(ctx, command)
        if err != nil {
                cli.log.Error("telemeter to %s failed, err=%v", target, err)
-               return
+               return err
        }
        cli.log.Infof("telemeter to %s success", target)
+       return nil
 }
 
 func (cli *defaultClient) startUp() error {
        cli.log.Infof("begin to start the rocketmq client")
        cli.clientManager = defaultClientManagerRegistry.RegisterClient(cli)
-
        for _, topic := range cli.initTopics {
                maxAttempts := 
int(cli.settings.GetRetryPolicy().GetMaxAttempts())
                for i := 0; i < maxAttempts; i++ {
@@ -411,7 +430,9 @@ func (cli *defaultClient) startUp() error {
                        }
                }
        }
-
+       if err := cli.mustSyncSettings(); err != nil {
+               return err
+       }
        f := func() {
                cli.router.Range(func(k, v interface{}) bool {
                        topic := k.(string)
@@ -512,7 +533,7 @@ func (cli *defaultClient) onSettingsCommand(endpoints 
*v2.Endpoints, settings *v
 func (cli *defaultClient) onRecoverOrphanedTransactionCommand(endpoints 
*v2.Endpoints, command *v2.RecoverOrphanedTransactionCommand) {
        if p, ok := cli.clientImpl.(*defaultProducer); ok {
                if err := p.onRecoverOrphanedTransactionCommand(endpoints, 
command); err != nil {
-                       cli.log.Errorf("onRecoverOrphanedTransactionCommand err 
= %v", err)
+                       cli.log.Errorf("onRecoverOrphanedTransactionCommand 
err=%w", err)
                }
        } else {
                cli.log.Infof("ignore orphaned transaction recovery command 
from remote, which is not expected, command=%v", command)
@@ -543,7 +564,7 @@ func (cli *defaultClient) 
onPrintThreadStackTraceCommand(endpoints *v2.Endpoints
        nonce := command.GetNonce()
        go func(nonce string) {
                // TODO get stack
-               stackTrace := ""
+               stackTrace := utils.DumpStacks()
                status := &v2.Status{
                        Code: v2.Code_OK,
                }
diff --git a/golang/client_manager.go b/golang/client_manager.go
index c2f1399..5046686 100644
--- a/golang/client_manager.go
+++ b/golang/client_manager.go
@@ -29,6 +29,7 @@ import (
        "google.golang.org/grpc/status"
 )
 
+// TODO: avoid relying on a global clientManager.
 type ClientManager interface {
        RegisterClient(client Client)
        UnRegisterClient(client Client)
@@ -211,7 +212,7 @@ func (cm *defaultClientManager) syncSettings() {
        sugarBaseLogger.Info("clientManager start syncSettings")
        cm.clientTable.Range(func(_, v interface{}) bool {
                client := v.(*defaultClient)
-               client.syncSettings()
+               client.trySyncSettings()
                return true
        })
 }
@@ -263,7 +264,7 @@ func (cm *defaultClientManager) handleGrpcError(rpcClient 
RpcClient, err error)
        if err != nil {
                if e, ok := status.FromError(err); ok {
                        if e.Code() == codes.Unavailable {
-                               sugarBaseLogger.Errorf("happened unavailable 
error=%v, close rpcClient=%s", err, rpcClient.GetTarget())
+                               sugarBaseLogger.Errorf("happened unavailable 
err=%w, close rpcClient=%s", err, rpcClient.GetTarget())
                                cm.rpcClientTableLock.Lock()
                                defer cm.rpcClientTableLock.Unlock()
                                cm.deleteRpcClient(rpcClient)
diff --git a/golang/conn.go b/golang/conn.go
index f5bf1a7..eeca8a0 100644
--- a/golang/conn.go
+++ b/golang/conn.go
@@ -118,6 +118,7 @@ func (c *clientConn) dialSetupOpts(dopts 
...grpc.DialOption) (opts []grpc.DialOp
        if c.creds != nil {
                opts = append(opts, grpc.WithTransportCredentials(c.creds))
        }
+       // TODO get requestID in header
        opts = append(opts, grpc.WithBlock(), grpc.WithChainUnaryInterceptor(
                zaplog.UnaryClientInterceptor(c.opts.Logger),
        ))
diff --git a/golang/consumer.go b/golang/consumer.go
index 47ed593..b2d9596 100644
--- a/golang/consumer.go
+++ b/golang/consumer.go
@@ -24,6 +24,6 @@ import (
 )
 
 type Consumer interface {
-       GetConsumerGroup() string
+       GetGroupName() string
        wrapReceiveMessageRequest(batchSize int, messageQueue *v2.MessageQueue, 
filterExpression *FilterExpression, invisibleDuration time.Duration) 
*v2.ReceiveMessageRequest
 }
diff --git a/golang/consumer.go b/golang/error.go
similarity index 57%
copy from golang/consumer.go
copy to golang/error.go
index 47ed593..1105ea6 100644
--- a/golang/consumer.go
+++ b/golang/error.go
@@ -18,12 +18,43 @@
 package golang
 
 import (
-       "time"
+       "errors"
+       "fmt"
 
        v2 "github.com/apache/rocketmq-clients/golang/protocol/v2"
 )
 
-type Consumer interface {
-       GetConsumerGroup() string
-       wrapReceiveMessageRequest(batchSize int, messageQueue *v2.MessageQueue, 
filterExpression *FilterExpression, invisibleDuration time.Duration) 
*v2.ReceiveMessageRequest
+// ErrRpcStatus is an error that carries the status code and message of a
+// response whose status was not OK.
+type ErrRpcStatus struct {
+       Code    int32
+       Message string
+}
+
+// GetCode returns the numeric status code.
+func (err *ErrRpcStatus) GetCode() int32 { return err.Code }
+
+// GetMessage returns the status message.
+func (err *ErrRpcStatus) GetMessage() string { return err.Message }
+
+// Error formats the status as "CODE: <name>, MESSAGE: <message>". A code that
+// has no entry in v2.Code_name is shown as its decimal number.
+func (err *ErrRpcStatus) Error() string {
+       codeName, ok := v2.Code_name[err.Code]
+       if !ok {
+               codeName = fmt.Sprint(err.Code)
+       }
+       return fmt.Sprintf("CODE: %s, MESSAGE: %s", codeName, err.Message)
+}
+
+var _ = error(&ErrRpcStatus{})
+
+// AsErrRpcStatus returns the first *ErrRpcStatus found in err's chain, if any.
+// It relies on errors.As so errors wrapping several causes are searched too.
+func AsErrRpcStatus(err error) (*ErrRpcStatus, bool) {
+       var target *ErrRpcStatus
+       if errors.As(err, &target) {
+               return target, true
+       }
+       return nil, false
 }
diff --git a/golang/example/consumer/simple_consumer/main.go 
b/golang/example/consumer/simple_consumer/main.go
index aee2d91..c4139b2 100644
--- a/golang/example/consumer/simple_consumer/main.go
+++ b/golang/example/consumer/simple_consumer/main.go
@@ -29,12 +29,12 @@ import (
 )
 
 const (
-       Topic         = "xxxxxx"
-       ConsumerGroup = "xxxxxx"
-       Endpoint      = "xxxxxx"
-       Region        = "xxxxxx"
-       AccessKey     = "xxxxxx"
-       SecretKey     = "xxxxxx"
+       Topic     = "xxxxxx"
+       GroupName = "xxxxxx"
+       Endpoint  = "xxxxxx"
+       Region    = "xxxxxx"
+       AccessKey = "xxxxxx"
+       SecretKey = "xxxxxx"
 )
 
 var (
@@ -54,7 +54,7 @@ func main() {
        // new simpleConsumer instance
        simpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{
                Endpoint: Endpoint,
-               Group:    ConsumerGroup,
+               Group:    GroupName,
                Region:   Region,
                Credentials: &credentials.SessionCredentials{
                        AccessKey:    AccessKey,
diff --git a/golang/example/producer/async/main.go 
b/golang/example/producer/async/main.go
index 988987a..6c1e1df 100644
--- a/golang/example/producer/async/main.go
+++ b/golang/example/producer/async/main.go
@@ -30,12 +30,12 @@ import (
 )
 
 const (
-       Topic         = "xxxxxx"
-       ConsumerGroup = "xxxxxx"
-       Endpoint      = "xxxxxx"
-       Region        = "xxxxxx"
-       AccessKey     = "xxxxxx"
-       SecretKey     = "xxxxxx"
+       Topic     = "xxxxxx"
+       GroupName = "xxxxxx"
+       Endpoint  = "xxxxxx"
+       Region    = "xxxxxx"
+       AccessKey = "xxxxxx"
+       SecretKey = "xxxxxx"
 )
 
 func main() {
@@ -45,7 +45,7 @@ func main() {
        // new producer instance
        producer, err := golang.NewProducer(&golang.Config{
                Endpoint: Endpoint,
-               Group:    ConsumerGroup,
+               Group:    GroupName,
                Region:   Region,
                Credentials: &credentials.SessionCredentials{
                        AccessKey:    AccessKey,
diff --git a/golang/example/producer/delay/main.go 
b/golang/example/producer/delay/main.go
index 23f0aa0..695e107 100644
--- a/golang/example/producer/delay/main.go
+++ b/golang/example/producer/delay/main.go
@@ -30,12 +30,12 @@ import (
 )
 
 const (
-       Topic         = "xxxxxx"
-       ConsumerGroup = "xxxxxx"
-       Endpoint      = "xxxxxx"
-       Region        = "xxxxxx"
-       AccessKey     = "xxxxxx"
-       SecretKey     = "xxxxxx"
+       Topic     = "xxxxxx"
+       GroupName = "xxxxxx"
+       Endpoint  = "xxxxxx"
+       Region    = "xxxxxx"
+       AccessKey = "xxxxxx"
+       SecretKey = "xxxxxx"
 )
 
 func main() {
@@ -45,7 +45,7 @@ func main() {
        // new producer instance
        producer, err := golang.NewProducer(&golang.Config{
                Endpoint: Endpoint,
-               Group:    ConsumerGroup,
+               Group:    GroupName,
                Region:   Region,
                Credentials: &credentials.SessionCredentials{
                        AccessKey:    AccessKey,
diff --git a/golang/example/producer/fifo/main.go 
b/golang/example/producer/fifo/main.go
index 40080c3..c2250d3 100644
--- a/golang/example/producer/fifo/main.go
+++ b/golang/example/producer/fifo/main.go
@@ -30,12 +30,12 @@ import (
 )
 
 const (
-       Topic         = "xxxxxx"
-       ConsumerGroup = "xxxxxx"
-       Endpoint      = "xxxxxx"
-       Region        = "xxxxxx"
-       AccessKey     = "xxxxxx"
-       SecretKey     = "xxxxxx"
+       Topic     = "xxxxxx"
+       GroupName = "xxxxxx"
+       Endpoint  = "xxxxxx"
+       Region    = "xxxxxx"
+       AccessKey = "xxxxxx"
+       SecretKey = "xxxxxx"
 )
 
 func main() {
@@ -45,7 +45,7 @@ func main() {
        // new producer instance
        producer, err := golang.NewProducer(&golang.Config{
                Endpoint: Endpoint,
-               Group:    ConsumerGroup,
+               Group:    GroupName,
                Region:   Region,
                Credentials: &credentials.SessionCredentials{
                        AccessKey:    AccessKey,
diff --git a/golang/example/producer/normal/main.go 
b/golang/example/producer/normal/main.go
index 9285c71..5be7497 100644
--- a/golang/example/producer/normal/main.go
+++ b/golang/example/producer/normal/main.go
@@ -30,12 +30,12 @@ import (
 )
 
 const (
-       Topic         = "xxxxxx"
-       ConsumerGroup = "xxxxxx"
-       Endpoint      = "xxxxxx"
-       Region        = "xxxxxx"
-       AccessKey     = "xxxxxx"
-       SecretKey     = "xxxxxx"
+       Topic     = "xxxxxx"
+       GroupName = "xxxxxx"
+       Endpoint  = "xxxxxx"
+       Region    = "xxxxxx"
+       AccessKey = "xxxxxx"
+       SecretKey = "xxxxxx"
 )
 
 func main() {
@@ -44,7 +44,7 @@ func main() {
        // new producer instance
        producer, err := golang.NewProducer(&golang.Config{
                Endpoint: Endpoint,
-               Group:    ConsumerGroup,
+               Group:    GroupName,
                Region:   Region,
                Credentials: &credentials.SessionCredentials{
                        AccessKey:    AccessKey,
diff --git a/golang/example/producer/transaction/main.go 
b/golang/example/producer/transaction/main.go
index 0a5e0e9..ac38d38 100644
--- a/golang/example/producer/transaction/main.go
+++ b/golang/example/producer/transaction/main.go
@@ -30,12 +30,12 @@ import (
 )
 
 const (
-       Topic         = "xxxxxx"
-       ConsumerGroup = "xxxxxx"
-       Endpoint      = "xxxxxx"
-       Region        = "xxxxxx"
-       AccessKey     = "xxxxxx"
-       SecretKey     = "xxxxxx"
+       Topic     = "xxxxxx"
+       GroupName = "xxxxxx"
+       Endpoint  = "xxxxxx"
+       Region    = "xxxxxx"
+       AccessKey = "xxxxxx"
+       SecretKey = "xxxxxx"
 )
 
 func main() {
@@ -45,7 +45,7 @@ func main() {
        // new producer instance
        producer, err := golang.NewProducer(&golang.Config{
                Endpoint: Endpoint,
-               Group:    ConsumerGroup,
+               Group:    GroupName,
                Region:   Region,
                Credentials: &credentials.SessionCredentials{
                        AccessKey:    AccessKey,
diff --git a/golang/log.go b/golang/log.go
index 11af800..31d3e7f 100644
--- a/golang/log.go
+++ b/golang/log.go
@@ -30,13 +30,14 @@ import (
 )
 
 const (
-       CLIENT_LOG_ROOT            = "rocketmq.client.logRoot"
-       CLIENT_LOG_MAXINDEX        = "rocketmq.client.logFileMaxIndex"
-       CLIENT_LOG_FILESIZE        = "rocketmq.client.logFileMaxSize"
-       CLIENT_LOG_LEVEL           = "rocketmq.client.logLevel"
-       CLIENT_LOG_ADDITIVE        = "rocketmq.client.log.additive"
-       CLIENT_LOG_FILENAME        = "rocketmq.client.logFileName"
-       CLIENT_LOG_ASYNC_QUEUESIZE = "rocketmq.client.logAsyncQueueSize"
+       CLIENT_LOG_ROOT     = "rocketmq.client.logRoot"
+       CLIENT_LOG_MAXINDEX = "rocketmq.client.logFileMaxIndex"
+       CLIENT_LOG_FILESIZE = "rocketmq.client.logFileMaxSize"
+       CLIENT_LOG_LEVEL    = "rocketmq.client.logLevel"
+       // CLIENT_LOG_ADDITIVE        = "rocketmq.client.log.additive"
+       CLIENT_LOG_FILENAME = "rocketmq.client.logFileName"
+       // CLIENT_LOG_ASYNC_QUEUESIZE = "rocketmq.client.logAsyncQueueSize"
+       ENABLE_CONSOLE_APPENDER = "mq.consoleAppender.enabled"
 )
 
 var sugarBaseLogger *zap.SugaredLogger
@@ -47,14 +48,14 @@ func ResetLogger() {
 
 func InitLogger() {
        writeSyncer := getLogWriter()
-       isStdOut := utils.GetenvWithDef("mq.consoleAppender.enabled", "false")
+       isStdOut := utils.GetenvWithDef(ENABLE_CONSOLE_APPENDER, "false")
        if isStdOut == "true" {
                writeSyncer = os.Stdout
        }
        encoder := getEncoder()
 
        var atomicLevel = zap.NewAtomicLevel()
-       switch strings.ToLower(os.Getenv("rocketmq.log.level")) {
+       switch strings.ToLower(os.Getenv(CLIENT_LOG_LEVEL)) {
        case "debug":
                atomicLevel.SetLevel(zap.DebugLevel)
        case "warn":
diff --git a/golang/message.go b/golang/message.go
index a7441ed..4c16ba6 100644
--- a/golang/message.go
+++ b/golang/message.go
@@ -222,7 +222,7 @@ func fromProtobuf_MessageView2(message *v2.Message, 
messageQueue *v2.MessageQueu
        case v2.Encoding_GZIP:
                unCompressBody, err := utils.GZIPDecode(message.GetBody())
                if err != nil {
-                       sugarBaseLogger.Errorf("failed to uncompress message 
body, topic=%s, messageId=%s, err=%v", mv.topic, mv.messageId, err)
+                       sugarBaseLogger.Errorf("failed to uncompress message 
body, topic=%s, messageId=%s, err=%w", mv.topic, mv.messageId, err)
                        corrupted = true
                } else {
                        mv.body = unCompressBody
diff --git a/golang/metric.go b/golang/metric.go
index 5339688..464b91a 100644
--- a/golang/metric.go
+++ b/golang/metric.go
@@ -43,7 +43,6 @@ var (
        topicTag, _            = tag.NewKey("topic")
        clientIdTag, _         = tag.NewKey("client_id")
        invocationStatusTag, _ = tag.NewKey("invocation_status")
-       consumerGroupTag, _    = tag.NewKey("consumer_group")
 
        MLatencyMs = stats.Int64("publish_latency", "Publish latency in 
milliseconds", "ms")
 
@@ -82,7 +81,7 @@ func (dcm *defaultClientMeter) shutdown() {
                if ok {
                        err := oce.Stop()
                        if err != nil {
-                               sugarBaseLogger.Errorf("ocExporter stop failed, 
err = %v", err)
+                               sugarBaseLogger.Errorf("ocExporter stop failed, 
err=%w", err)
                        }
                }
        }
diff --git a/golang/pkg/utils/utils.go b/golang/pkg/utils/utils.go
index 4fb6008..d7ce79a 100644
--- a/golang/pkg/utils/utils.go
+++ b/golang/pkg/utils/utils.go
@@ -25,6 +25,7 @@ import (
        "io/ioutil"
        "net/url"
        "os"
+       "runtime"
        "strconv"
        "strings"
        "sync/atomic"
@@ -240,3 +241,9 @@ func GetenvWithDef(key, def string) string {
        }
        return val
 }
+
+// DumpStacks returns the formatted stack traces of all goroutines in the
+// process, as produced by runtime.Stack(buf, true).
+//
+// runtime.Stack truncates its output to the buffer it is given, so the buffer
+// starts at 16 KiB and is doubled until the dump fits.
+func DumpStacks() string {
+       buf := make([]byte, 16384)
+       for {
+               n := runtime.Stack(buf, true)
+               if n < len(buf) {
+                       return string(buf[:n])
+               }
+               // A full buffer means the trace may have been cut off; retry bigger.
+               buf = make([]byte, 2*len(buf))
+       }
+}
diff --git a/golang/producer.go b/golang/producer.go
index a29091c..ee7281e 100644
--- a/golang/producer.go
+++ b/golang/producer.go
@@ -19,7 +19,6 @@ package golang
 
 import (
        "context"
-       "errors"
        "fmt"
        "math"
        "sync"
@@ -57,9 +56,9 @@ func (p *defaultProducer) Start() error {
        }
        err2 := p.GracefulStop()
        if err2 != nil {
-               return fmt.Errorf("startUp err = %v, shutdown err = %v", err, 
err2)
+               return fmt.Errorf("startUp err=%w, shutdown err=%w", err, err2)
        }
-       return fmt.Errorf("startUp err = %v", err)
+       return fmt.Errorf("startUp err=%w", err)
 }
 
 var _ = Producer(&defaultProducer{})
@@ -208,7 +207,10 @@ func (p *defaultProducer) send1(ctx context.Context, topic 
string, messageType v
        tooManyRequests := false
        if err == nil && resp.GetStatus().GetCode() != v2.Code_OK {
                tooManyRequests = resp.GetStatus().GetCode() == 
v2.Code_TOO_MANY_REQUESTS
-               err = errors.New(resp.String())
+               err = &ErrRpcStatus{
+                       Code:    int32(resp.Status.GetCode()),
+                       Message: resp.GetStatus().GetMessage(),
+               }
        }
        if err != nil {
                messageHookPointsStatus = MessageHookPointsStatus_ERROR
@@ -414,7 +416,10 @@ func (p *defaultProducer) endTransaction(ctx 
context.Context, endpoints *v2.Endp
        duration := time.Since(watchTime)
        messageHookPointsStatus := MessageHookPointsStatus_OK
        if err == nil && resp.GetStatus().GetCode() != v2.Code_OK {
-               err = errors.New(resp.String())
+               err = &ErrRpcStatus{
+                       Code:    int32(resp.Status.GetCode()),
+                       Message: resp.GetStatus().GetMessage(),
+               }
        }
        if err != nil {
                messageHookPointsStatus = MessageHookPointsStatus_ERROR
@@ -435,7 +440,7 @@ func (p *defaultProducer) 
onRecoverOrphanedTransactionCommand(endpoints *v2.Endp
                err := p.endTransaction(context.TODO(), endpoints,
                        mv.GetMessageCommon(), messageId, transactionId, 
resolution)
                if err != nil {
-                       p.cli.log.Errorf("exception raised while ending the 
transaction, messageId=%s, transactionId=%s, endpoints=%v, err=%v\n", 
messageId, transactionId, endpoints, err)
+                       p.cli.log.Errorf("exception raised while ending the 
transaction, messageId=%s, transactionId=%s, endpoints=%v, err=%w\n", 
messageId, transactionId, endpoints, err)
                }
        }(messageView)
        return nil
diff --git a/golang/publishing_message.go b/golang/publishing_message.go
index 27616fe..3621090 100644
--- a/golang/publishing_message.go
+++ b/golang/publishing_message.go
@@ -90,7 +90,7 @@ func (pMsg *PublishingMessage) toProtobuf() (*v2.Message, 
error) {
                },
                SystemProperties: &v2.SystemProperties{
                        Keys:          pMsg.msg.GetKeys(),
-                       MessageId:     uuid.New().String(),
+                       MessageId:     pMsg.messageId,
                        BornTimestamp: timestamppb.Now(),
                        BornHost:      innerOS.Hostname(),
                        BodyEncoding:  v2.Encoding_IDENTITY,
diff --git a/golang/rpc_client.go b/golang/rpc_client.go
index 0b3ee74..fe64dc9 100644
--- a/golang/rpc_client.go
+++ b/golang/rpc_client.go
@@ -70,7 +70,7 @@ var NewRpcClient = func(target string, opts 
...RpcClientOption) (RpcClient, erro
        }
        conn, err := rc.opts.clientConnFunc(target, rc.opts.connOptions...)
        if err != nil {
-               return nil, fmt.Errorf("create grpc conn failed, err=%v", err)
+               return nil, fmt.Errorf("create grpc conn failed, err=%w", err)
        }
        rc.conn = conn
        rc.msc = v2.NewMessagingServiceClient(conn.Conn())
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index 3a14959..984b368 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -19,7 +19,6 @@ package golang
 
 import (
        "context"
-       "errors"
        "fmt"
        "io"
        "sync"
@@ -51,7 +50,7 @@ var _ = SimpleConsumer(&defaultSimpleConsumer{})
 type defaultSimpleConsumer struct {
        cli *defaultClient
 
-       consumerGroup                string
+       groupName                    string
        topicIndex                   int32
        scOpts                       simpleConsumerOptions
        scSettings                   *simpleConsumerSettings
@@ -79,7 +78,7 @@ func (sc *defaultSimpleConsumer) 
changeInvisibleDuration0(messageView *MessageVi
                        Name: messageView.GetTopic(),
                },
                Group: &v2.Resource{
-                       Name: sc.consumerGroup,
+                       Name: sc.groupName,
                },
                ReceiptHandle:     messageView.GetReceiptHandle(),
                InvisibleDuration: durationpb.New(invisibleDuration),
@@ -93,7 +92,10 @@ func (sc *defaultSimpleConsumer) 
changeInvisibleDuration0(messageView *MessageVi
                sc.cli.log.Errorf("exception raised during message 
acknowledgement, messageId=%s, endpoints=%v", messageView.GetMessageId(), 
endpoints)
        } else if resp.GetStatus().GetCode() != v2.Code_OK {
                sc.cli.log.Errorf("failed to change message invisible duration, 
messageId=%s, endpoints=%v, code=%v, status message=[%s]", 
messageView.GetMessageId(), endpoints, resp.GetStatus().GetCode(), 
resp.GetStatus().GetMessage())
-               err = errors.New(resp.String())
+               err = &ErrRpcStatus{
+                       Code:    int32(resp.Status.GetCode()),
+                       Message: resp.GetStatus().GetMessage(),
+               }
        }
        if err != nil {
                messageHookPointsStatus = MessageHookPointsStatus_ERROR
@@ -151,7 +153,7 @@ func (sc *defaultSimpleConsumer) Unsubscribe(topic string) 
error {
 func (sc *defaultSimpleConsumer) wrapReceiveMessageRequest(batchSize int, 
messageQueue *v2.MessageQueue, filterExpression *FilterExpression, 
invisibleDuration time.Duration) *v2.ReceiveMessageRequest {
        return &v2.ReceiveMessageRequest{
                Group: &v2.Resource{
-                       Name: sc.consumerGroup,
+                       Name: sc.groupName,
                },
                MessageQueue: messageQueue,
                FilterExpression: &v2.FilterExpression{
@@ -165,7 +167,7 @@ func (sc *defaultSimpleConsumer) 
wrapReceiveMessageRequest(batchSize int, messag
 
 func (sc *defaultSimpleConsumer) wrapAckMessageRequest(messageView 
*MessageView) *v2.AckMessageRequest {
        return &v2.AckMessageRequest{
-               Group: sc.scSettings.consumerGroup,
+               Group: sc.scSettings.groupName,
                Topic: &v2.Resource{
                        Name: messageView.GetTopic(),
                },
@@ -178,8 +180,8 @@ func (sc *defaultSimpleConsumer) 
wrapAckMessageRequest(messageView *MessageView)
        }
 }
 
-func (sc *defaultSimpleConsumer) GetConsumerGroup() string {
-       return sc.consumerGroup
+func (sc *defaultSimpleConsumer) GetGroupName() string {
+       return sc.groupName
 }
 
 func (sc *defaultSimpleConsumer) receiveMessage(ctx context.Context, request 
*v2.ReceiveMessageRequest, messageQueue *v2.MessageQueue, timeout 
time.Duration) ([]*MessageView, error) {
@@ -205,7 +207,7 @@ func (sc *defaultSimpleConsumer) receiveMessage(ctx 
context.Context, request *v2
                                break
                        }
                        if err != nil {
-                               sc.cli.log.Errorf("simpleConsumer recv msg 
err=%v", err)
+                               sc.cli.log.Errorf("simpleConsumer recv msg 
err=%w", err)
                                break
                        }
                        resps = append(resps, resp)
@@ -246,7 +248,10 @@ func (sc *defaultSimpleConsumer) receiveMessage(ctx 
context.Context, request *v2
                if status.GetCode() == v2.Code_OK {
                        return messageViewList, nil
                } else {
-                       return nil, fmt.Errorf("[error] code=%d, message=%s", 
status.GetCode().Number(), status.GetMessage())
+                       return nil, &ErrRpcStatus{
+                               Code:    int32(status.GetCode()),
+                               Message: status.GetMessage(),
+                       }
                }
        }
 }
@@ -305,7 +310,7 @@ func (sc *defaultSimpleConsumer) 
onVerifyMessageCommand(endpoints *v2.Endpoints,
 
 func (sc *defaultSimpleConsumer) wrapHeartbeatRequest() *v2.HeartbeatRequest {
        return &v2.HeartbeatRequest{
-               Group:      sc.scSettings.consumerGroup,
+               Group:      sc.scSettings.groupName,
                ClientType: v2.ClientType_SIMPLE_CONSUMER,
        }
 }
@@ -320,9 +325,9 @@ var NewSimpleConsumer = func(config *Config, opts 
...SimpleConsumerOption) (Simp
                return nil, err
        }
        sc := &defaultSimpleConsumer{
-               scOpts:        *scOpts,
-               cli:           cli.(*defaultClient),
-               consumerGroup: config.Group,
+               scOpts:    *scOpts,
+               cli:       cli.(*defaultClient),
+               groupName: config.Group,
 
                awaitDuration:           scOpts.awaitDuration,
                subscriptionExpressions: scOpts.subscriptionExpressions,
@@ -341,8 +346,8 @@ var NewSimpleConsumer = func(config *Config, opts 
...SimpleConsumerOption) (Simp
                clientType:     v2.ClientType_SIMPLE_CONSUMER,
                requestTimeout: sc.cli.opts.timeout,
 
-               consumerGroup: &v2.Resource{
-                       Name: sc.consumerGroup,
+               groupName: &v2.Resource{
+                       Name: sc.groupName,
                },
                longPollingTimeout:      scOpts.awaitDuration,
                subscriptionExpressions: scOpts.subscriptionExpressions,
@@ -359,7 +364,7 @@ func (sc *defaultSimpleConsumer) Start() error {
        }
        err2 := sc.GracefulStop()
        if err2 != nil {
-               return fmt.Errorf("startUp err = %v, shutdown err = %v", err, 
err2)
+               return fmt.Errorf("startUp err=%w, shutdown err=%w", err, err2)
        }
        return err
 }
diff --git a/golang/simple_consumer_options.go 
b/golang/simple_consumer_options.go
index 3df80b8..2f919e9 100644
--- a/golang/simple_consumer_options.go
+++ b/golang/simple_consumer_options.go
@@ -108,7 +108,7 @@ type simpleConsumerSettings struct {
        retryPolicy    *v2.RetryPolicy
        requestTimeout time.Duration
 
-       consumerGroup           *v2.Resource
+       groupName               *v2.Resource
        longPollingTimeout      time.Duration
        subscriptionExpressions map[string]*FilterExpression
 }
@@ -177,7 +177,7 @@ func (sc *simpleConsumerSettings) toProtobuf() *v2.Settings 
{
        }
        subSetting := &v2.Settings_Subscription{
                Subscription: &v2.Subscription{
-                       Group:              sc.consumerGroup,
+                       Group:              sc.groupName,
                        Subscriptions:      subscriptions,
                        LongPollingTimeout: 
durationpb.New(sc.longPollingTimeout),
                },
diff --git a/golang/transaction.go b/golang/transaction.go
index 47343ce..51b434c 100644
--- a/golang/transaction.go
+++ b/golang/transaction.go
@@ -67,7 +67,7 @@ func (t *transactionImpl) Commit() error {
                err := 
t.producerImpl.(*defaultProducer).endTransaction(context.TODO(), 
sendReceipt.Endpoints,
                        pubMessage.msg.GetMessageCommon(), 
sendReceipt.MessageID, sendReceipt.TransactionId, COMMIT)
                if err != nil {
-                       sugarBaseLogger.Errorf("transaction message commit 
failed, err=%v", err)
+                       sugarBaseLogger.Errorf("transaction message commit 
failed, err=%w", err)
                }
                return true
        })
@@ -81,7 +81,7 @@ func (t *transactionImpl) RollBack() error {
                err := 
t.producerImpl.(*defaultProducer).endTransaction(context.TODO(), 
sendReceipt.Endpoints,
                        pubMessage.msg.GetMessageCommon(), 
sendReceipt.MessageID, sendReceipt.TransactionId, ROLLBACK)
                if err != nil {
-                       sugarBaseLogger.Errorf("transaction message rollback 
failed, err=%v", err)
+                       sugarBaseLogger.Errorf("transaction message rollback 
failed, err=%w", err)
                }
                return true
        })

Reply via email to