This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 7e9d3a4 change 'consumerGroup' to 'groupName' (#161)
7e9d3a4 is described below
commit 7e9d3a4e219712e24e7958b7d03b437086bccbe0
Author: guyinyou <[email protected]>
AuthorDate: Thu Aug 25 10:36:18 2022 +0800
change 'consumerGroup' to 'groupName' (#161)
* change 'consumerGroup' to 'groupName'
* fix toProtobuf in publishingMessage
* onPrintThreadStackTraceCommand
* get the code returned by grpc & telemeter before starting client
Co-authored-by: guyinyou <[email protected]>
---
golang/client.go | 47 ++++++++++++++++++-------
golang/client_manager.go | 5 +--
golang/conn.go | 1 +
golang/consumer.go | 2 +-
golang/{consumer.go => error.go} | 39 +++++++++++++++++---
golang/example/consumer/simple_consumer/main.go | 14 ++++----
golang/example/producer/async/main.go | 14 ++++----
golang/example/producer/delay/main.go | 14 ++++----
golang/example/producer/fifo/main.go | 14 ++++----
golang/example/producer/normal/main.go | 14 ++++----
golang/example/producer/transaction/main.go | 14 ++++----
golang/log.go | 19 +++++-----
golang/message.go | 2 +-
golang/metric.go | 3 +-
golang/pkg/utils/utils.go | 7 ++++
golang/producer.go | 17 +++++----
golang/publishing_message.go | 2 +-
golang/rpc_client.go | 2 +-
golang/simple_consumer.go | 39 +++++++++++---------
golang/simple_consumer_options.go | 4 +--
golang/transaction.go | 4 +--
21 files changed, 174 insertions(+), 103 deletions(-)
diff --git a/golang/client.go b/golang/client.go
index 6c5ee5b..48f00f9 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -87,12 +87,12 @@ func (cs *defaultClientSession) startUp() {
if err != nil {
cs.release()
- cs.cli.log.Errorf("telemetryCommand recv err =
%v", err)
+ cs.cli.log.Errorf("telemetryCommand recv
err=%w", err)
continue
}
err = cs.handleTelemetryCommand(response)
if err != nil {
- cs.cli.log.Errorf("telemetryCommand recv err =
%v", err)
+ cs.cli.log.Errorf("telemetryCommand recv
err=%w", err)
}
}
}()
@@ -275,7 +275,10 @@ func (cli *defaultClient) queryRoute(ctx context.Context,
topic string, duration
return nil, err
}
if response.GetStatus().GetCode() != v2.Code_OK {
- return nil, fmt.Errorf("query route err = %s",
response.String())
+ return nil, &ErrRpcStatus{
+ Code: int32(response.Status.GetCode()),
+ Message: response.GetStatus().GetMessage(),
+ }
}
if len(response.GetMessageQueues()) == 0 {
@@ -337,9 +340,13 @@ func (cli *defaultClient) doHeartbeat(target string,
request *v2.HeartbeatReques
}
if resp.Status.GetCode() != v2.Code_OK {
if resp.Status.GetCode() == v2.Code_UNRECOGNIZED_CLIENT_TYPE {
- go cli.syncSettings()
+ go cli.trySyncSettings()
+ }
+ cli.log.Errorf("failed to send heartbeat, code=%v, status
message=[%s], endpoints=%v", resp.Status.GetCode(), resp.Status.GetMessage(),
endpoints)
+ return &ErrRpcStatus{
+ Code: int32(resp.Status.GetCode()),
+ Message: resp.GetStatus().GetMessage(),
}
- return fmt.Errorf("failed to send heartbeat, code=%v, status
message=[%s], endpoints=%v", resp.Status.GetCode(), resp.Status.GetMessage(),
endpoints)
}
cli.log.Infof("send heartbeat successfully, endpoints=%v", endpoints)
switch p := cli.clientImpl.(type) {
@@ -364,7 +371,7 @@ func (cli *defaultClient) Heartbeat() {
}
}
-func (cli *defaultClient) syncSettings() {
+func (cli *defaultClient) trySyncSettings() {
cli.log.Info("start syncSetting")
command := cli.getSettingsCommand()
targets := cli.getTotalTargets()
@@ -373,25 +380,37 @@ func (cli *defaultClient) syncSettings() {
}
}
-func (cli *defaultClient) telemeter(target string, command
*v2.TelemetryCommand) {
+func (cli *defaultClient) mustSyncSettings() error {
+ cli.log.Info("start syncSetting")
+ command := cli.getSettingsCommand()
+ targets := cli.getTotalTargets()
+ for _, target := range targets {
+ if err := cli.telemeter(target, command); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (cli *defaultClient) telemeter(target string, command
*v2.TelemetryCommand) error {
cs, err := cli.getDefaultClientSession(target)
if err != nil {
cli.log.Error("getDefaultClientSession failed, err=%v", target,
err)
- return
+ return err
}
ctx := cli.Sign(context.Background())
err = cs.publish(ctx, command)
if err != nil {
cli.log.Error("telemeter to %s failed, err=%v", target, err)
- return
+ return err
}
cli.log.Infof("telemeter to %s success", target)
+ return nil
}
func (cli *defaultClient) startUp() error {
cli.log.Infof("begin to start the rocketmq client")
cli.clientManager = defaultClientManagerRegistry.RegisterClient(cli)
-
for _, topic := range cli.initTopics {
maxAttempts :=
int(cli.settings.GetRetryPolicy().GetMaxAttempts())
for i := 0; i < maxAttempts; i++ {
@@ -411,7 +430,9 @@ func (cli *defaultClient) startUp() error {
}
}
}
-
+ if err := cli.mustSyncSettings(); err != nil {
+ return err
+ }
f := func() {
cli.router.Range(func(k, v interface{}) bool {
topic := k.(string)
@@ -512,7 +533,7 @@ func (cli *defaultClient) onSettingsCommand(endpoints
*v2.Endpoints, settings *v
func (cli *defaultClient) onRecoverOrphanedTransactionCommand(endpoints
*v2.Endpoints, command *v2.RecoverOrphanedTransactionCommand) {
if p, ok := cli.clientImpl.(*defaultProducer); ok {
if err := p.onRecoverOrphanedTransactionCommand(endpoints,
command); err != nil {
- cli.log.Errorf("onRecoverOrphanedTransactionCommand err
= %v", err)
+ cli.log.Errorf("onRecoverOrphanedTransactionCommand
err=%w", err)
}
} else {
cli.log.Infof("ignore orphaned transaction recovery command
from remote, which is not expected, command=%v", command)
@@ -543,7 +564,7 @@ func (cli *defaultClient)
onPrintThreadStackTraceCommand(endpoints *v2.Endpoints
nonce := command.GetNonce()
go func(nonce string) {
// TODO get stack
- stackTrace := ""
+ stackTrace := utils.DumpStacks()
status := &v2.Status{
Code: v2.Code_OK,
}
diff --git a/golang/client_manager.go b/golang/client_manager.go
index c2f1399..5046686 100644
--- a/golang/client_manager.go
+++ b/golang/client_manager.go
@@ -29,6 +29,7 @@ import (
"google.golang.org/grpc/status"
)
+// TODO: no global clientManager
type ClientManager interface {
RegisterClient(client Client)
UnRegisterClient(client Client)
@@ -211,7 +212,7 @@ func (cm *defaultClientManager) syncSettings() {
sugarBaseLogger.Info("clientManager start syncSettings")
cm.clientTable.Range(func(_, v interface{}) bool {
client := v.(*defaultClient)
- client.syncSettings()
+ client.trySyncSettings()
return true
})
}
@@ -263,7 +264,7 @@ func (cm *defaultClientManager) handleGrpcError(rpcClient
RpcClient, err error)
if err != nil {
if e, ok := status.FromError(err); ok {
if e.Code() == codes.Unavailable {
- sugarBaseLogger.Errorf("happened unavailable
error=%v, close rpcClient=%s", err, rpcClient.GetTarget())
+ sugarBaseLogger.Errorf("happened unavailable
err=%w, close rpcClient=%s", err, rpcClient.GetTarget())
cm.rpcClientTableLock.Lock()
defer cm.rpcClientTableLock.Unlock()
cm.deleteRpcClient(rpcClient)
diff --git a/golang/conn.go b/golang/conn.go
index f5bf1a7..eeca8a0 100644
--- a/golang/conn.go
+++ b/golang/conn.go
@@ -118,6 +118,7 @@ func (c *clientConn) dialSetupOpts(dopts
...grpc.DialOption) (opts []grpc.DialOp
if c.creds != nil {
opts = append(opts, grpc.WithTransportCredentials(c.creds))
}
+ // TODO get requestID in header
opts = append(opts, grpc.WithBlock(), grpc.WithChainUnaryInterceptor(
zaplog.UnaryClientInterceptor(c.opts.Logger),
))
diff --git a/golang/consumer.go b/golang/consumer.go
index 47ed593..b2d9596 100644
--- a/golang/consumer.go
+++ b/golang/consumer.go
@@ -24,6 +24,6 @@ import (
)
type Consumer interface {
- GetConsumerGroup() string
+ GetGroupName() string
wrapReceiveMessageRequest(batchSize int, messageQueue *v2.MessageQueue,
filterExpression *FilterExpression, invisibleDuration time.Duration)
*v2.ReceiveMessageRequest
}
diff --git a/golang/consumer.go b/golang/error.go
similarity index 57%
copy from golang/consumer.go
copy to golang/error.go
index 47ed593..1105ea6 100644
--- a/golang/consumer.go
+++ b/golang/error.go
@@ -18,12 +18,43 @@
package golang
import (
- "time"
+ "errors"
+ "fmt"
v2 "github.com/apache/rocketmq-clients/golang/protocol/v2"
)
-type Consumer interface {
- GetConsumerGroup() string
- wrapReceiveMessageRequest(batchSize int, messageQueue *v2.MessageQueue,
filterExpression *FilterExpression, invisibleDuration time.Duration)
*v2.ReceiveMessageRequest
+type ErrRpcStatus struct {
+ Code int32
+ Message string
+}
+
+func (err *ErrRpcStatus) GetCode() int32 {
+ return err.GetCode()
+}
+
+func (err *ErrRpcStatus) GetMessage() string {
+ return err.Message
+}
+
+func (err *ErrRpcStatus) Error() string {
+ codeName, ok := v2.Code_name[err.Code]
+ if !ok {
+ codeName = string(err.Code)
+ }
+ return fmt.Sprintf("CODE: %s, MESSAGE: %s", codeName, err.Message)
+}
+
+var _ = error(&ErrRpcStatus{})
+
+func AsErrRpcStatus(err error) (*ErrRpcStatus, bool) {
+ if err == nil {
+ return nil, false
+ }
+ target, ok := err.(*ErrRpcStatus)
+ if ok {
+ return target, true
+ }
+ err = errors.Unwrap(err)
+ return AsErrRpcStatus(err)
}
diff --git a/golang/example/consumer/simple_consumer/main.go
b/golang/example/consumer/simple_consumer/main.go
index aee2d91..c4139b2 100644
--- a/golang/example/consumer/simple_consumer/main.go
+++ b/golang/example/consumer/simple_consumer/main.go
@@ -29,12 +29,12 @@ import (
)
const (
- Topic = "xxxxxx"
- ConsumerGroup = "xxxxxx"
- Endpoint = "xxxxxx"
- Region = "xxxxxx"
- AccessKey = "xxxxxx"
- SecretKey = "xxxxxx"
+ Topic = "xxxxxx"
+ GroupName = "xxxxxx"
+ Endpoint = "xxxxxx"
+ Region = "xxxxxx"
+ AccessKey = "xxxxxx"
+ SecretKey = "xxxxxx"
)
var (
@@ -54,7 +54,7 @@ func main() {
// new simpleConsumer instance
simpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{
Endpoint: Endpoint,
- Group: ConsumerGroup,
+ Group: GroupName,
Region: Region,
Credentials: &credentials.SessionCredentials{
AccessKey: AccessKey,
diff --git a/golang/example/producer/async/main.go
b/golang/example/producer/async/main.go
index 988987a..6c1e1df 100644
--- a/golang/example/producer/async/main.go
+++ b/golang/example/producer/async/main.go
@@ -30,12 +30,12 @@ import (
)
const (
- Topic = "xxxxxx"
- ConsumerGroup = "xxxxxx"
- Endpoint = "xxxxxx"
- Region = "xxxxxx"
- AccessKey = "xxxxxx"
- SecretKey = "xxxxxx"
+ Topic = "xxxxxx"
+ GroupName = "xxxxxx"
+ Endpoint = "xxxxxx"
+ Region = "xxxxxx"
+ AccessKey = "xxxxxx"
+ SecretKey = "xxxxxx"
)
func main() {
@@ -45,7 +45,7 @@ func main() {
// new producer instance
producer, err := golang.NewProducer(&golang.Config{
Endpoint: Endpoint,
- Group: ConsumerGroup,
+ Group: GroupName,
Region: Region,
Credentials: &credentials.SessionCredentials{
AccessKey: AccessKey,
diff --git a/golang/example/producer/delay/main.go
b/golang/example/producer/delay/main.go
index 23f0aa0..695e107 100644
--- a/golang/example/producer/delay/main.go
+++ b/golang/example/producer/delay/main.go
@@ -30,12 +30,12 @@ import (
)
const (
- Topic = "xxxxxx"
- ConsumerGroup = "xxxxxx"
- Endpoint = "xxxxxx"
- Region = "xxxxxx"
- AccessKey = "xxxxxx"
- SecretKey = "xxxxxx"
+ Topic = "xxxxxx"
+ GroupName = "xxxxxx"
+ Endpoint = "xxxxxx"
+ Region = "xxxxxx"
+ AccessKey = "xxxxxx"
+ SecretKey = "xxxxxx"
)
func main() {
@@ -45,7 +45,7 @@ func main() {
// new producer instance
producer, err := golang.NewProducer(&golang.Config{
Endpoint: Endpoint,
- Group: ConsumerGroup,
+ Group: GroupName,
Region: Region,
Credentials: &credentials.SessionCredentials{
AccessKey: AccessKey,
diff --git a/golang/example/producer/fifo/main.go
b/golang/example/producer/fifo/main.go
index 40080c3..c2250d3 100644
--- a/golang/example/producer/fifo/main.go
+++ b/golang/example/producer/fifo/main.go
@@ -30,12 +30,12 @@ import (
)
const (
- Topic = "xxxxxx"
- ConsumerGroup = "xxxxxx"
- Endpoint = "xxxxxx"
- Region = "xxxxxx"
- AccessKey = "xxxxxx"
- SecretKey = "xxxxxx"
+ Topic = "xxxxxx"
+ GroupName = "xxxxxx"
+ Endpoint = "xxxxxx"
+ Region = "xxxxxx"
+ AccessKey = "xxxxxx"
+ SecretKey = "xxxxxx"
)
func main() {
@@ -45,7 +45,7 @@ func main() {
// new producer instance
producer, err := golang.NewProducer(&golang.Config{
Endpoint: Endpoint,
- Group: ConsumerGroup,
+ Group: GroupName,
Region: Region,
Credentials: &credentials.SessionCredentials{
AccessKey: AccessKey,
diff --git a/golang/example/producer/normal/main.go
b/golang/example/producer/normal/main.go
index 9285c71..5be7497 100644
--- a/golang/example/producer/normal/main.go
+++ b/golang/example/producer/normal/main.go
@@ -30,12 +30,12 @@ import (
)
const (
- Topic = "xxxxxx"
- ConsumerGroup = "xxxxxx"
- Endpoint = "xxxxxx"
- Region = "xxxxxx"
- AccessKey = "xxxxxx"
- SecretKey = "xxxxxx"
+ Topic = "xxxxxx"
+ GroupName = "xxxxxx"
+ Endpoint = "xxxxxx"
+ Region = "xxxxxx"
+ AccessKey = "xxxxxx"
+ SecretKey = "xxxxxx"
)
func main() {
@@ -44,7 +44,7 @@ func main() {
// new producer instance
producer, err := golang.NewProducer(&golang.Config{
Endpoint: Endpoint,
- Group: ConsumerGroup,
+ Group: GroupName,
Region: Region,
Credentials: &credentials.SessionCredentials{
AccessKey: AccessKey,
diff --git a/golang/example/producer/transaction/main.go
b/golang/example/producer/transaction/main.go
index 0a5e0e9..ac38d38 100644
--- a/golang/example/producer/transaction/main.go
+++ b/golang/example/producer/transaction/main.go
@@ -30,12 +30,12 @@ import (
)
const (
- Topic = "xxxxxx"
- ConsumerGroup = "xxxxxx"
- Endpoint = "xxxxxx"
- Region = "xxxxxx"
- AccessKey = "xxxxxx"
- SecretKey = "xxxxxx"
+ Topic = "xxxxxx"
+ GroupName = "xxxxxx"
+ Endpoint = "xxxxxx"
+ Region = "xxxxxx"
+ AccessKey = "xxxxxx"
+ SecretKey = "xxxxxx"
)
func main() {
@@ -45,7 +45,7 @@ func main() {
// new producer instance
producer, err := golang.NewProducer(&golang.Config{
Endpoint: Endpoint,
- Group: ConsumerGroup,
+ Group: GroupName,
Region: Region,
Credentials: &credentials.SessionCredentials{
AccessKey: AccessKey,
diff --git a/golang/log.go b/golang/log.go
index 11af800..31d3e7f 100644
--- a/golang/log.go
+++ b/golang/log.go
@@ -30,13 +30,14 @@ import (
)
const (
- CLIENT_LOG_ROOT = "rocketmq.client.logRoot"
- CLIENT_LOG_MAXINDEX = "rocketmq.client.logFileMaxIndex"
- CLIENT_LOG_FILESIZE = "rocketmq.client.logFileMaxSize"
- CLIENT_LOG_LEVEL = "rocketmq.client.logLevel"
- CLIENT_LOG_ADDITIVE = "rocketmq.client.log.additive"
- CLIENT_LOG_FILENAME = "rocketmq.client.logFileName"
- CLIENT_LOG_ASYNC_QUEUESIZE = "rocketmq.client.logAsyncQueueSize"
+ CLIENT_LOG_ROOT = "rocketmq.client.logRoot"
+ CLIENT_LOG_MAXINDEX = "rocketmq.client.logFileMaxIndex"
+ CLIENT_LOG_FILESIZE = "rocketmq.client.logFileMaxSize"
+ CLIENT_LOG_LEVEL = "rocketmq.client.logLevel"
+ // CLIENT_LOG_ADDITIVE = "rocketmq.client.log.additive"
+ CLIENT_LOG_FILENAME = "rocketmq.client.logFileName"
+ // CLIENT_LOG_ASYNC_QUEUESIZE = "rocketmq.client.logAsyncQueueSize"
+ ENABLE_CONSOLE_APPENDER = "mq.consoleAppender.enabled"
)
var sugarBaseLogger *zap.SugaredLogger
@@ -47,14 +48,14 @@ func ResetLogger() {
func InitLogger() {
writeSyncer := getLogWriter()
- isStdOut := utils.GetenvWithDef("mq.consoleAppender.enabled", "false")
+ isStdOut := utils.GetenvWithDef(ENABLE_CONSOLE_APPENDER, "false")
if isStdOut == "true" {
writeSyncer = os.Stdout
}
encoder := getEncoder()
var atomicLevel = zap.NewAtomicLevel()
- switch strings.ToLower(os.Getenv("rocketmq.log.level")) {
+ switch strings.ToLower(os.Getenv(CLIENT_LOG_LEVEL)) {
case "debug":
atomicLevel.SetLevel(zap.DebugLevel)
case "warn":
diff --git a/golang/message.go b/golang/message.go
index a7441ed..4c16ba6 100644
--- a/golang/message.go
+++ b/golang/message.go
@@ -222,7 +222,7 @@ func fromProtobuf_MessageView2(message *v2.Message,
messageQueue *v2.MessageQueu
case v2.Encoding_GZIP:
unCompressBody, err := utils.GZIPDecode(message.GetBody())
if err != nil {
- sugarBaseLogger.Errorf("failed to uncompress message
body, topic=%s, messageId=%s, err=%v", mv.topic, mv.messageId, err)
+ sugarBaseLogger.Errorf("failed to uncompress message
body, topic=%s, messageId=%s, err=%w", mv.topic, mv.messageId, err)
corrupted = true
} else {
mv.body = unCompressBody
diff --git a/golang/metric.go b/golang/metric.go
index 5339688..464b91a 100644
--- a/golang/metric.go
+++ b/golang/metric.go
@@ -43,7 +43,6 @@ var (
topicTag, _ = tag.NewKey("topic")
clientIdTag, _ = tag.NewKey("client_id")
invocationStatusTag, _ = tag.NewKey("invocation_status")
- consumerGroupTag, _ = tag.NewKey("consumer_group")
MLatencyMs = stats.Int64("publish_latency", "Publish latency in
milliseconds", "ms")
@@ -82,7 +81,7 @@ func (dcm *defaultClientMeter) shutdown() {
if ok {
err := oce.Stop()
if err != nil {
- sugarBaseLogger.Errorf("ocExporter stop failed,
err = %v", err)
+ sugarBaseLogger.Errorf("ocExporter stop failed,
err=%w", err)
}
}
}
diff --git a/golang/pkg/utils/utils.go b/golang/pkg/utils/utils.go
index 4fb6008..d7ce79a 100644
--- a/golang/pkg/utils/utils.go
+++ b/golang/pkg/utils/utils.go
@@ -25,6 +25,7 @@ import (
"io/ioutil"
"net/url"
"os"
+ "runtime"
"strconv"
"strings"
"sync/atomic"
@@ -240,3 +241,9 @@ func GetenvWithDef(key, def string) string {
}
return val
}
+
+func DumpStacks() string {
+ buf := make([]byte, 16384)
+ buf = buf[:runtime.Stack(buf, true)]
+ return string(buf)
+}
diff --git a/golang/producer.go b/golang/producer.go
index a29091c..ee7281e 100644
--- a/golang/producer.go
+++ b/golang/producer.go
@@ -19,7 +19,6 @@ package golang
import (
"context"
- "errors"
"fmt"
"math"
"sync"
@@ -57,9 +56,9 @@ func (p *defaultProducer) Start() error {
}
err2 := p.GracefulStop()
if err2 != nil {
- return fmt.Errorf("startUp err = %v, shutdown err = %v", err,
err2)
+ return fmt.Errorf("startUp err=%w, shutdown err=%w", err, err2)
}
- return fmt.Errorf("startUp err = %v", err)
+ return fmt.Errorf("startUp err=%w", err)
}
var _ = Producer(&defaultProducer{})
@@ -208,7 +207,10 @@ func (p *defaultProducer) send1(ctx context.Context, topic
string, messageType v
tooManyRequests := false
if err == nil && resp.GetStatus().GetCode() != v2.Code_OK {
tooManyRequests = resp.GetStatus().GetCode() ==
v2.Code_TOO_MANY_REQUESTS
- err = errors.New(resp.String())
+ err = &ErrRpcStatus{
+ Code: int32(resp.Status.GetCode()),
+ Message: resp.GetStatus().GetMessage(),
+ }
}
if err != nil {
messageHookPointsStatus = MessageHookPointsStatus_ERROR
@@ -414,7 +416,10 @@ func (p *defaultProducer) endTransaction(ctx
context.Context, endpoints *v2.Endp
duration := time.Since(watchTime)
messageHookPointsStatus := MessageHookPointsStatus_OK
if err == nil && resp.GetStatus().GetCode() != v2.Code_OK {
- err = errors.New(resp.String())
+ err = &ErrRpcStatus{
+ Code: int32(resp.Status.GetCode()),
+ Message: resp.GetStatus().GetMessage(),
+ }
}
if err != nil {
messageHookPointsStatus = MessageHookPointsStatus_ERROR
@@ -435,7 +440,7 @@ func (p *defaultProducer)
onRecoverOrphanedTransactionCommand(endpoints *v2.Endp
err := p.endTransaction(context.TODO(), endpoints,
mv.GetMessageCommon(), messageId, transactionId,
resolution)
if err != nil {
- p.cli.log.Errorf("exception raised while ending the
transaction, messageId=%s, transactionId=%s, endpoints=%v, err=%v\n",
messageId, transactionId, endpoints, err)
+ p.cli.log.Errorf("exception raised while ending the
transaction, messageId=%s, transactionId=%s, endpoints=%v, err=%w\n",
messageId, transactionId, endpoints, err)
}
}(messageView)
return nil
diff --git a/golang/publishing_message.go b/golang/publishing_message.go
index 27616fe..3621090 100644
--- a/golang/publishing_message.go
+++ b/golang/publishing_message.go
@@ -90,7 +90,7 @@ func (pMsg *PublishingMessage) toProtobuf() (*v2.Message,
error) {
},
SystemProperties: &v2.SystemProperties{
Keys: pMsg.msg.GetKeys(),
- MessageId: uuid.New().String(),
+ MessageId: pMsg.messageId,
BornTimestamp: timestamppb.Now(),
BornHost: innerOS.Hostname(),
BodyEncoding: v2.Encoding_IDENTITY,
diff --git a/golang/rpc_client.go b/golang/rpc_client.go
index 0b3ee74..fe64dc9 100644
--- a/golang/rpc_client.go
+++ b/golang/rpc_client.go
@@ -70,7 +70,7 @@ var NewRpcClient = func(target string, opts
...RpcClientOption) (RpcClient, erro
}
conn, err := rc.opts.clientConnFunc(target, rc.opts.connOptions...)
if err != nil {
- return nil, fmt.Errorf("create grpc conn failed, err=%v", err)
+ return nil, fmt.Errorf("create grpc conn failed, err=%w", err)
}
rc.conn = conn
rc.msc = v2.NewMessagingServiceClient(conn.Conn())
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index 3a14959..984b368 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -19,7 +19,6 @@ package golang
import (
"context"
- "errors"
"fmt"
"io"
"sync"
@@ -51,7 +50,7 @@ var _ = SimpleConsumer(&defaultSimpleConsumer{})
type defaultSimpleConsumer struct {
cli *defaultClient
- consumerGroup string
+ groupName string
topicIndex int32
scOpts simpleConsumerOptions
scSettings *simpleConsumerSettings
@@ -79,7 +78,7 @@ func (sc *defaultSimpleConsumer)
changeInvisibleDuration0(messageView *MessageVi
Name: messageView.GetTopic(),
},
Group: &v2.Resource{
- Name: sc.consumerGroup,
+ Name: sc.groupName,
},
ReceiptHandle: messageView.GetReceiptHandle(),
InvisibleDuration: durationpb.New(invisibleDuration),
@@ -93,7 +92,10 @@ func (sc *defaultSimpleConsumer)
changeInvisibleDuration0(messageView *MessageVi
sc.cli.log.Errorf("exception raised during message
acknowledgement, messageId=%s, endpoints=%v", messageView.GetMessageId(),
endpoints)
} else if resp.GetStatus().GetCode() != v2.Code_OK {
sc.cli.log.Errorf("failed to change message invisible duration,
messageId=%s, endpoints=%v, code=%v, status message=[%s]",
messageView.GetMessageId(), endpoints, resp.GetStatus().GetCode(),
resp.GetStatus().GetMessage())
- err = errors.New(resp.String())
+ err = &ErrRpcStatus{
+ Code: int32(resp.Status.GetCode()),
+ Message: resp.GetStatus().GetMessage(),
+ }
}
if err != nil {
messageHookPointsStatus = MessageHookPointsStatus_ERROR
@@ -151,7 +153,7 @@ func (sc *defaultSimpleConsumer) Unsubscribe(topic string)
error {
func (sc *defaultSimpleConsumer) wrapReceiveMessageRequest(batchSize int,
messageQueue *v2.MessageQueue, filterExpression *FilterExpression,
invisibleDuration time.Duration) *v2.ReceiveMessageRequest {
return &v2.ReceiveMessageRequest{
Group: &v2.Resource{
- Name: sc.consumerGroup,
+ Name: sc.groupName,
},
MessageQueue: messageQueue,
FilterExpression: &v2.FilterExpression{
@@ -165,7 +167,7 @@ func (sc *defaultSimpleConsumer)
wrapReceiveMessageRequest(batchSize int, messag
func (sc *defaultSimpleConsumer) wrapAckMessageRequest(messageView
*MessageView) *v2.AckMessageRequest {
return &v2.AckMessageRequest{
- Group: sc.scSettings.consumerGroup,
+ Group: sc.scSettings.groupName,
Topic: &v2.Resource{
Name: messageView.GetTopic(),
},
@@ -178,8 +180,8 @@ func (sc *defaultSimpleConsumer)
wrapAckMessageRequest(messageView *MessageView)
}
}
-func (sc *defaultSimpleConsumer) GetConsumerGroup() string {
- return sc.consumerGroup
+func (sc *defaultSimpleConsumer) GetGroupName() string {
+ return sc.groupName
}
func (sc *defaultSimpleConsumer) receiveMessage(ctx context.Context, request
*v2.ReceiveMessageRequest, messageQueue *v2.MessageQueue, timeout
time.Duration) ([]*MessageView, error) {
@@ -205,7 +207,7 @@ func (sc *defaultSimpleConsumer) receiveMessage(ctx
context.Context, request *v2
break
}
if err != nil {
- sc.cli.log.Errorf("simpleConsumer recv msg
err=%v", err)
+ sc.cli.log.Errorf("simpleConsumer recv msg
err=%w", err)
break
}
resps = append(resps, resp)
@@ -246,7 +248,10 @@ func (sc *defaultSimpleConsumer) receiveMessage(ctx
context.Context, request *v2
if status.GetCode() == v2.Code_OK {
return messageViewList, nil
} else {
- return nil, fmt.Errorf("[error] code=%d, message=%s",
status.GetCode().Number(), status.GetMessage())
+ return nil, &ErrRpcStatus{
+ Code: int32(status.GetCode()),
+ Message: status.GetMessage(),
+ }
}
}
}
@@ -305,7 +310,7 @@ func (sc *defaultSimpleConsumer)
onVerifyMessageCommand(endpoints *v2.Endpoints,
func (sc *defaultSimpleConsumer) wrapHeartbeatRequest() *v2.HeartbeatRequest {
return &v2.HeartbeatRequest{
- Group: sc.scSettings.consumerGroup,
+ Group: sc.scSettings.groupName,
ClientType: v2.ClientType_SIMPLE_CONSUMER,
}
}
@@ -320,9 +325,9 @@ var NewSimpleConsumer = func(config *Config, opts
...SimpleConsumerOption) (Simp
return nil, err
}
sc := &defaultSimpleConsumer{
- scOpts: *scOpts,
- cli: cli.(*defaultClient),
- consumerGroup: config.Group,
+ scOpts: *scOpts,
+ cli: cli.(*defaultClient),
+ groupName: config.Group,
awaitDuration: scOpts.awaitDuration,
subscriptionExpressions: scOpts.subscriptionExpressions,
@@ -341,8 +346,8 @@ var NewSimpleConsumer = func(config *Config, opts
...SimpleConsumerOption) (Simp
clientType: v2.ClientType_SIMPLE_CONSUMER,
requestTimeout: sc.cli.opts.timeout,
- consumerGroup: &v2.Resource{
- Name: sc.consumerGroup,
+ groupName: &v2.Resource{
+ Name: sc.groupName,
},
longPollingTimeout: scOpts.awaitDuration,
subscriptionExpressions: scOpts.subscriptionExpressions,
@@ -359,7 +364,7 @@ func (sc *defaultSimpleConsumer) Start() error {
}
err2 := sc.GracefulStop()
if err2 != nil {
- return fmt.Errorf("startUp err = %v, shutdown err = %v", err,
err2)
+ return fmt.Errorf("startUp err=%w, shutdown err=%w", err, err2)
}
return err
}
diff --git a/golang/simple_consumer_options.go
b/golang/simple_consumer_options.go
index 3df80b8..2f919e9 100644
--- a/golang/simple_consumer_options.go
+++ b/golang/simple_consumer_options.go
@@ -108,7 +108,7 @@ type simpleConsumerSettings struct {
retryPolicy *v2.RetryPolicy
requestTimeout time.Duration
- consumerGroup *v2.Resource
+ groupName *v2.Resource
longPollingTimeout time.Duration
subscriptionExpressions map[string]*FilterExpression
}
@@ -177,7 +177,7 @@ func (sc *simpleConsumerSettings) toProtobuf() *v2.Settings
{
}
subSetting := &v2.Settings_Subscription{
Subscription: &v2.Subscription{
- Group: sc.consumerGroup,
+ Group: sc.groupName,
Subscriptions: subscriptions,
LongPollingTimeout:
durationpb.New(sc.longPollingTimeout),
},
diff --git a/golang/transaction.go b/golang/transaction.go
index 47343ce..51b434c 100644
--- a/golang/transaction.go
+++ b/golang/transaction.go
@@ -67,7 +67,7 @@ func (t *transactionImpl) Commit() error {
err :=
t.producerImpl.(*defaultProducer).endTransaction(context.TODO(),
sendReceipt.Endpoints,
pubMessage.msg.GetMessageCommon(),
sendReceipt.MessageID, sendReceipt.TransactionId, COMMIT)
if err != nil {
- sugarBaseLogger.Errorf("transaction message commit
failed, err=%v", err)
+ sugarBaseLogger.Errorf("transaction message commit
failed, err=%w", err)
}
return true
})
@@ -81,7 +81,7 @@ func (t *transactionImpl) RollBack() error {
err :=
t.producerImpl.(*defaultProducer).endTransaction(context.TODO(),
sendReceipt.Endpoints,
pubMessage.msg.GetMessageCommon(),
sendReceipt.MessageID, sendReceipt.TransactionId, ROLLBACK)
if err != nil {
- sugarBaseLogger.Errorf("transaction message rollback
failed, err=%v", err)
+ sugarBaseLogger.Errorf("transaction message rollback
failed, err=%w", err)
}
return true
})