This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 40111a9 [ISSUE #131]Update CI config
40111a9 is described below
commit 40111a9253ae3c37c028fccf0d45db7c05c45320
Author: xujianhai666 <[email protected]>
AuthorDate: Mon Jul 29 10:04:42 2019 +0800
[ISSUE #131]Update CI config
#131
---
.gitignore | 4 +-
.travis.yml | 19 ++---
consumer/consumer_test.go | 4 +-
consumer/interceptor.go | 2 +-
consumer/lock.go | 3 +-
consumer/push_consumer.go | 24 +++---
consumer/statistics.go | 3 +-
examples/producer/trace/main.go | 2 +-
go.mod | 2 -
internal/client_test.go | 45 ------------
internal/mock_client.go | 5 +-
internal/remote/remote_client_test.go | 47 +++++++-----
internal/route.go | 1 -
internal/route_test.go | 135 ----------------------------------
primitive/result_test.go | 3 +-
producer/producer.go | 8 +-
16 files changed, 66 insertions(+), 241 deletions(-)
diff --git a/.gitignore b/.gitignore
index cedd413..cb35ce2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,5 @@
.idea
go.mod
-go.sum
\ No newline at end of file
+go.sum
+vendor/
+coverage.txt
\ No newline at end of file
diff --git a/.travis.yml b/.travis.yml
index 1fbef35..82f760c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,8 +1,8 @@
language: go
go:
- - "1.10.x"
- "1.11.x"
+ - "1.12.x"
go_import_path: github.com/apache/rocketmq-client-go
@@ -12,16 +12,9 @@ env:
- BROKER_ADDRESS=127.0.0.1:10911
- TOPIC=test
- GROUP=testGroup
+ - GO111MODULE=on
matrix:
- - OS_TYPE=centos OS_VERSION=7
-
-before_install:
- - cd ${TRAVIS_HOME}
- - wget https://opensource-rocketmq-client-us.oss-us-west-1.aliyuncs.com/cpp-client/linux/1.2.0/RHEL7.x/librocketmq.tar.gz
- - tar -xzf librocketmq.tar.gz
- - sudo cp librocketmq.so librocketmq.a /usr/local/lib/
- - sudo cp -r rocketmq /usr/local/include/
- - cd ${GOPATH}/src/github.com/apache/rocketmq-client-go
+ - OS_TYPE=centos OS_VERSION=7
before_script:
- cd ${TRAVIS_HOME}
@@ -36,5 +29,9 @@ before_script:
- ./bin/mqadmin updateSubGroup -n ${NAME_SERVER_ADDRESS} -b ${BROKER_ADDRESS} -g ${GROUP}
script:
- - export LD_LIBRARY_PATH=/usr/local/lib
- cd ${GOPATH}/src/github.com/apache/rocketmq-client-go
+ - go fmt ./... && [[ -z `git status -s` ]]
+ - go mod vendor && go test ./... -coverprofile=coverage.txt -covermode=atomic
+
+after_success:
+ - bash <(curl -s https://codecov.io/bash)
\ No newline at end of file
diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go
index e433392..a837b24 100644
--- a/consumer/consumer_test.go
+++ b/consumer/consumer_test.go
@@ -26,7 +26,7 @@ import (
func TestParseTimestamp(t *testing.T) {
layout := "20060102150405"
- timestamp, err := time.ParseInLocation(layout, "20190430193409",
time.Local)
+ timestamp, err := time.ParseInLocation(layout, "20190430193409",
time.UTC)
assert.Nil(t, err)
- assert.Equal(t, int64(1556624049), timestamp.Unix())
+ assert.Equal(t, int64(1556652849), timestamp.Unix())
}
diff --git a/consumer/interceptor.go b/consumer/interceptor.go
index 260ac6f..6b050df 100644
--- a/consumer/interceptor.go
+++ b/consumer/interceptor.go
@@ -26,7 +26,7 @@ import (
"github.com/apache/rocketmq-client-go/primitive"
)
-// WithTrace support rocketmq trace:
https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.
+// WithTrace support rocketmq trace:
https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.
func WithTrace(traceCfg primitive.TraceConfig) Option {
return func(options *consumerOptions) {
diff --git a/consumer/lock.go b/consumer/lock.go
index d923f2c..2376400 100644
--- a/consumer/lock.go
+++ b/consumer/lock.go
@@ -28,8 +28,7 @@ type QueueLock struct {
}
func newQueueLock() *QueueLock {
- return &QueueLock{
- }
+ return &QueueLock{}
}
func (ql QueueLock) fetchLock(queue primitive.MessageQueue) sync.Locker {
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 7b4d65e..9878c56 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -696,10 +696,10 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
var err error
msgCtx := &primitive.ConsumeMessageContext{
- Properties: make(map[string]string),
+ Properties: make(map[string]string),
ConsumerGroup: pc.consumerGroup,
- MQ: mq,
- Msgs: msgs,
+ MQ: mq,
+ Msgs: msgs,
}
ctx := context.Background()
ctx = primitive.WithConsumerCtx(ctx, msgCtx)
@@ -717,7 +717,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
msgCtx.Properties[primitive.PropCtxType] =
string(primitive.TimeoutReturn)
} else if result == ConsumeSuccess {
msgCtx.Properties[primitive.PropCtxType] =
string(primitive.SuccessReturn)
- } else if result == ConsumeRetryLater{
+ } else if result == ConsumeRetryLater {
msgCtx.Properties[primitive.PropCtxType] =
string(primitive.FailedReturn)
}
@@ -812,10 +812,10 @@ func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.Me
ctx := context.Background()
msgCtx := &primitive.ConsumeMessageContext{
- Properties: make(map[string]string),
+ Properties: make(map[string]string),
ConsumerGroup: pc.consumerGroup,
- MQ: mq,
- Msgs: msgs,
+ MQ: mq,
+ Msgs: msgs,
}
ctx = primitive.WithConsumerCtx(ctx, msgCtx)
ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
@@ -853,10 +853,10 @@ func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.Me
case ConsumeSuccess:
commitOffset = pq.commit()
case SuspendCurrentQueueAMoment:
- if (pc.checkReconsumeTimes(msgs)) {
+ if pc.checkReconsumeTimes(msgs) {
pq.putMessage(msgs...)
time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) *
time.Millisecond)
- continueConsume = false;
+ continueConsume = false
} else {
commitOffset = pq.commit()
}
@@ -866,15 +866,15 @@ func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.Me
switch result {
case ConsumeSuccess:
case Commit:
- commitOffset = pq.commit();
+ commitOffset = pq.commit()
case Rollback:
// pq.rollback
time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) *
time.Millisecond)
continueConsume = false
case SuspendCurrentQueueAMoment:
- if (pc.checkReconsumeTimes(msgs)) {
+ if pc.checkReconsumeTimes(msgs) {
time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) *
time.Millisecond)
- continueConsume = false;
+ continueConsume = false
}
default:
}
diff --git a/consumer/statistics.go b/consumer/statistics.go
index f01364a..b85e056 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -19,10 +19,11 @@ package consumer
import (
"container/list"
- "github.com/apache/rocketmq-client-go/rlog"
"sync"
"sync/atomic"
"time"
+
+ "github.com/apache/rocketmq-client-go/rlog"
)
var (
diff --git a/examples/producer/trace/main.go b/examples/producer/trace/main.go
index 266c783..ac6983f 100644
--- a/examples/producer/trace/main.go
+++ b/examples/producer/trace/main.go
@@ -31,7 +31,7 @@ import (
func main() {
namesrvs := []string{"127.0.0.1:9876"}
traceCfg := primitive.TraceConfig{
- Access: primitive.Local,
+ Access: primitive.Local,
}
p, _ := rocketmq.NewProducer(
diff --git a/go.mod b/go.mod
index e1b0d88..f271d91 100644
--- a/go.mod
+++ b/go.mod
@@ -1,7 +1,5 @@
module github.com/apache/rocketmq-client-go
-go 1.12
-
require (
github.com/emirpasic/gods v1.12.0
github.com/golang/mock v1.3.1
diff --git a/internal/client_test.go b/internal/client_test.go
deleted file mode 100644
index 7814ddd..0000000
--- a/internal/client_test.go
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package internal
-
-import (
- "context"
- "testing"
-)
-
-func TestRMQClient_PullMessage(t *testing.T) {
- client := GetOrNewRocketMQClient(ClientOptions{})
- req := &PullMessageRequest{
- ConsumerGroup: "testGroup",
- Topic: "wenfeng",
- QueueId: 0,
- QueueOffset: 0,
- MaxMsgNums: 32,
- SysFlag: 0x1 << 2,
- SubExpression: "*",
- ExpressionType: "TAG",
- }
- res, err := client.PullMessage(context.Background(), "127.0.0.1:10911",
req)
- if err != nil {
- t.Fatal(err.Error())
- }
-
- for _, a := range res.GetMessageExts() {
- t.Log(string(a.Body))
- }
-}
diff --git a/internal/mock_client.go b/internal/mock_client.go
index 825c9e4..d19f0e9 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -6,11 +6,12 @@ package internal
import (
context "context"
+ reflect "reflect"
+ time "time"
+
remote "github.com/apache/rocketmq-client-go/internal/remote"
primitive "github.com/apache/rocketmq-client-go/primitive"
gomock "github.com/golang/mock/gomock"
- reflect "reflect"
- time "time"
)
// MockInnerProducer is a mock of InnerProducer interface
diff --git a/internal/remote/remote_client_test.go b/internal/remote/remote_client_test.go
index 06dcedc..ff6900e 100644
--- a/internal/remote/remote_client_test.go
+++ b/internal/remote/remote_client_test.go
@@ -19,7 +19,6 @@ package remote
import (
"bytes"
"errors"
- "github.com/apache/rocketmq-client-go/internal/utils"
"math/rand"
"net"
"reflect"
@@ -27,6 +26,8 @@ import (
"testing"
"time"
+ "github.com/apache/rocketmq-client-go/internal/utils"
+
"github.com/stretchr/testify/assert"
)
@@ -86,11 +87,8 @@ func TestResponseFutureIsTimeout(t *testing.T) {
if future.isTimeout() != false {
t.Errorf("wrong ResponseFuture's istimeout. want=%t, got=%t",
false, future.isTimeout())
}
- time.Sleep(time.Duration(700) * time.Millisecond)
- if future.isTimeout() != true {
- t.Errorf("wrong ResponseFuture's istimeout. want=%t, got=%t",
true, future.isTimeout())
- }
-
+ time.Sleep(time.Duration(1000) * time.Millisecond)
+ assert.True(t, future.isTimeout(), "ResponseFuture's istimeout should be true")
}
func TestResponseFutureWaitResponse(t *testing.T) {
@@ -160,6 +158,8 @@ func TestCreateScanner(t *testing.T) {
}
func TestInvokeSync(t *testing.T) {
+ addr := ":3004"
+
clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello
RocketMQ"))
serverSendRemotingCommand := NewRemotingCommand(20, nil,
[]byte("Welcome native"))
serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
@@ -168,19 +168,23 @@ func TestInvokeSync(t *testing.T) {
wg.Add(1)
client := NewRemotingClient()
go func() {
- receiveCommand, err := client.InvokeSync(":3000",
+ receiveCommand, err := client.InvokeSync(addr,
clientSendRemtingCommand, time.Second)
if err != nil {
t.Fatalf("failed to invoke synchronous. %s", err)
} else {
- if !reflect.DeepEqual(&receiveCommand,
&serverSendRemotingCommand) {
- t.Errorf("remotingCommand prased in client is
different from server. ")
- }
+ assert.Equal(t, len(receiveCommand.ExtFields), 0)
+ assert.Equal(t,
len(serverSendRemotingCommand.ExtFields), 0)
+ // in order to avoid the difference of ExtFields between the receiveCommand and serverSendRemotingCommand
+ // the ExtFields in receiveCommand is map[string]string(nil), but serverSendRemotingCommand is map[string]string{}
+ receiveCommand.ExtFields = nil
+ serverSendRemotingCommand.ExtFields = nil
+ assert.Equal(t, receiveCommand, serverSendRemotingCommand, "remotingCommand prased in client is different from server.")
}
wg.Done()
}()
- l, err := net.Listen("tcp", ":3000")
+ l, err := net.Listen("tcp", addr)
if err != nil {
t.Fatal(err)
}
@@ -209,13 +213,15 @@ func TestInvokeSync(t *testing.T) {
if err != nil {
t.Fatalf("failed to write body to conneciton.")
}
- return
+ goto done
}
}
- wg.Done()
+done:
+ wg.Wait()
}
func TestInvokeAsync(t *testing.T) {
+ addr := ":3006"
var wg sync.WaitGroup
cnt := 50
wg.Add(cnt)
@@ -225,7 +231,7 @@ func TestInvokeAsync(t *testing.T) {
time.Sleep(time.Duration(rand.Intn(100)) *
time.Millisecond)
t.Logf("[Send: %d] asychronous message", index)
sendRemotingCommand := randomNewRemotingCommand()
- err := client.InvokeAsync(":3000", sendRemotingCommand,
time.Second, func(r *ResponseFuture) {
+ err := client.InvokeAsync(addr, sendRemotingCommand,
time.Second, func(r *ResponseFuture) {
t.Logf("[Receive: %d] asychronous message
response", index)
if string(sendRemotingCommand.Body) !=
string(r.ResponseCommand.Body) {
t.Errorf("wrong response message.
want=%s, got=%s", string(sendRemotingCommand.Body),
@@ -239,7 +245,7 @@ func TestInvokeAsync(t *testing.T) {
}(i)
}
- l, err := net.Listen("tcp", ":3000")
+ l, err := net.Listen("tcp", addr)
if err != nil {
t.Fatalf("failed to create tcp network. %s", err)
}
@@ -276,6 +282,8 @@ done:
}
func TestInvokeAsyncTimeout(t *testing.T) {
+ addr := ":3002"
+
clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello
RocketMQ"))
serverSendRemotingCommand := NewRemotingCommand(20, nil,
[]byte("Welcome native"))
serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
@@ -289,7 +297,7 @@ func TestInvokeAsyncTimeout(t *testing.T) {
clientSend.Add(1)
go func() {
clientSend.Wait()
- err := client.InvokeAsync(":3000", clientSendRemtingCommand,
+ err := client.InvokeAsync(addr, clientSendRemtingCommand,
time.Duration(1000), func(r *ResponseFuture) {
assert.NotNil(t, r.Err)
assert.Equal(t, utils.ErrRequestTimeout, r.Err)
@@ -298,7 +306,7 @@ func TestInvokeAsyncTimeout(t *testing.T) {
assert.Nil(t, err, "failed to invokeSync.")
}()
- l, err := net.Listen("tcp", ":3000")
+ l, err := net.Listen("tcp", addr)
assert.Nil(t, err)
defer l.Close()
clientSend.Done()
@@ -323,20 +331,21 @@ done:
}
func TestInvokeOneWay(t *testing.T) {
+ addr := ":3008"
clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello
RocketMQ"))
var wg sync.WaitGroup
wg.Add(1)
client := NewRemotingClient()
go func() {
- err := client.InvokeOneWay(":3000", clientSendRemtingCommand,
3*time.Second)
+ err := client.InvokeOneWay(addr, clientSendRemtingCommand,
3*time.Second)
if err != nil {
t.Fatalf("failed to invoke synchronous. %s", err)
}
wg.Done()
}()
- l, err := net.Listen("tcp", ":3000")
+ l, err := net.Listen("tcp", addr)
if err != nil {
t.Fatal(err)
}
diff --git a/internal/route.go b/internal/route.go
index 6053fd8..9c6caa9 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -65,7 +65,6 @@ var (
//subscribeInfoMap sync.Map
routeDataMap sync.Map
lockNamesrv sync.Mutex
-
)
func cleanOfflineBroker() {
diff --git a/internal/route_test.go b/internal/route_test.go
deleted file mode 100644
index 9c3241e..0000000
--- a/internal/route_test.go
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package internal
-
-import (
- . "github.com/smartystreets/goconvey/convey"
- "testing"
-)
-
-const (
- topic = "TopicTest"
-)
-
-func init() {
- srvs := []string{"127.0.0.1:9876"}
- namesrv, err := NewNamesrv(srvs...)
- if err != nil {
- panic("register namesrv fail")
- }
- RegisterNamsrv(namesrv)
-}
-
-func TestAddBroker(t *testing.T) {
- Convey("Given a starting topic", t, func() {
- remoteRouteData, err := queryTopicRouteInfoFromServer(topic)
- So(err, ShouldBeNil)
- AddBroker(remoteRouteData)
-
- Convey("brokerData from brokerAddressesMap by brokeName should
be deep equal remoteBrokeData from server", func() {
- for _, remoteBrokerData := range
remoteRouteData.BrokerDataList {
- brokerName := remoteBrokerData.BrokerName
- brokerData, ok :=
brokerAddressesMap.Load(brokerName)
- So(ok, ShouldBeTrue)
- So(brokerData, ShouldResemble, remoteBrokerData)
- }
- })
- })
-}
-
-func TestUpdateTopicRouteInfo(t *testing.T) {
- Convey("Given a starting topic", t, func() {
- updatedRouteData := UpdateTopicRouteInfo(topic)
-
- Convey("updatedRouteData should be deep equal remoteRouteData",
func() {
- remoteRouteData, err :=
queryTopicRouteInfoFromServer(topic)
- So(err, ShouldBeNil)
- So(updatedRouteData, ShouldResemble, remoteRouteData)
- })
- Convey("updatedRouteData should be deep equal localRouteData",
func() {
- localRouteData, exist := routeDataMap.Load(topic)
- So(exist, ShouldBeTrue)
- So(updatedRouteData, ShouldResemble, localRouteData)
- })
- })
-}
-
-func TestFindBrokerAddrByTopic(t *testing.T) {
- Convey("Given a starting topic", t, func() {
- addr := FindBrokerAddrByTopic(topic)
- remoteRouteData, err := queryTopicRouteInfoFromServer(topic)
- So(err, ShouldBeNil)
- brokerAddrList := remoteRouteData.BrokerDataList
-
- Convey("addr from FindBrokerAddrByTopic should be contained in
remoteRouteData", func() {
- flag := false
- for _, brokerData := range brokerAddrList {
- for _, ba := range brokerData.BrokerAddresses {
- if ba == addr {
- flag = true
- break
- }
- }
- }
- So(flag, ShouldBeTrue)
- })
- })
-}
-
-func TestFindBrokerAddrByName(t *testing.T) {
- Convey("Given a starting topic", t, func() {
- remoteRouteData, err := queryTopicRouteInfoFromServer(topic)
- So(err, ShouldBeNil)
- brokerAddrList := remoteRouteData.BrokerDataList
-
- Convey("addr from FindBrokerAddrByName should be equal
remoteBrokerAddr from server", func() {
- for _, brokerData := range brokerAddrList {
- brokerName := brokerData.BrokerName
- addr := FindBrokerAddrByName(brokerName)
- remoteBrokerAddr :=
brokerData.BrokerAddresses[MasterId]
- So(addr, ShouldEqual, remoteBrokerAddr)
- }
- })
- })
-}
-
-func TestFindBrokerAddressInSubscribe(t *testing.T) {
- Convey("Given a starting topic", t, func() {
- remoteRouteData, err := queryTopicRouteInfoFromServer(topic)
- So(err, ShouldBeNil)
- brokerAddrList := remoteRouteData.BrokerDataList
-
- Convey("range BrokerAddress and compare them in turn", func() {
- for _, brokerData := range brokerAddrList {
- brokerName := brokerData.BrokerName
- for id, ba := range brokerData.BrokerAddresses {
- findBrokerRes :=
FindBrokerAddressInSubscribe(brokerName, id, true)
- res := &FindBrokerResult{
- BrokerAddr: ba,
- Slave: false,
- BrokerVersion:
findBrokerVersion(brokerName, ba),
- }
- if id != MasterId {
- res.Slave = true
- }
- So(findBrokerRes, ShouldResemble, res)
- }
- }
- })
- })
-}
diff --git a/primitive/result_test.go b/primitive/result_test.go
index a3fd5ab..9324aff 100644
--- a/primitive/result_test.go
+++ b/primitive/result_test.go
@@ -18,7 +18,6 @@ limitations under the License.
package primitive
import (
- "strings"
"testing"
. "github.com/smartystreets/goconvey/convey"
@@ -43,7 +42,7 @@ func TestCreateMessageId(t *testing.T) {
id := createMessageId(b, port, offset)
Convey("generated messageId should be equal to expected",
func() {
- assert.Equal(t,
strings.ToLower("0A5DE93A00002A9F0000000000430154"), id)
+ assert.Equal(t, "0A5DE93A00002A9F0000000000430154", id)
})
})
diff --git a/producer/producer.go b/producer/producer.go
index 13f7952..698924c 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -131,11 +131,11 @@ func (p *defaultProducer) SendSync(ctx context.Context, msg *primitive.Message)
if p.interceptor != nil {
primitive.WithMethod(ctx, primitive.SendSync)
producerCtx := &primitive.ProducerCtx{
- ProducerGroup: p.group,
+ ProducerGroup: p.group,
CommunicationMode: primitive.SendSync,
- BornHost: utils.LocalIP,
- Message: *msg,
- SendResult: resp,
+ BornHost: utils.LocalIP,
+ Message: *msg,
+ SendResult: resp,
}
ctx = primitive.WithProducerCtx(ctx, producerCtx)