dongeforever closed pull request #13: Implementing SendMessageOneway
URL: https://github.com/apache/rocketmq-client-go/pull/13
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the diff is reproduced below for the sake of provenance:
diff --git a/core/api.go b/core/api.go
index 8ad6af0..58c1465 100644
--- a/core/api.go
+++ b/core/api.go
@@ -32,6 +32,22 @@ type clientConfig struct {
LogC *LogConfig
}
+func (config *clientConfig) string() string {
+ // For security, don't print Credentials.
+ str := ""
+ str = strJoin(str, "GroupId", config.GroupID)
+ str = strJoin(str, "NameServer", config.NameServer)
+ str = strJoin(str, "NameServerDomain", config.NameServerDomain)
+ str = strJoin(str, "GroupName", config.GroupName)
+ str = strJoin(str, "InstanceName", config.InstanceName)
+
+ if config.LogC != nil {
+ str = strJoin(str, "LogConfig", config.LogC.String())
+ }
+
+ return str
+}
+
// NewProducer create a new producer with config
func NewProducer(config *ProducerConfig) (Producer, error) {
return newDefaultProducer(config)
@@ -46,11 +62,21 @@ type ProducerConfig struct {
}
func (config *ProducerConfig) String() string {
- // For security, don't print Credentials default.
- return fmt.Sprintf("[GroupID: %s, NameServer: %s, NameServerDomain: %s,
InstanceName: %s, NameServer: %s, "+
- "SendMsgTimeout: %d, CompressLevel: %d, MaxMessageSize: %d, ]",
config.NameServer, config.GroupID,
- config.NameServerDomain, config.GroupName, config.InstanceName,
config.SendMsgTimeout, config.CompressLevel,
- config.MaxMessageSize)
+ str := "ProducerConfig=[" + config.clientConfig.string()
+
+ if config.SendMsgTimeout > 0 {
+ str = strJoin(str, "SendMsgTimeout", config.SendMsgTimeout)
+ }
+
+ if config.CompressLevel > 0 {
+ str = strJoin(str, "CompressLevel", config.CompressLevel)
+ }
+
+ if config.MaxMessageSize > 0 {
+ str = strJoin(str, "MaxMessageSize", config.MaxMessageSize)
+ }
+
+ return str + "]"
}
type Producer interface {
@@ -61,8 +87,8 @@ type Producer interface {
// SendMessageOrderly send the message orderly
SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg
interface{}, autoRetryTimes int) SendResult
- // SendMessageAsync send a message with async
- SendMessageAsync(msg *Message)
+ // SendMessageOneway send a message with oneway
+ SendMessageOneway(msg *Message)
}
// NewPushConsumer create a new consumer with config.
@@ -70,6 +96,24 @@ func NewPushConsumer(config *PushConsumerConfig)
(PushConsumer, error) {
return newPushConsumer(config)
}
+type MessageModel int
+
+const (
+ BroadCasting = MessageModel(1)
+ Clustering = MessageModel(2)
+)
+
+func (mode MessageModel) String() string {
+ switch mode {
+ case BroadCasting:
+ return "BroadCasting"
+ case Clustering:
+ return "Clustering"
+ default:
+ return "Unknown"
+ }
+}
+
// PushConsumerConfig define a new consumer.
type PushConsumerConfig struct {
clientConfig
@@ -79,9 +123,22 @@ type PushConsumerConfig struct {
}
func (config *PushConsumerConfig) String() string {
- return fmt.Sprintf("[GroupID: %s, NameServer: %s, NameServerDomain: %s,
InstanceName: %s, "+
- "ThreadCount: %d, MessageBatchMaxSize: %d, Model: %v ]",
config.NameServer, config.GroupID,
- config.NameServerDomain, config.InstanceName,
config.ThreadCount, config.MessageBatchMaxSize, config.Model)
+ // For security, don't print Credentials.
+ str := "PushConsumerConfig=[" + config.clientConfig.string()
+
+ if config.ThreadCount > 0 {
+ str = strJoin(str, "ThreadCount", config.ThreadCount)
+ }
+
+ if config.MessageBatchMaxSize > 0 {
+ str = strJoin(str, "MessageBatchMaxSize",
config.MessageBatchMaxSize)
+ }
+
+ if config.Model != 0 {
+ str = strJoin(str, "MessageModel", config.Model.String())
+ }
+
+ return str + "]"
}
type PushConsumer interface {
diff --git a/core/api_test.go b/core/api_test.go
index 05aa6cf..fc507f0 100644
--- a/core/api_test.go
+++ b/core/api_test.go
@@ -17,13 +17,61 @@
package rocketmq
import (
- "fmt"
+ "github.com/stretchr/testify/assert"
"testing"
)
-func TestVersion(test *testing.T) {
- fmt.Println("-----TestGetVersion Start----")
- version := Version()
- fmt.Println(version)
- fmt.Println("-----TestGetVersion Finish----")
+func TestProducerConfig_String(t *testing.T) {
+ pConfig := ProducerConfig{}
+ pConfig.GroupID = "testGroup"
+ pConfig.NameServer = "localhost:9876"
+ pConfig.NameServerDomain = "domain1"
+ pConfig.GroupName = "producerGroupName"
+ pConfig.InstanceName = "testProducer"
+ pConfig.LogC = &LogConfig{
+ Path: "/rocketmq/log",
+ FileNum: 16,
+ FileSize: 1 << 20,
+ Level: LogLevelDebug}
+ pConfig.SendMsgTimeout = 30
+ pConfig.CompressLevel = 4
+ pConfig.MaxMessageSize = 1024
+
+ expect := "ProducerConfig=[GroupId: testGroup, NameServer:
localhost:9876, NameServerDomain: NameServerDomain, " +
+ "GroupId: testGroup, InstanceName: testProducer, " +
+ "LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576
Level:Debug}, S" +
+ "endMsgTimeout: 30, CompressLevel: 4, MaxMessageSize: 1024, ]"
+ assert.Equal(t, expect, pConfig.String())
+}
+
+func TestPushConsumerConfig_String(t *testing.T) {
+ pcConfig := PushConsumerConfig{}
+ pcConfig.GroupID = "testGroup"
+ pcConfig.NameServer = "localhost:9876"
+ pcConfig.GroupName = "consumerGroupName"
+ pcConfig.InstanceName = "testPushConsumer"
+ pcConfig.LogC = &LogConfig{
+ Path: "/rocketmq/log",
+ FileNum: 16,
+ FileSize: 1 << 20,
+ Level: LogLevelDebug}
+ pcConfig.ThreadCount = 4
+ expect := "PushConsumerConfig=[GroupId: testGroup, NameServer:
localhost:9876, " +
+ "GroupName: consumerGroupName, InstanceName: testPushConsumer,
" +
+ "LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576
Level:Debug}, ThreadCount: 4, ]"
+ assert.Equal(t, expect, pcConfig.String())
+
+ pcConfig.NameServerDomain = "domain1"
+ expect = "PushConsumerConfig=[GroupId: testGroup, NameServer:
localhost:9876, NameServerDomain: domain1, " +
+ "GroupName: consumerGroupName, InstanceName: testPushConsumer,
" +
+ "LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576
Level:Debug}, ThreadCount: 4, ]"
+ assert.Equal(t, expect, pcConfig.String())
+
+ pcConfig.MessageBatchMaxSize = 32
+ pcConfig.Model = Clustering
+ expect = "PushConsumerConfig=[GroupId: testGroup, NameServer:
localhost:9876, NameServerDomain: domain1, " +
+ "GroupName: consumerGroupName, InstanceName: testPushConsumer,
" +
+ "LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576
Level:Debug}, ThreadCount: 4," +
+ " MessageBatchMaxSize: 32, MessageModel: Clustering, ]"
+ assert.Equal(t, expect, pcConfig.String())
}
diff --git a/core/log_test.go b/core/log_test.go
index 8c4a449..e2246f0 100644
--- a/core/log_test.go
+++ b/core/log_test.go
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package rocketmq
import (
diff --git a/core/producer.go b/core/producer.go
index 77ed63b..f402589 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -90,8 +90,7 @@ func newDefaultProducer(config *ProducerConfig)
(*defaultProducer, error) {
code = int(C.SetProducerNameServerAddress(cproduer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set
NameServerAddress error, code is: %d"+
- "please check cpp logs for details", code))
+ return nil, fmt.Errorf("producer Set NameServerAddress
error, code is: %d", code)
}
}
@@ -100,8 +99,7 @@ func newDefaultProducer(config *ProducerConfig)
(*defaultProducer, error) {
code = int(C.SetProducerNameServerDomain(cproduer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set
NameServerDomain error, code is: %d"+
- "please check cpp logs for details", code))
+ return nil, fmt.Errorf("producer Set NameServerDomain
error, code is: %d", code)
}
}
@@ -110,8 +108,7 @@ func newDefaultProducer(config *ProducerConfig)
(*defaultProducer, error) {
code = int(C.SetProducerInstanceName(cproduer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set
InstanceName error, code is: %d"+
- "please check cpp logs for details", code))
+ return nil, fmt.Errorf("producer Set InstanceName
error, code is: %d", code)
}
}
@@ -125,7 +122,7 @@ func newDefaultProducer(config *ProducerConfig)
(*defaultProducer, error) {
C.free(unsafe.Pointer(sk))
C.free(unsafe.Pointer(ch))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set
Credentials error, code is: %d", code))
+ return nil, fmt.Errorf("producer Set Credentials error,
code is: %d", code)
}
}
@@ -134,38 +131,38 @@ func newDefaultProducer(config *ProducerConfig)
(*defaultProducer, error) {
code = int(C.SetProducerLogPath(cproduer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set
LogPath error, code is: %d", code))
+ return nil, fmt.Errorf("producer Set LogPath error,
code is: %d", code)
}
code = int(C.SetProducerLogFileNumAndSize(cproduer,
C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set
FileNumAndSize error, code is: %d", code))
+ return nil, fmt.Errorf("producer Set FileNumAndSize
error, code is: %d", code)
}
code = int(C.SetProducerLogLevel(cproduer,
C.CLogLevel(config.LogC.Level)))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set
LogLevel error, code is: %d", code))
+ return nil, fmt.Errorf("producer Set LogLevel error,
code is: %d", code)
}
}
if config.SendMsgTimeout > 0 {
code = int(C.SetProducerSendMsgTimeout(cproduer,
C.int(config.SendMsgTimeout)))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set
SendMsgTimeout error, code is: %d", code))
+ return nil, fmt.Errorf("producer Set SendMsgTimeout
error, code is: %d", code)
}
}
if config.CompressLevel > 0 {
code = int(C.SetProducerCompressLevel(cproduer,
C.int(config.CompressLevel)))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set
CompressLevel error, code is: %d", code))
+ return nil, fmt.Errorf("producer Set CompressLevel
error, code is: %d", code)
}
}
if config.MaxMessageSize > 0 {
code = int(C.SetProducerMaxMessageSize(cproduer,
C.int(config.MaxMessageSize)))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set
MaxMessageSize error, code is: %d", code))
+ return nil, fmt.Errorf("producer Set MaxMessageSize
error, code is: %d", code)
}
}
@@ -186,7 +183,7 @@ func (p *defaultProducer) String() string {
func (p *defaultProducer) Start() error {
code := int(C.StartProducer(p.cproduer))
if code != 0 {
- return errors.New(fmt.Sprintf("start producer error, error code
is: %d", code))
+ return fmt.Errorf("start producer error, error code is: %d",
code)
}
return nil
}
@@ -247,6 +244,14 @@ func (p *defaultProducer) SendMessageOrderly(msg *Message,
selector MessageQueue
}
}
-func (p *defaultProducer) SendMessageAsync(msg *Message) {
- // TODO
+func (p *defaultProducer) SendMessageOneway(msg *Message) {
+ cmsg := goMsgToC(msg)
+ defer C.DestroyMessage(cmsg)
+
+ code := int(C.SendMessageOneway(p.cproduer, cmsg))
+ if code != 0 {
+ log.Warnf("send message with oneway error, error code is: %d",
code)
+ } else {
+ log.Debugf("Send Message: %s with oneway success.",
msg.String())
+ }
}
diff --git a/core/push_consumer.go b/core/push_consumer.go
index 5f63b16..c39dc09 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -33,13 +33,11 @@ import "C"
import (
"fmt"
"github.com/pkg/errors"
- "github.com/prometheus/common/log"
+ log "github.com/sirupsen/logrus"
"sync"
"unsafe"
)
-type MessageModel C.CMessageModel
-
type ConsumeStatus int
const (
@@ -100,7 +98,7 @@ func newPushConsumer(config *PushConsumerConfig)
(PushConsumer, error) {
code = int(C.SetPushConsumerNameServerAddress(cconsumer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil,
errors.New(fmt.Sprintf(fmt.Sprintf("PushConsumer Set NameServerAddress error,
code is: %d", code)))
+ return nil, fmt.Errorf("PushConsumer Set
NameServerAddress error, code is: %d", code)
}
}
@@ -109,7 +107,7 @@ func newPushConsumer(config *PushConsumerConfig)
(PushConsumer, error) {
code = int(C.SetPushConsumerNameServerDomain(cconsumer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("PushConsumer Set
NameServerDomain error, code is: %d", code))
+ return nil, fmt.Errorf("PushConsumer Set
NameServerDomain error, code is: %d", code)
}
}
@@ -118,8 +116,8 @@ func newPushConsumer(config *PushConsumerConfig)
(PushConsumer, error) {
code = int(C.SetPushConsumerInstanceName(cconsumer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("PushConsumer Set
InstanceName error, code is: %d, "+
- "please check cpp logs for details", code))
+ return nil, fmt.Errorf("PushConsumer Set InstanceName
error, code is: %d, "+
+ "please check cpp logs for details", code)
}
}
@@ -132,53 +130,63 @@ func newPushConsumer(config *PushConsumerConfig)
(PushConsumer, error) {
C.free(unsafe.Pointer(sk))
C.free(unsafe.Pointer(ch))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("PushConsumer Set
Credentials error, code is: %d", int(code)))
+ return nil, fmt.Errorf("PushConsumer Set Credentials
error, code is: %d", int(code))
}
}
if config.LogC != nil {
cs = C.CString(config.LogC.Path)
- code = int(C.SetProducerLogPath(cconsumer, cs))
+ code = int(C.SetPushConsumerLogPath(cconsumer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set
LogPath error, code is: %d", code))
+ return nil, fmt.Errorf("PushConsumer Set LogPath error,
code is: %d", code)
}
- code = int(C.SetProducerLogFileNumAndSize(cconsumer,
C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+ code = int(C.SetPushConsumerLogFileNumAndSize(cconsumer,
C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set
FileNumAndSize error, code is: %d", code))
+ return nil, fmt.Errorf("PushConsumer Set FileNumAndSize
error, code is: %d", code)
}
- code = int(C.SetProducerLogLevel(cconsumer,
C.CLogLevel(config.LogC.Level)))
+ code = int(C.SetPushConsumerLogLevel(cconsumer,
C.CLogLevel(config.LogC.Level)))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set
LogLevel error, code is: %d", code))
+ return nil, fmt.Errorf("PushConsumer Set LogLevel
error, code is: %d", code)
}
}
if config.ThreadCount > 0 {
code = int(C.SetPushConsumerThreadCount(cconsumer,
C.int(config.ThreadCount)))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("PushConsumer Set
ThreadCount error, code is: %d", int(code)))
+ return nil, fmt.Errorf("PushConsumer Set ThreadCount
error, code is: %d", int(code))
}
}
if config.MessageBatchMaxSize > 0 {
code = int(C.SetPushConsumerMessageBatchMaxSize(cconsumer,
C.int(config.MessageBatchMaxSize)))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("PushConsumer Set
MessageBatchMaxSize error, code is: %d", int(code)))
+ return nil, fmt.Errorf("PushConsumer Set
MessageBatchMaxSize error, code is: %d", int(code))
}
}
- code = int(C.SetPushConsumerMessageModel(cconsumer,
(C.CMessageModel)(config.Model)))
+ if config.Model != 0 {
+ var mode C.CMessageModel
+ switch config.Model {
+ case BroadCasting:
+ mode = C.BROADCASTING
+ case Clustering:
+ mode = C.CLUSTERING
+ }
+ code = int(C.SetPushConsumerMessageModel(cconsumer, mode))
+
+ if code != 0 {
+ return nil, fmt.Errorf("PushConsumer Set
ConsumerMessageModel error, code is: %d", int(code))
+ }
- if code != 0 {
- return nil, errors.New(fmt.Sprintf("PushConsumer Set
ConsumerMessageModel error, code is: %d", int(code)))
}
code = int(C.RegisterMessageCallback(cconsumer,
(C.MessageCallBack)(unsafe.Pointer(C.callback_cgo))))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("PushConsumer
RegisterMessageCallback error, code is: %d", int(code)))
+ return nil, fmt.Errorf("PushConsumer RegisterMessageCallback
error, code is: %d", int(code))
}
consumer.cconsumer = cconsumer
@@ -189,7 +197,7 @@ func newPushConsumer(config *PushConsumerConfig)
(PushConsumer, error) {
func (c *defaultPushConsumer) Start() error {
code := C.StartPushConsumer(c.cconsumer)
if code != 0 {
- return errors.New(fmt.Sprintf("start PushConsumer error, code
is: %d", int(code)))
+ return fmt.Errorf("start PushConsumer error, code is: %d",
int(code))
}
return nil
}
@@ -215,7 +223,7 @@ func (c *defaultPushConsumer) Subscribe(topic, expression
string, consumeFunc fu
}
code := C.Subscribe(c.cconsumer, C.CString(topic),
C.CString(expression))
if code != 0 {
- return errors.New(fmt.Sprintf("subscribe topic: %s failed,
error code is: %d", topic, int(code)))
+ return fmt.Errorf("subscribe topic: %s failed, error code is:
%d", topic, int(code))
}
c.funcsMap.Store(topic, consumeFunc)
log.Infof("subscribe topic[%s] with expression[%s] successfully.",
topic, expression)
diff --git a/core/queue_selector.go b/core/queue_selector.go
index 311c378..7bf1927 100644
--- a/core/queue_selector.go
+++ b/core/queue_selector.go
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package rocketmq
import "C"
diff --git a/core/queue_selector_test.go b/core/queue_selector_test.go
index e9a68ee..74fff80 100644
--- a/core/queue_selector_test.go
+++ b/core/queue_selector_test.go
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package rocketmq
import (
diff --git a/core/utils.go b/core/utils.go
new file mode 100644
index 0000000..e9f83f1
--- /dev/null
+++ b/core/utils.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rocketmq
+
+import "fmt"
+
+func strJoin(str, key string, value interface{}) string {
+ if key == "" || value == "" {
+ return str
+ }
+
+ return str + key + ": " + fmt.Sprint(value) + ", "
+}
diff --git a/examples/orderproducer/producer.go
b/examples/orderproducer/producer.go
index 5559d87..f3d70c7 100644
--- a/examples/orderproducer/producer.go
+++ b/examples/orderproducer/producer.go
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package main
import (
diff --git a/examples/producer.go b/examples/producer.go
index 56ecee2..33eab07 100644
--- a/examples/producer.go
+++ b/examples/producer.go
@@ -20,13 +20,12 @@ package main
import (
"fmt"
"github.com/apache/rocketmq-client-go/core"
- "time"
)
func main() {
cfg := &rocketmq.ProducerConfig{}
cfg.GroupID = "testGroup"
- cfg.NameServer = "47.101.55.250:9876"
+ cfg.NameServer = "localhost:9876"
producer, err := rocketmq.NewProducer(cfg)
if err != nil {
fmt.Println("create Producer failed, error:", err)
@@ -39,8 +38,8 @@ func main() {
fmt.Printf("Producer: %s started... \n", producer)
for i := 0; i < 100; i++ {
msg := fmt.Sprintf("Hello RocketMQ-%d", i)
- result := producer.SendMessageSync(&rocketmq.Message{Topic:
"wwf1", Body: msg})
+ result := producer.SendMessageSync(&rocketmq.Message{Topic:
"test", Body: msg})
fmt.Println(fmt.Sprintf("send message: %s result: %s", msg,
result))
}
- time.Sleep(10 * time.Second)
+ fmt.Println("shutdown producer.")
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services