Repository: incubator-rocketmq-externals
Updated Branches:
  refs/heads/master 8ca160f94 -> 42fc22353


[ROCKETMQ-182] RocketMQ Go SDK implementation, closes 
apache/incubator-rocketmq-externals#13


Project: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/42fc2235
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/42fc2235
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/42fc2235

Branch: refs/heads/master
Commit: 42fc2235371873acf1c09c7f49df9d929865c5c9
Parents: 8ca160f
Author: wenfengwang <[email protected]>
Authored: Thu Apr 27 20:43:15 2017 +0800
Committer: yukon <[email protected]>
Committed: Thu Apr 27 20:43:15 2017 +0800

----------------------------------------------------------------------
 rocketmq-go/.gitignore                          |   1 +
 rocketmq-go/README.md                           |   4 +
 rocketmq-go/example/consumer_example.go         |   2 +-
 rocketmq-go/example/producer_example.go         |   2 +-
 rocketmq-go/model/config/client_config.go       | 169 +++++++++++
 rocketmq-go/model/config/consumer_config.go     |   1 -
 rocketmq-go/model/config/producer_config.go     |   2 +-
 rocketmq-go/model/config/rocketmq_config.go     |   3 +-
 rocketmq-go/model/constant/message_constant.go  |  17 --
 .../model/header/pull_message_request_header.go |   4 +-
 .../model/header/send_message_request_header.go |  37 +++
 .../header/send_message_response_header.go      |   2 +-
 rocketmq-go/model/message/message.go            | 264 ++++++++++++++++
 rocketmq-go/model/message/message_constant.go   |  98 ++++++
 rocketmq-go/model/message/message_queue.go      |  84 ++++++
 rocketmq-go/model/pull_result.go                |  79 +++++
 rocketmq-go/model/query_result.go               |  48 +++
 rocketmq-go/model/request_code.go               | 181 +++++++++++
 rocketmq-go/model/response_code.go              |  67 ++++
 rocketmq-go/model/send_result.go                | 105 +++++++
 rocketmq-go/model/topic_publishInfo.go          |  76 +++++
 rocketmq-go/model/topic_route_data.go           | 125 ++++++++
 rocketmq-go/mq_client_manager.go                |   6 +-
 rocketmq-go/mq_consumer.go                      |   9 +-
 rocketmq-go/mq_producer.go                      |   7 +-
 rocketmq-go/remoting/communication_mode.go      |  26 ++
 rocketmq-go/remoting/custom_header.go           |   1 +
 rocketmq-go/remoting/event_executor.go          | 144 +++++++++
 rocketmq-go/remoting/invoke_callback.go         |  20 --
 rocketmq-go/remoting/remoting_client.go         | 302 ++++++++++++++++++-
 rocketmq-go/remoting/remoting_command.go        | 159 ++++++++++
 rocketmq-go/remoting/response_future.go         |  72 +++++
 rocketmq-go/remoting/rpchook.go                 |  23 ++
 rocketmq-go/service/client_api.go               |  86 ++++++
 rocketmq-go/service/client_error_code.go        |  26 ++
 rocketmq-go/service/consume_messsage_service.go |   1 -
 rocketmq-go/service/mq_client.go                |   3 +-
 rocketmq-go/service/producer_service.go         |  12 +-
 rocketmq-go/service/rebalance_service.go        |   2 +-
 39 files changed, 2195 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/.gitignore
----------------------------------------------------------------------
diff --git a/rocketmq-go/.gitignore b/rocketmq-go/.gitignore
index bc74c0f..37a9a6f 100644
--- a/rocketmq-go/.gitignore
+++ b/rocketmq-go/.gitignore
@@ -7,3 +7,4 @@ coverage.out
 tags
 temp_parser_file
 y.output
+.DS_Store

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/README.md
----------------------------------------------------------------------
diff --git a/rocketmq-go/README.md b/rocketmq-go/README.md
new file mode 100644
index 0000000..900e918
--- /dev/null
+++ b/rocketmq-go/README.md
@@ -0,0 +1,4 @@
+# RocketMQ Go SDK
+Some of the code is adapted from the repositories below:
+* https://github.com/didapinchegit/go_rocket_mq
+* https://github.com/sevennt/go_rocket_mq/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/example/consumer_example.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/example/consumer_example.go 
b/rocketmq-go/example/consumer_example.go
index 3595798..5c0044f 100644
--- a/rocketmq-go/example/consumer_example.go
+++ b/rocketmq-go/example/consumer_example.go
@@ -14,7 +14,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package example
+package main
 
 func main() {
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/example/producer_example.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/example/producer_example.go 
b/rocketmq-go/example/producer_example.go
index b64cdcd..bda2941 100644
--- a/rocketmq-go/example/producer_example.go
+++ b/rocketmq-go/example/producer_example.go
@@ -14,4 +14,4 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package example
+package main

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/config/client_config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/config/client_config.go 
b/rocketmq-go/model/config/client_config.go
new file mode 100644
index 0000000..d9ad88c
--- /dev/null
+++ b/rocketmq-go/model/config/client_config.go
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package config
+
+import (
+       "bytes"
+       "time"
+)
+
+// ClientConfig holds the settings common to every client instance.
+type ClientConfig struct {
+       nameServerAddress             string
+       clientIP                      string
+       instanceName                  string
+       clientCallbackExecutorThreads int // TODO: clientCallbackExecutorThreads
+       // Interval at which topic information is pulled from the name server.
+       pullNameServerInterval time.Duration // default 30
+       // Interval between heartbeats sent to the message broker.
+       // NOTE(review): this comment used to say "in microseconds", but the
+       // field is a time.Duration, so it carries its own unit.
+       heartbeatBrokerInterval time.Duration // default 30
+       // Interval at which consumer offsets are persisted.
+       persistConsumerOffsetInterval time.Duration // default 5
+       unitMode                      bool
+       unitName                      string
+       vipChannelEnabled             bool
+}
+
+// NewClientConfig returns a ClientConfig with the default intervals: 30s for
+// name-server polling, 30s for broker heartbeats and 5s for persisting
+// consumer offsets (the value documented on the field; it used to be set to
+// 30s here). Every other field keeps its zero value.
+func NewClientConfig() *ClientConfig {
+       return &ClientConfig{
+               unitMode:                      false,
+               pullNameServerInterval:        time.Second * 30,
+               heartbeatBrokerInterval:       time.Second * 30,
+               persistConsumerOffsetInterval: time.Second * 5,
+       }
+}
+
+func (config *ClientConfig) BuildMQClientId() string {
+       var buffer bytes.Buffer
+       buffer.WriteString(config.clientIP)
+       buffer.WriteString("@")
+       buffer.WriteString(config.instanceName)
+       if config.unitName != "" {
+               buffer.WriteString("@")
+               buffer.WriteString(config.unitName)
+       }
+       return buffer.String()
+}
+
+// ChangeInstanceNameToPID is not implemented yet; it currently does nothing.
+func (config *ClientConfig) ChangeInstanceNameToPID() {
+       // TODO
+}
+
+// ResetClientConfig is not implemented yet; cfg is currently ignored.
+func (config *ClientConfig) ResetClientConfig(cfg *ClientConfig) {
+       // TODO
+}
+
+// CloneClientConfig returns a new ClientConfig holding a copy of every field
+// of config.
+func (config *ClientConfig) CloneClientConfig() *ClientConfig {
+       // All fields are plain values (string, int, bool, time.Duration), so
+       // copying the struct duplicates the whole configuration.
+       dup := *config
+       return &dup
+}
+
+// ClientIP returns the configured client IP.
+func (config *ClientConfig) ClientIP() string {
+       return config.clientIP
+}
+
+// SetClientIP sets the client IP to s.
+func (config *ClientConfig) SetClientIP(s string) {
+       config.clientIP = s
+}
+
+// InstanceName returns the configured instance name.
+func (config *ClientConfig) InstanceName() string {
+       return config.instanceName
+}
+
+// SetInstanceName sets the instance name to s.
+func (config *ClientConfig) SetInstanceName(s string) {
+       config.instanceName = s
+}
+
+// NameServerAddress returns the configured name server address.
+func (config *ClientConfig) NameServerAddress() string {
+       return config.nameServerAddress
+}
+
+// SetNameServerAddress sets the name server address to s.
+func (config *ClientConfig) SetNameServerAddress(s string) {
+       config.nameServerAddress = s
+}
+
+// ClientCallbackExecutorThreads returns the configured number of callback
+// executor threads.
+func (config *ClientConfig) ClientCallbackExecutorThreads() int {
+       return config.clientCallbackExecutorThreads
+}
+
+// SetClientCallbackExecutorThreads sets the number of callback executor
+// threads.
+func (config *ClientConfig) SetClientCallbackExecutorThreads(threads int) {
+       config.clientCallbackExecutorThreads = threads
+}
+
+// PullNameServerInteval returns the name-server polling interval.
+func (config *ClientConfig) PullNameServerInteval() time.Duration {
+       return config.pullNameServerInterval
+}
+
+// SetPullNameServerInteval sets the name-server polling interval.
+func (config *ClientConfig) SetPullNameServerInteval(interval time.Duration) {
+       config.pullNameServerInterval = interval
+}
+
+// HeartbeatBrokerInterval returns the broker heartbeat interval.
+func (config *ClientConfig) HeartbeatBrokerInterval() time.Duration {
+       return config.heartbeatBrokerInterval
+}
+
+// SetHeartbeatBrokerInterval sets the broker heartbeat interval.
+func (config *ClientConfig) SetHeartbeatBrokerInterval(interval time.Duration) {
+       config.heartbeatBrokerInterval = interval
+}
+
+// PersistConsumerOffsetInterval returns the interval at which consumer
+// offsets are persisted.
+func (config *ClientConfig) PersistConsumerOffsetInterval() time.Duration {
+       return config.persistConsumerOffsetInterval
+}
+
+// SetPersistConsumerOffsetInterval sets the interval at which consumer
+// offsets are persisted.
+func (config *ClientConfig) SetPersistConsumerOffsetInterval(interval time.Duration) {
+       config.persistConsumerOffsetInterval = interval
+}
+
+// UnitName returns the configured unit name.
+func (config *ClientConfig) UnitName() string {
+       return config.unitName
+}
+
+// SetUnitName sets the unit name.
+func (config *ClientConfig) SetUnitName(name string) {
+       config.unitName = name
+}
+
+// UnitMode reports whether unit mode is enabled.
+func (config *ClientConfig) UnitMode() bool {
+       return config.unitMode
+}
+
+// SetUnitMode enables or disables unit mode.
+func (config *ClientConfig) SetUnitMode(mode bool) {
+       config.unitMode = mode
+}
+
+// VipChannelEnabled reports whether the VIP channel is enabled.
+func (config *ClientConfig) VipChannelEnabled() bool {
+       return config.vipChannelEnabled
+}
+
+// SetVipChannelEnabled enables or disables the VIP channel.
+func (config *ClientConfig) SetVipChannelEnabled(enable bool) {
+       config.vipChannelEnabled = enable
+}
+
+// String is not implemented yet; it currently always returns "".
+func (config *ClientConfig) String() string {
+       //TODO
+       return ""
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/config/consumer_config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/config/consumer_config.go 
b/rocketmq-go/model/config/consumer_config.go
index 4622dae..a37eaa0 100644
--- a/rocketmq-go/model/config/consumer_config.go
+++ b/rocketmq-go/model/config/consumer_config.go
@@ -17,5 +17,4 @@
 package config
 
 type RocketMqConsumerConfig struct {
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/config/producer_config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/config/producer_config.go 
b/rocketmq-go/model/config/producer_config.go
index b990c3c..ce109fb 100644
--- a/rocketmq-go/model/config/producer_config.go
+++ b/rocketmq-go/model/config/producer_config.go
@@ -15,6 +15,6 @@
  *  limitations under the License.
  */
 package config
-type RocketMqProducerConfig struct {
 
+type RocketMqProducerConfig struct {
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/config/rocketmq_config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/config/rocketmq_config.go 
b/rocketmq-go/model/config/rocketmq_config.go
index ac6e89f..56e89b9 100644
--- a/rocketmq-go/model/config/rocketmq_config.go
+++ b/rocketmq-go/model/config/rocketmq_config.go
@@ -15,7 +15,6 @@
  *  limitations under the License.
  */
 package config
-type RocketMqClientConfig struct {
 
+type RocketMqClientConfig struct {
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/constant/message_constant.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/constant/message_constant.go 
b/rocketmq-go/model/constant/message_constant.go
deleted file mode 100644
index 93f23d0..0000000
--- a/rocketmq-go/model/constant/message_constant.go
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package header

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/header/pull_message_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/pull_message_request_header.go 
b/rocketmq-go/model/header/pull_message_request_header.go
index 8d15500..0133796 100644
--- a/rocketmq-go/model/header/pull_message_request_header.go
+++ b/rocketmq-go/model/header/pull_message_request_header.go
@@ -19,6 +19,6 @@ package header
 type PullMessageRequestHeader struct {
 }
 
-func (self *PullMessageRequestHeader) FromMap(headerMap 
map[string]interface{}) {
+func (header *PullMessageRequestHeader) FromMap(headerMap 
map[string]interface{}) {
        return
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/header/send_message_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/send_message_request_header.go 
b/rocketmq-go/model/header/send_message_request_header.go
new file mode 100644
index 0000000..5c828a8
--- /dev/null
+++ b/rocketmq-go/model/header/send_message_request_header.go
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package header
+
+// SendMessageRequestHeader holds the header fields of a send-message request.
+// NOTE(review): BornTimestamp is a plain int; if it carries a millisecond
+// timestamp it will not fit where int is 32 bits wide - confirm.
+type SendMessageRequestHeader struct {
+       //CommandCustomHeader
+       ProducerGroup        string
+       Topic                string
+       DefaultTopic         string
+       DefaultTopicQueueNum int
+       QueueID              int
+       SysFlag              int
+       BornTimestamp        int
+       Flag                 int
+       Properties           string
+       ReconsumeTimes       int
+       UnitMode             bool
+       MaxReconsumeTimes    int
+}
+
+// FromMap is not implemented yet: headerMap is ignored and the header is
+// left untouched.
+func (header *SendMessageRequestHeader) FromMap(headerMap map[string]interface{}) {
+       //TODO
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/header/send_message_response_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/send_message_response_header.go 
b/rocketmq-go/model/header/send_message_response_header.go
index 8b40cdf..9ddacb2 100644
--- a/rocketmq-go/model/header/send_message_response_header.go
+++ b/rocketmq-go/model/header/send_message_response_header.go
@@ -24,6 +24,6 @@ type SendMessageResponseHeader struct {
        MsgRegion     string
 }
 
-func (self *SendMessageResponseHeader) FromMap(headerMap 
map[string]interface{}) {
+func (header *SendMessageResponseHeader) FromMap(headerMap 
map[string]interface{}) {
        return
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/message/message.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/message/message.go 
b/rocketmq-go/model/message/message.go
new file mode 100644
index 0000000..633446c
--- /dev/null
+++ b/rocketmq-go/model/message/message.go
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package message
+
+import (
+       "bytes"
+       "compress/zlib"
+       "encoding/binary"
+       "encoding/json"
+       "fmt"
+       "github.com/golang/glog"
+       "io/ioutil"
+)
+
+// Bit values combined into a message's SysFlag. decodeMessage tests
+// CompressedFlag to decide whether the body must be inflated with zlib.
+// NOTE(review): the Transaction* values presumably occupy bits 2-3 as a
+// two-bit transaction state - confirm against the broker protocol.
+const (
+       CompressedFlag          = 1 << 0
+       MultiTagsFlag           = 1 << 1
+       TransactionNotType      = 0 << 2
+       TransactionPreparedType = 1 << 2
+       TransactionCommitType   = 2 << 2
+       TransactionRollbackType = 3 << 2
+)
+
+// Separator values used when serializing message properties:
+// NameValueSeparator is 1 and PropertySeparator is 2.
+const (
+       NameValueSeparator = 1 + iota
+       PropertySeparator
+)
+
+// CharacterMaxLength is 255; in this file it is referenced only by the
+// commented-out checkTopic as the maximum topic length.
+const (
+       CharacterMaxLength = 255
+)
+
+// Message is a message addressed to a topic. The properties map is
+// unexported; it is read through Property and written through putProperty.
+type Message struct {
+       Topic      string
+       Flag       int32
+       properties map[string]string
+       Body       []byte
+}
+
+// NewDefultMessage builds a Message for topic carrying body, with no tags,
+// no keys, flag 0 and waitStoreMsgOK set to true.
+func NewDefultMessage(topic string, body []byte) *Message {
+       return NewMessage(topic, "", "", 0, body, true)
+}
+
+// MessageExt is a Message plus the metadata that decodeMessage reads from
+// the encoded form (queue, offsets, timestamps, CRC, retry count).
+type MessageExt struct {
+       Message
+       QueueId       int32
+       StoreSize     int32
+       QueueOffset   int64
+       SysFlag       int32
+       BornTimestamp int64
+       // bornHost
+       StoreTimestamp int64
+       // storeHost
+       MsgId                     string
+       CommitLogOffset           int64
+       BodyCRC                   int32
+       ReconsumeTimes            int32
+       PreparedTransactionOffset int64
+}
+
+// encodeMessage is not implemented yet; it currently always returns nil.
+func (msg *Message) encodeMessage() []byte {
+       // TODO
+       return nil
+}
+
+// decodeMessage parses a buffer holding zero or more encoded messages and
+// returns them in the order they appear.
+//
+// Each message is laid out big-endian as: storeSize, magicCode, bodyCRC,
+// queueId, flag (int32 each), queueOffset, physicOffset (int64 each), sysFlag
+// (int32), bornTimestamp (int64), bornHost (4 bytes), bornPort (int32),
+// storeTimestamp (int64), storeHost (4 bytes), storePort (int32),
+// reconsumeTimes (int32), preparedTransactionOffset (int64), bodyLength
+// (int32) and body, topicLen (1 byte) and topic, propertiesLength (int16) and
+// properties.
+//
+// It returns nil when the data is truncated, when the magic code does not
+// match, or when a compressed body cannot be inflated.
+func decodeMessage(data []byte) []*MessageExt {
+       // expectedMagicCode is the marker every encoded message must carry.
+       const expectedMagicCode = -626843481
+
+       buf := bytes.NewBuffer(data)
+       var readErr error
+       // read decodes one big-endian value and remembers the first failure, so
+       // the fixed-size fields can be read without a check after each one.
+       read := func(v interface{}) {
+               if readErr == nil {
+                       readErr = binary.Read(buf, binary.BigEndian, v)
+               }
+       }
+
+       msgs := make([]*MessageExt, 0, 32)
+       for buf.Len() > 0 {
+               // Declared per iteration so nothing leaks in from the previous message.
+               var storeSize, magicCode, bodyCRC, queueId, flag, sysFlag, reconsumeTimes, bodyLength, bornPort, storePort int32
+               var queueOffset, physicOffset, preparedTransactionOffset, bornTimeStamp, storeTimestamp int64
+               var topicLen byte
+               var propertiesLength int16
+               bornHost := make([]byte, 4)
+               storeHost := make([]byte, 4)
+
+               read(&storeSize)
+               read(&magicCode)
+               read(&bodyCRC)
+               read(&queueId)
+               read(&flag)
+               read(&queueOffset)
+               read(&physicOffset)
+               read(&sysFlag)
+               read(&bornTimeStamp)
+               read(bornHost)
+               read(&bornPort)
+               read(&storeTimestamp)
+               read(storeHost)
+               read(&storePort)
+               read(&reconsumeTimes)
+               read(&preparedTransactionOffset)
+               read(&bodyLength)
+               if readErr != nil {
+                       fmt.Println(readErr)
+                       return nil
+               }
+               if magicCode != expectedMagicCode {
+                       fmt.Printf("magic code is error %d", magicCode)
+                       return nil
+               }
+
+               var body []byte
+               if bodyLength > 0 {
+                       // Reject a length that exceeds the remaining data before allocating.
+                       if int(bodyLength) > buf.Len() {
+                               fmt.Printf("body length %d exceeds remaining %d bytes", bodyLength, buf.Len())
+                               return nil
+                       }
+                       body = make([]byte, bodyLength)
+                       read(body)
+
+                       if (sysFlag & CompressedFlag) == CompressedFlag {
+                               z, err := zlib.NewReader(bytes.NewReader(body))
+                               if err != nil {
+                                       fmt.Println(err)
+                                       return nil
+                               }
+                               body, err = ioutil.ReadAll(z)
+                               // Close before checking err so the reader is released on both paths.
+                               z.Close()
+                               if err != nil {
+                                       fmt.Println(err)
+                                       return nil
+                               }
+                       }
+               }
+
+               read(&topicLen)
+               // The topic occupies exactly topicLen bytes; a zero-length buffer here
+               // would skip it and misalign every field that follows.
+               topic := make([]byte, topicLen)
+               read(topic)
+
+               read(&propertiesLength)
+               var propertiesMap map[string]string
+               if propertiesLength > 0 {
+                       properties := make([]byte, propertiesLength)
+                       read(properties)
+                       propertiesMap = make(map[string]string)
+                       // Best effort: a payload that is not JSON leaves the map empty.
+                       // NOTE(review): messageProperties2String encodes properties with
+                       // separator bytes rather than JSON - confirm which format the
+                       // broker actually sends.
+                       json.Unmarshal(properties, &propertiesMap)
+               }
+               if readErr != nil {
+                       fmt.Println(readErr)
+                       return nil
+               }
+
+               msg := new(MessageExt)
+               msg.Topic = string(topic)
+               msg.QueueId = queueId
+               msg.SysFlag = sysFlag
+               msg.QueueOffset = queueOffset
+               msg.BodyCRC = bodyCRC
+               msg.StoreSize = storeSize
+               msg.BornTimestamp = bornTimeStamp
+               msg.ReconsumeTimes = reconsumeTimes
+               msg.Flag = flag
+               msg.CommitLogOffset = physicOffset
+               msg.StoreTimestamp = storeTimestamp
+               msg.PreparedTransactionOffset = preparedTransactionOffset
+               msg.Body = body
+               msg.properties = propertiesMap
+
+               msgs = append(msgs, msg)
+       }
+
+       return msgs
+}
+
+func messageProperties2String(properties map[string]string) string {
+       StringBuilder := bytes.NewBuffer([]byte{})
+       if properties != nil && len(properties) != 0 {
+               for k, v := range properties {
+                       binary.Write(StringBuilder, binary.BigEndian, k)        
          // 4
+                       binary.Write(StringBuilder, binary.BigEndian, 
NameValueSeparator) // 4
+                       binary.Write(StringBuilder, binary.BigEndian, v)        
          // 4
+                       binary.Write(StringBuilder, binary.BigEndian, 
PropertySeparator)  // 4
+               }
+       }
+       return StringBuilder.String()
+}
+
+//func (msg Message) checkMessage(producer *DefaultProducer) (err error) {
+//     if err = checkTopic(msg.Topic); err != nil {
+//             if len(msg.Body) == 0 {
+//                     err = errors.New("ResponseCode:" + 
strconv.Itoa(MsgIllegal) + ", the message body is null")
+//             } else if len(msg.Body) > producer.maxMessageSize {
+//                     err = errors.New("ResponseCode:" + 
strconv.Itoa(MsgIllegal) + ", the message body size over max value, MAX:" + 
strconv.Itoa(producer.maxMessageSize))
+//             }
+//     }
+//     return
+//}
+
+//func checkTopic(topic string) (err error) {
+//     if topic == "" {
+//             err = errors.New("the specified topic is blank")
+//     }
+//     if len(topic) > CharacterMaxLength {
+//             err = errors.New("the specified topic is longer than topic max 
length 255")
+//     }
+//     if topic == DefaultTopic {
+//             err = errors.New("the topic[" + topic + "] is conflict with 
default topic")
+//     }
+//     return
+//}
+
+// NewMessage builds a Message for topic with the given flag and body. Tags
+// and keys are recorded only when non-empty; waitStoreMsgOK is handed to
+// SetWaitStoreMsgOK.
+func NewMessage(topic, tags, keys string, flag int32, body []byte, waitStoreMsgOK bool) *Message {
+       msg := new(Message)
+       msg.Topic = topic
+       msg.Flag = flag
+       msg.Body = body
+
+       if len(tags) > 0 {
+               msg.SetTags(tags)
+       }
+       if len(keys) > 0 {
+               msg.SetKeys(keys)
+       }
+       msg.SetWaitStoreMsgOK(waitStoreMsgOK)
+
+       return msg
+}
+
+// SetTags passes t to putProperty under the MessageConst.PropertyTags key.
+func (msg *Message) SetTags(t string) {
+       msg.putProperty(MessageConst.PropertyTags, t)
+}
+
+// SetKeys passes k to putProperty under the MessageConst.PropertyKeys key.
+func (msg *Message) SetKeys(k string) {
+       msg.putProperty(MessageConst.PropertyKeys, k)
+}
+
+// SetWaitStoreMsgOK is currently a no-op: its body is empty, so b is ignored.
+// NOTE(review): presumably it should record b under
+// MessageConst.PropertyWaitStoreMsgOk - confirm intended behavior.
+func (msg *Message) SetWaitStoreMsgOK(b bool) {
+
+}
+
+// Property returns the message's property map itself, not a copy; it is nil
+// until a property has been stored.
+func (msg *Message) Property() map[string]string {
+       return msg.properties
+}
+
+// putProperty stores v under k, creating the property map on first use. An
+// existing key is left untouched and only logged.
+//
+// The lookup result is discarded with "_": binding it to a new v shadowed the
+// parameter, so the empty zero value was stored instead of the caller's value.
+func (msg *Message) putProperty(k, v string) {
+       if msg.properties == nil {
+               msg.properties = make(map[string]string)
+       }
+       if _, found := msg.properties[k]; found {
+               glog.Infof("Message put peoperties key: %s existed.", k)
+               return
+       }
+       msg.properties[k] = v
+}
+
+// removeProperty deletes the property stored under k and returns the value
+// it held, or "" when k was not present (the previous "return nil" does not
+// compile for a string result). The v parameter is unused and kept only so
+// the signature stays the same.
+func (msg *Message) removeProperty(k, v string) string {
+       old, ok := msg.properties[k]
+       if !ok {
+               return ""
+       }
+       delete(msg.properties, k)
+       return old
+}
+
+func (msg *Message) String() string {
+       return fmt.Sprintf("Message [topic=%s, flag=%s, properties=%s, 
body=%s]",
+               msg.Topic, msg.Flag, msg.properties, msg.Body)
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/message/message_constant.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/message/message_constant.go 
b/rocketmq-go/model/message/message_constant.go
new file mode 100644
index 0000000..3991c7e
--- /dev/null
+++ b/rocketmq-go/model/message/message_constant.go
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package message
+
+// messageConst groups the property names used on messages so they can be
+// reached through the single MessageConst value.
+type messageConst struct {
+       PropertyKeys                      string
+       PropertyTags                      string
+       PropertyWaitStoreMsgOk            string
+       PropertyDelayTimeLevel            string
+       PropertyRetryTopic                string
+       PropertyRealTopic                 string
+       PropertyRealQueueId               string
+       PropertyTransactionPrepared       string
+       PropertyProducerGroup             string
+       PropertyMinOffset                 string
+       PropertyMaxOffset                 string
+       PropertyBuyerId                   string
+       PropertyOriginMessageId           string
+       PropertyTransferFlag              string
+       PropertyCorrectionFlag            string
+       PropertyMq2Flag                   string
+       PropertyReconsumeTime             string
+       PropertyMsgRegion                 string
+       PropertyUniqClientMessageIdKeyidx string
+       PropertyMaxReconsumeTimes         string
+       PropertyConsumeStartTimeStamp     string
+
+       KeySeparator string
+       // systemKeySet lists every Property* name above; it is filled by init.
+       systemKeySet []string
+}
+
+// MessageConst is the shared instance holding the property names.
+// NOTE(review): KeySeparator is the empty string here - confirm that is
+// intended and that no separator character was lost.
+var MessageConst = &messageConst{
+       PropertyKeys:                      "KEYS",
+       PropertyTags:                      "TAGS",
+       PropertyWaitStoreMsgOk:            "WAIT",
+       PropertyDelayTimeLevel:            "DELAY",
+       PropertyRetryTopic:                "RETRY_TOPIC",
+       PropertyRealTopic:                 "REAL_TOPIC",
+       PropertyRealQueueId:               "REAL_QID",
+       PropertyTransactionPrepared:       "TRAN_MSG",
+       PropertyProducerGroup:             "PGROUP",
+       PropertyMinOffset:                 "MIN_OFFSET",
+       PropertyMaxOffset:                 "MAX_OFFSET",
+       PropertyBuyerId:                   "BUYER_ID",
+       PropertyOriginMessageId:           "ORIGIN_MESSAGE_ID",
+       PropertyTransferFlag:              "TRANSFER_FLAG",
+       PropertyCorrectionFlag:            "CORRECTION_FLAG",
+       PropertyMq2Flag:                   "MQ2_FLAG",
+       PropertyReconsumeTime:             "RECONSUME_TIME",
+       PropertyMsgRegion:                 "MSG_REGION",
+       PropertyUniqClientMessageIdKeyidx: "UNIQ_KEY",
+       PropertyMaxReconsumeTimes:         "MAX_RECONSUME_TIMES",
+       PropertyConsumeStartTimeStamp:     "CONSUME_START_TIME",
+
+       KeySeparator: "",
+}
+
+func init() {
+       var systemKeySet = []string{}
+       systemKeySet = append(systemKeySet, MessageConst.PropertyKeys)
+       systemKeySet = append(systemKeySet, MessageConst.PropertyTags)
+       systemKeySet = append(systemKeySet, MessageConst.PropertyWaitStoreMsgOk)
+       systemKeySet = append(systemKeySet, MessageConst.PropertyDelayTimeLevel)
+       systemKeySet = append(systemKeySet, MessageConst.PropertyRetryTopic)
+       systemKeySet = append(systemKeySet, MessageConst.PropertyRealTopic)
+       systemKeySet = append(systemKeySet, MessageConst.PropertyRealQueueId)
+       systemKeySet = append(systemKeySet, 
MessageConst.PropertyTransactionPrepared)
+       systemKeySet = append(systemKeySet, MessageConst.PropertyProducerGroup)
+       systemKeySet = append(systemKeySet, MessageConst.PropertyMinOffset)
+       systemKeySet = append(systemKeySet, MessageConst.PropertyMaxOffset)
+       systemKeySet = append(systemKeySet, MessageConst.PropertyBuyerId)
+       systemKeySet = append(systemKeySet, 
MessageConst.PropertyOriginMessageId)
+       systemKeySet = append(systemKeySet, MessageConst.PropertyTransferFlag)
+       systemKeySet = append(systemKeySet, MessageConst.PropertyCorrectionFlag)
+       systemKeySet = append(systemKeySet, MessageConst.PropertyMq2Flag)
+       systemKeySet = append(systemKeySet, MessageConst.PropertyReconsumeTime)
+       systemKeySet = append(systemKeySet, MessageConst.PropertyMsgRegion)
+       systemKeySet = append(systemKeySet, 
MessageConst.PropertyUniqClientMessageIdKeyidx)
+       systemKeySet = append(systemKeySet, 
MessageConst.PropertyMaxReconsumeTimes)
+       systemKeySet = append(systemKeySet, 
MessageConst.PropertyConsumeStartTimeStamp)
+
+       MessageConst.systemKeySet = systemKeySet
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/message/message_queue.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/message/message_queue.go 
b/rocketmq-go/model/message/message_queue.go
new file mode 100644
index 0000000..20b47be
--- /dev/null
+++ b/rocketmq-go/model/message/message_queue.go
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package message
+
+// MessageQueue identifies a single queue by topic, broker name and queue id.
+type MessageQueue struct {
+       topic      string
+       brokerName string
+       queueId    int32
+}
+
+// NewMessageQueue returns a pointer to a MessageQueue populated with the
+// given topic, broker name and queue id.
+func NewMessageQueue(topic string, brokerName string, queueId int32) 
*MessageQueue {
+       return &MessageQueue{
+               topic:      topic,
+               brokerName: brokerName,
+               queueId:    queueId,
+       }
+}
+
+// clone returns a newly allocated MessageQueue carrying the same topic,
+// broker name and queue id as the receiver.
+func (queue *MessageQueue) clone() *MessageQueue {
+       return &MessageQueue{
+               topic:      queue.topic,
+               brokerName: queue.brokerName,
+               queueId:    queue.queueId,
+       }
+}
+
+// BrokerName returns the name of the broker this queue belongs to.
+//
+// It uses a pointer receiver so that every MessageQueue method shares the
+// same receiver kind and the struct is not copied on each call.
+func (queue *MessageQueue) BrokerName() string {
+       return queue.brokerName
+}
+
+// QueueID returns the queue id stored in the receiver.
+func (queue *MessageQueue) QueueID() int32 {
+       return queue.queueId
+}
+
+// MessageQueues is a slice of MessageQueue pointers; its Len, Less and Swap
+// methods have the signatures required by sort.Interface.
+type MessageQueues []*MessageQueue
+
+// Less reports whether the queue at index i sorts before the queue at
+// index j. Queues are ordered by topic, then by broker name, then by
+// queue id.
+func (queues MessageQueues) Less(i, j int) bool {
+       imq := queues[i]
+       jmq := queues[j]
+
+       // Move on to the next key only when the current one ties; otherwise
+       // a "greater" topic could still win on broker name or queue id.
+       if imq.topic != jmq.topic {
+               return imq.topic < jmq.topic
+       }
+
+       if imq.brokerName != jmq.brokerName {
+               return imq.brokerName < jmq.brokerName
+       }
+
+       return imq.queueId < jmq.queueId
+}
+
+// Swap exchanges the elements at indexes i and j.
+func (queues MessageQueues) Swap(i, j int) {
+       queues[i], queues[j] = queues[j], queues[i]
+}
+
+// Len returns the number of queues in the slice.
+func (queues MessageQueues) Len() int {
+       return len(queues)
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/pull_result.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/pull_result.go b/rocketmq-go/model/pull_result.go
new file mode 100644
index 0000000..b34a2d9
--- /dev/null
+++ b/rocketmq-go/model/pull_result.go
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package model
+
+import (
+       "fmt"
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
+)
+
+// PullStatus enumerates the possible outcomes of a pull.
+type PullStatus int
+
+// PullStatus values, numbered from 0 in declaration order.
+const (
+       Found PullStatus = iota
+       NoNewMsg
+       NoMatchedMsg
+       OffsetIllegal
+)
+
+// PullResult bundles a pull status with the next/min/max offsets and the
+// list of messages that were found.
+type PullResult struct {
+       pullStatus      PullStatus
+       nextBeginOffset int64
+       minOffset       int64
+       maxOffset       int64
+       msgFoundList    []*message.MessageExt
+}
+
+// NewPullResult builds a PullResult from the given status, the next, minimum
+// and maximum offsets, and the list of found messages.
+func NewPullResult(ps PullStatus, next, min, max int64, list []*message.MessageExt) *PullResult {
+       // Keyed fields keep the constructor correct if the struct is reordered.
+       return &PullResult{
+               pullStatus:      ps,
+               nextBeginOffset: next,
+               minOffset:       min,
+               maxOffset:       max,
+               msgFoundList:    list,
+       }
+}
+
+// PullStatus returns the status stored in the result.
+func (result *PullResult) PullStatus() PullStatus {
+       return result.pullStatus
+}
+
+// NextBeginOffset returns the stored next-begin offset.
+func (result *PullResult) NextBeginOffset() int64 {
+       return result.nextBeginOffset
+}
+
+// MaxOffset returns the stored maximum offset.
+func (result *PullResult) MaxOffset() int64 {
+       return result.maxOffset
+}
+
+// MinOffset returns the stored minimum offset.
+func (result *PullResult) MinOffset() int64 {
+       return result.minOffset
+}
+
+// MsgFoundList returns the stored message slice itself, not a copy.
+func (result *PullResult) MsgFoundList() []*message.MessageExt {
+       return result.msgFoundList
+}
+
+// SetMsgFoundList replaces the stored message slice with list.
+func (result *PullResult) SetMsgFoundList(list []*message.MessageExt) {
+       result.msgFoundList = list
+}
+
+func (result *PullResult) String() string {
+       return fmt.Sprintf("PullResult [pullStatus=%s, nextBeginOffset=%s, 
minOffset=%s, maxOffset=%s, msgFoundList=%s]",
+               result.pullStatus, result.nextBeginOffset, result.minOffset, 
result.maxOffset, len(result.msgFoundList))
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/query_result.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/query_result.go 
b/rocketmq-go/model/query_result.go
new file mode 100644
index 0000000..b9e9236
--- /dev/null
+++ b/rocketmq-go/model/query_result.go
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package model
+
+import (
+       "fmt"
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
+)
+
+// QueryResult pairs an index-last-update timestamp with a list of messages.
+type QueryResult struct {
+       indexLastUpdateTimestamp int64
+       messageList              []*message.MessageExt
+}
+
+// NewQueryResult returns a QueryResult holding the given timestamp and
+// message list.
+func NewQueryResult(timestamp int64, list []*message.MessageExt) *QueryResult {
+       return &QueryResult{
+               indexLastUpdateTimestamp: timestamp,
+               messageList:              list,
+       }
+}
+
+// IndexLastUpdateTimestamp returns the stored timestamp.
+func (qr *QueryResult) IndexLastUpdateTimestamp() int64 {
+       return qr.indexLastUpdateTimestamp
+}
+
+// MessageList returns the stored message slice itself, not a copy.
+func (qr *QueryResult) MessageList() []*message.MessageExt { //TODO: address?
+       return qr.messageList
+}
+
+func (qr *QueryResult) String() string {
+       return fmt.Sprintf("QueryResult [indexLastUpdateTimestamp=%s, 
messageList=%s]",
+               qr.indexLastUpdateTimestamp, qr.messageList)
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/request_code.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/request_code.go 
b/rocketmq-go/model/request_code.go
new file mode 100644
index 0000000..495c1a7
--- /dev/null
+++ b/rocketmq-go/model/request_code.go
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package model
+
+// Request codes: each constant names an operation and gives its numeric
+// code.
+const (
+       // send message
+       SendMsg = 10
+       // subscribe message
+       PullMsg = 11
+       // query message
+       QueryMessage = 12
+       // query broker offset
+       QueryBrokerOffset = 13
+       // query Consumer Offset
+       QueryConsumerOffset = 14
+       // update Consumer Offset
+       UpdateConsumerOffset = 15
+       // update or create a topic
+       UpdateAndCreateTopic = 17
+       // get all config of topic (Slave and Namesrv query the config to 
master)
+       GetAllTopicConfig = 21
+       // get all config (Slave and Namesrv query the config to master)
+       GetTopicConfigList = 22
+       // get all name list of topic
+       GetTopicNameList = 23
+       // update config
+       UpdateBrokerConfig = 25
+       // get config
+       GetBrokerConfig = 26
+       // trigger delete files
+       TriggerDeleteFILES = 27
+       // get runtime information
+       GetBrokerRuntimeInfo = 28
+       // search offset by timestamp
+       SearchOffsetByTimeStamp = 29
+       // query max offset of queue
+       GetMaxOffset = 30
+       // query min offset of queue
+       GetMinOffset = 31
+       // query earliest message store time
+       GetEarliestMsgStoreTime = 32
+       // query message by id
+       ViewMsgById = 33
+       // client send heartbeat to broker, and register self
+       HeartBeat = 34
+       // unregister client
+       UnregisterClient = 35
+       // consumer send message back to broker when can't process message
+       ConsumerSendMsgBack = 36
+       // Commit or Rollback transaction
+       EndTransaction = 37
+       // get ConsumerId list by GroupName
+       GetConsumerListByGroup = 38
+       // check transaction state from producer
+       CheckTransactionState = 39
+       // broker notify consumer ids changed
+       NotifyConsumerIdsChanged = 40
+       // Consumer lock queue to master
+       LockBatchMq = 41
+       // Consumer unlock queue to master
+       UNLockBatchMq = 42
+       // get all consumer offset
+       GetAllConsumerOffset = 43
+       // get all delay offset
+       GetAllDelayOffset = 45
+       // put kv config to Namesrv
+       PutKVConfig = 100
+       // get kv config from Namesrv
+       GetKVConfig = 101
+       // delete kv config from Namesrv
+       DeleteKVConfig = 102
+       // register a broker to Namesrv. As data is persistent,
+       // the broker will overwrite if old config existed.
+       RegisterBroker = 103
+       // unregister a broker
+       UnregisterBroker = 104
+       // get broker name, queue numbers by topic.
+       GetRouteinfoByTopic = 105
+       // get info of all brokers registered to namesrv
+       GetBrokerClusterInfo             = 106
+       UpdateAndCreateSubscriptionGroup = 200
+       GetAllSubscriptionGroupConfig    = 201
+       GetTopicStatsInfo                = 202
+       GetConsumerConnList              = 203
+       GetProducerConnList              = 204
+       WipeWritePermOfBroker            = 205
+
+       // get all topic list from namesrv
+       GetAllTopicListFromNamesrv = 206
+       // delete subscription group from broker
+       DeleteSubscriptionGroup = 207
+       // get consume stats from broker
+       GetConsumeStats = 208
+       // Suspend Consumer
+       SuspendConsumer = 209
+       // Resume Consumer
+       ResumeConsumer = 210
+       // reset Consumer Offset (in consumer)
+       ResetConsumerOffsetInConsumer = 211
+       // reset Consumer Offset (in broker)
+       ResetConsumerOffsetInBroker = 212
+       // query which consumer groups consume the msg
+       WhoConsumeMessage = 214
+
+       // namesrv delete topic config from broker
+       DeleteTopicInBroker = 215
+       // namesrv delete topic config from namesrv
+       DeleteTopicInNamesrv = 216
+       // namesrv get server ip info by project
+       GetKvConfigByValue = 217
+       // Namesrv delete all server ip by project group
+       DeleteKvConfigByValue = 218
+       // get all KV list by namespace
+       GetKvlistByNamespace = 219
+
+       // reset offset
+       ResetConsumerClientOffset = 220
+       // get consumer status from client
+       GetConsumerStatusFromClient = 221
+       // invoke broker to reset offset
+       InvokeBrokerToResetOffset = 222
+       // invoke broker to get consumer status
+       InvokeBrokerToGetConsumerStatus = 223
+
+       // query which consumers consume the topic
+       QueryTopicConsumeByWho = 300
+
+       // get topics by cluster
+       GetTopicsByCluster = 224
+
+       // register filter server to broker
+       RegisterFilterServer = 301
+       // register class to filter server
+       RegisterMsgFilterClass = 302
+       // get time span by topic and group
+       QueryConsumeTimeSpan = 303
+       // get all system topics from namesrv
+       GetSysTopicListFromNS = 304
+       // get all system topics from broker
+       GetSysTopicListFromBroker = 305
+
+       // clean expired consume queue
+       CleanExpiredConsumequeue = 306
+
+       // query consumer memory data by broker
+       GetConsumerRunningInfo = 307
+
+       // TODO: query correction offset(transfer component?)
+       QueryCorrectionOffset = 308
+
+       // Send msg to one consumer by broker. The msg will be consumed
+       // immediately and the result returned to the broker, which returns
+       // it to the caller.
+       ConsumeMsgDirectly = 309
+
+       // send msg with optimized network datagram
+       SendMsgV2 = 310
+
+       // get unit topic list
+       GetUnitTopicList             = 311
+       GetHasUnitSubTopicList       = 312
+       GetHasUnitSubUnunitTopicList = 313
+       CloneGroupOffset             = 314
+
+       // query all statistics counted by the broker
+       ViewBrokerStatsData = 315
+)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/response_code.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/response_code.go 
b/rocketmq-go/model/response_code.go
new file mode 100644
index 0000000..957c1e9
--- /dev/null
+++ b/rocketmq-go/model/response_code.go
@@ -0,0 +1,67 @@
+package model
+
+// Response codes: each constant names a result condition and gives its
+// numeric code.
+const (
+       // success
+       Success = 0
+       // an unknown error or exception happened
+       SystemError = 1
+       // system busy
+       SystemBusy = 2
+       // unsupported request code
+       RequestCodeNotSupported = 3
+       // transaction failed, because of add db failed
+       TransactionFailed = 4
+       // Broker flush disk timeout
+       FlushDiskTimeout = 10
+       // Broker slave unavailable, just for sync double write
+       SlaveNotAvailable = 11
+       // Broker write slave timeout, just for sync double write
+       FlushSlaveTimeout = 12
+       // Broker illegal message
+       MessageIllegal = 13
+       // Broker, Namesrv not available, maybe service is closing or 
incorrect permission
+       ServiceNotAvailable = 14
+       // Broker, Namesrv unsupported version
+       VersionNOtSupported = 15
+       // Broker, Namesrv no permission for operation with send/receive or 
other
+       NoPermission = 16
+       // Broker, topic not exist
+       TopicNotExist = 17
+       // Broker, topic already exist
+       TopicExistAlready = 18
+       // Broker message not found when pull
+       PullNotFound = 19
+       // Broker retry immediately, maybe msg was filtered or incorrect 
notification TODO confirm annotation
+       PullRetryImmediately = 20
+       // Broker pull offset moved, because of too big or too small TODO 
confirm annotation
+       PullOffsetMoved = 21
+       // Broker query not found
+       QueryNotFound = 22
+       // Broker parse subscription failed
+       SubscriptionParseFailed = 23
+       // Broker subscription relationship not existed
+       SubscriptionNotExist = 24
+       // Broker subscription relationship not latest
+       SubscriptionNotLatest = 25
+       // Broker subscription group not exist
+       SubscriptionGroupNotExist = 26
+       // Producer transaction should commit
+       TransactionShouldCommit = 200
+       // Producer transaction should rollback
+       TransactionShouldRollback = 201
+       // Producer transaction status unknown
+       TransactionStatusUnknow = 202
+       // Producer ProducerGroup transaction error
+       TransactionStatusGroupWrong = 203
+       // unit message, need set buyerId
+       NoBuyerID = 204
+
+       // unit message, not current unit msg
+       NotInCurrentUnit = 205
+
+       // Consumer not online
+       ConsumerNotOnline = 206
+
+       // Consumer consume msg timeout
+       ConsumeMsgTimeout = 207
+)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/send_result.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/send_result.go b/rocketmq-go/model/send_result.go
index fc18d6f..b1af4fb 100644
--- a/rocketmq-go/model/send_result.go
+++ b/rocketmq-go/model/send_result.go
@@ -16,5 +16,110 @@
  */
 package model
 
+import (
+       "fmt"
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
+)
+
+// SendStatus enumerates the outcomes recorded in a SendResult.
+type SendStatus int
+
+// SendStatus values. With the two entries below commented out, SendOK is 0
+// and SlaveNotAvaliable is 1.
+// NOTE(review): FlushDiskTimeout and FlushSlaveTimeout are presumably
+// disabled because response_code.go already declares those names in this
+// package; confirm whether the numeric values need to match another client.
+const (
+       SendOK SendStatus = iota
+       //FlushDiskTimeout
+       //FlushSlaveTimeout
+       SlaveNotAvaliable
+)
+
+// SendResult records a send status together with the message ids, the
+// target queue, the queue offset and the transaction/region/trace fields.
 type SendResult struct {
+       sendStatus    SendStatus
+       msgID         string
+       messageQueue  *message.MessageQueue
+       queueOffset   int64
+       transactionID string
+       offsetMsgID   string
+       regionID      string
+       traceOn       bool
+}
+
+// NewSendResult returns a SendResult with the status, message id, offset
+// message id, queue and queue offset set; transactionID, regionID and
+// traceOn keep their zero values.
+func NewSendResult(status SendStatus, msgID, offsetID string, queue 
*message.MessageQueue, queueOffset int64) *SendResult {
+       return &SendResult{
+               sendStatus:   status,
+               msgID:        msgID,
+               offsetMsgID:  offsetID,
+               messageQueue: queue,
+               queueOffset:  queueOffset,
+       }
+}
+
+// EncoderSendResultToJson is a placeholder for encoding obj as JSON.
+// It is not implemented yet and always returns the empty string.
+func EncoderSendResultToJson(obj interface{}) string {
+       // nil is not a valid string value and does not compile; return the
+       // zero value until the encoder is written.
+       return "" // TODO
+}
+
+// DecoderSendResultFromJson is a placeholder for decoding a SendResult from
+// a JSON string. It is not implemented yet and always returns nil.
+func DecoderSendResultFromJson(json string) *SendResult {
+       return nil // TODO
+}
+
+// TraceOn returns the stored traceOn flag.
+func (result *SendResult) TraceOn() bool {
+       return result.traceOn
+}
+
+// SetTraceOn sets the traceOn flag to b.
+func (result *SendResult) SetTraceOn(b bool) {
+       result.traceOn = b
+}
+
+// SetRegionID sets the region id to s.
+func (result *SendResult) SetRegionID(s string) {
+       result.regionID = s
+}
+
+// MsgID returns the stored message id.
+func (result *SendResult) MsgID() string {
+       return result.msgID
+}
+
+// SetMsgID sets the message id to s.
+func (result *SendResult) SetMsgID(s string) {
+       result.msgID = s
+}
+
+// SendStatus returns the stored send status.
+func (result *SendResult) SendStatus() SendStatus {
+       return result.sendStatus
+}
+
+// SetSendStatus sets the send status.
+func (result *SendResult) SetSendStatus(status SendStatus) {
+       result.sendStatus = status
+}
+
+// MessageQueue returns the stored queue pointer.
+func (result *SendResult) MessageQueue() *message.MessageQueue {
+       return result.messageQueue
+}
+
+// SetMessageQueue sets the stored queue pointer.
+func (result *SendResult) SetMessageQueue(queue *message.MessageQueue) {
+       result.messageQueue = queue
+}
+
+// QueueOffset returns the stored queue offset.
+func (result *SendResult) QueueOffset() int64 {
+       return result.queueOffset
+}
+
+// SetQueueOffset sets the queue offset.
+func (result *SendResult) SetQueueOffset(offset int64) {
+       result.queueOffset = offset
+}
+
+// TransactionID returns the stored transaction id.
+func (result *SendResult) TransactionID() string {
+       return result.transactionID
+}
+
+// SetTransactionID sets the transaction id to s.
+func (result *SendResult) SetTransactionID(s string) {
+       result.transactionID = s
+}
+
+// OffsetMsgID returns the stored offset message id.
+func (result *SendResult) OffsetMsgID() string {
+       return result.offsetMsgID
+}
+
+// SetOffsetMsgID sets the offset message id to s.
+func (result *SendResult) SetOffsetMsgID(s string) {
+       result.offsetMsgID = s
+}
+
+func (result *SendResult) String() string {
+       return fmt.Sprintf("SendResult [sendStatus=%s, msgId=%s, 
offsetMsgId=%s, messageQueue=%s, queueOffset=%s]",
+               result.sendStatus, result.msgID, result.offsetMsgID, 
result.messageQueue, result.queueOffset)
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/topic_publishInfo.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/topic_publishInfo.go 
b/rocketmq-go/model/topic_publishInfo.go
new file mode 100644
index 0000000..b2f711b
--- /dev/null
+++ b/rocketmq-go/model/topic_publishInfo.go
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package model
+
+import (
+       "fmt"
+
+       "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
+)
+
+// TopicPublishInfo holds an order-topic flag, a have-route-info flag, a list
+// of message queues and a pointer to the topic's route data.
+type TopicPublishInfo struct {
+       orderTopic         bool
+       havaTopicRouteInfo bool
+       messageQueueList   []*message.MessageQueue
+       topicRouteData     *TopicRouteData
+}
+
+// SetOrderTopic sets the orderTopic flag to b.
+func (info *TopicPublishInfo) SetOrderTopic(b bool) {
+       info.orderTopic = b
+}
+
+// Ok reports whether the publish info contains at least one message queue.
+// A nil or empty queue list yields false.
+func (info *TopicPublishInfo) Ok() bool {
+       // Previously this returned false unconditionally, so no publish info
+       // could ever be reported usable.
+       return len(info.messageQueueList) > 0
+}
+
+// MessageQueueList returns the stored queue slice itself, not a copy.
+func (info *TopicPublishInfo) MessageQueueList() []*message.MessageQueue {
+       return info.messageQueueList
+}
+
+// HaveTopicRouteInfo returns the stored have-route-info flag.
+func (info *TopicPublishInfo) HaveTopicRouteInfo() bool {
+       return info.havaTopicRouteInfo
+}
+
+// SetHaveTopicRouteInfo sets the have-route-info flag to b.
+func (info *TopicPublishInfo) SetHaveTopicRouteInfo(b bool) {
+       info.havaTopicRouteInfo = b
+}
+
+// TopicRouteData returns the stored route data pointer.
+func (info *TopicPublishInfo) TopicRouteData() *TopicRouteData {
+       return info.topicRouteData
+}
+
+// SetTopicRouteData sets the stored route data pointer.
+func (info *TopicPublishInfo) SetTopicRouteData(routeDate *TopicRouteData) {
+       info.topicRouteData = routeDate
+}
+
+// SelectOneMessageQueue is a placeholder for picking one queue from the
+// list. It is not implemented yet and always returns nil.
+func (info *TopicPublishInfo) SelectOneMessageQueue() *message.MessageQueue {
+       return nil //TODO
+}
+
+// selectOneMessageQueueWithBroker delegates to SelectOneMessageQueue when
+// brokerName is empty. The non-empty case is not implemented yet and
+// returns nil.
+func (info *TopicPublishInfo) selectOneMessageQueueWithBroker(brokerName 
string) *message.MessageQueue {
+       if brokerName == "" {
+               return info.SelectOneMessageQueue()
+       }
+       return nil //TODO
+}
+
+// QueueIdByBroker is a placeholder for looking up a queue id by broker name.
+// It is not implemented yet and always returns -1.
+func (info *TopicPublishInfo) QueueIdByBroker(brokerName string) int {
+       // nil is not a valid int and does not compile. -1 is used as the
+       // "not found" value until the lookup is written.
+       // NOTE(review): confirm -1 is the sentinel callers expect.
+       return -1 //TODO
+}
+
+// String returns a one-line description of the publish info covering the
+// order-topic flag, the queue list and the have-route-info flag.
+func (info *TopicPublishInfo) String() string {
+       // The previous body returned nil, which is not a valid string value
+       // and does not compile.
+       return fmt.Sprintf("TopicPublishInfo [orderTopic=%t, messageQueueList=%v, haveTopicRouteInfo=%t]",
+               info.orderTopic, info.messageQueueList, info.havaTopicRouteInfo)
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/topic_route_data.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/topic_route_data.go 
b/rocketmq-go/model/topic_route_data.go
new file mode 100644
index 0000000..f387529
--- /dev/null
+++ b/rocketmq-go/model/topic_route_data.go
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package model
+
+import (
+       "fmt"
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
+)
+
+// BrokerData currently has no fields.
+type BrokerData struct {
+}
+
+// TopicRouteData holds an order-topic configuration string, queue and broker
+// lists, and a filter-server table mapping a string key to a string list.
+type TopicRouteData struct {
+       orderTopicConf    string
+       queueDatas        []*message.MessageQueue
+       brokerDatas       []*BrokerData
+       filterServerTable map[string][]string
+}
+
+// NewTopicRouteData returns a pointer to a zero-valued TopicRouteData.
+func NewTopicRouteData() *TopicRouteData {
+       return &TopicRouteData{}
+}
+
+// CloneTopicRouteData returns a copy of the receiver whose slices and map
+// are newly allocated, so adding, removing or replacing entries in the clone
+// does not affect the original (and vice versa). Nil slices and a nil map
+// stay nil in the clone.
+//
+// The *message.MessageQueue and *BrokerData elements themselves are still
+// shared between the original and the clone.
+func (route *TopicRouteData) CloneTopicRouteData() (clonedRouteData *TopicRouteData) {
+       clonedRouteData = &TopicRouteData{
+               orderTopicConf: route.orderTopicConf,
+       }
+
+       if route.queueDatas != nil {
+               clonedRouteData.queueDatas = make([]*message.MessageQueue, len(route.queueDatas))
+               copy(clonedRouteData.queueDatas, route.queueDatas)
+       }
+
+       if route.brokerDatas != nil {
+               clonedRouteData.brokerDatas = make([]*BrokerData, len(route.brokerDatas))
+               copy(clonedRouteData.brokerDatas, route.brokerDatas)
+       }
+
+       if route.filterServerTable != nil {
+               clonedRouteData.filterServerTable = make(map[string][]string, len(route.filterServerTable))
+               for key, values := range route.filterServerTable {
+                       // Copy each value slice too, so the clone does not
+                       // alias the original's backing arrays.
+                       clonedRouteData.filterServerTable[key] = append([]string(nil), values...)
+               }
+       }
+
+       // TODO: deep-copy the queue and broker elements as well
+       return clonedRouteData
+}
+
+// QueueDatas returns the stored queue slice itself, not a copy.
+func (route *TopicRouteData) QueueDatas() []*message.MessageQueue {
+       return route.queueDatas
+}
+
+// SetQueueDatas replaces the stored queue slice with data.
+func (route *TopicRouteData) SetQueueDatas(data []*message.MessageQueue) {
+       route.queueDatas = data
+}
+
+// BrokerDatas returns the stored broker slice itself, not a copy.
+func (route *TopicRouteData) BrokerDatas() []*BrokerData {
+       return route.brokerDatas
+}
+
+// SetBrokerDatas replaces the stored broker slice with data.
+func (route *TopicRouteData) SetBrokerDatas(data []*BrokerData) {
+       route.brokerDatas = data
+}
+
+// FilterServerTable returns the stored map itself, not a copy.
+func (route *TopicRouteData) FilterServerTable() map[string][]string {
+       return route.filterServerTable
+}
+
+// SetFilterServerTable replaces the stored map with data.
+func (route *TopicRouteData) SetFilterServerTable(data map[string][]string) {
+       route.filterServerTable = data
+}
+
+// OrderTopicConf returns the stored order-topic configuration string.
+func (route *TopicRouteData) OrderTopicConf() string {
+       return route.orderTopicConf
+}
+
+// SetOrderTopicConf sets the order-topic configuration string to s.
+func (route *TopicRouteData) SetOrderTopicConf(s string) {
+       route.orderTopicConf = s
+}
+
+// HashCode is an unfinished hash of the route data. No field is mixed in
+// yet, so it currently returns 31 (1 * prime) for every receiver.
+func (route *TopicRouteData) HashCode() (result int) {
+       prime := 31
+       result = 1
+       result *= prime
+       // TODO
+
+       return
+}
+
+func (route *TopicRouteData) Equals(route1 interface{}) bool {
+       if route == nil {
+               return true
+       }
+       if route1 == nil {
+               return false
+       }
+       //value, ok := route1.(TopicRouteData)
+       //if !ok {
+       //      return false
+       //}
+       // TODO
+       //if route.brokerDatas == nil && value.brokerDatas != nil || 
len(route.brokerDatas) != len(value.brokerDatas) {
+       //      return false
+       //}
+       //
+       //if route.orderTopicConf == "" && value.orderTopicConf != "" || 
route.orderTopicConf != value.orderTopicConf {
+       //      return false
+       //}
+       //
+       //if route.queueDatas == nil && value.queueDatas != nil || 
route.queueDatas != value.queueDatas {
+       //      return false
+       //}
+       //
+       //if route.filterServerTable == nil && value.filterServerTable != nil ||
+       //      route.filterServerTable != value.filterServerTable {
+       //      return false
+       //}
+       return true
+}
+
+func (route *TopicRouteData) String() string {
+       return fmt.Sprintf("TopicRouteData [orderTopicConf=%s, queueDatas=%s, 
brokerDatas=%s, filterServerTable=%s]",
+               route.orderTopicConf, route.queueDatas, route.brokerDatas, 
route.filterServerTable)
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/mq_client_manager.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_client_manager.go b/rocketmq-go/mq_client_manager.go
index 153d955..c07dcfd 100644
--- a/rocketmq-go/mq_client_manager.go
+++ b/rocketmq-go/mq_client_manager.go
@@ -14,9 +14,9 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package rocketmq_go
+package rocketmq
 
-import "github.com/incubator-rocketmq-externals/rocketmq-go/service"
+import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
 
 type MqClientManager struct {
        clientFactory          *ClientFactory
@@ -33,4 +33,4 @@ type ClientFactory struct {
 type PullMessageController struct {
        rocketMqClient service.RocketMqClient
        clientFactory  *ClientFactory
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/mq_consumer.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_consumer.go b/rocketmq-go/mq_consumer.go
index f143b9a..7712ee1 100644
--- a/rocketmq-go/mq_consumer.go
+++ b/rocketmq-go/mq_consumer.go
@@ -14,22 +14,19 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package rocketmq_go
+package rocketmq
 
-import "github.com/incubator-rocketmq-externals/rocketmq-go/service"
+import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
 
 type RocketMQConsumer interface {
 }
 
 type MqConsumerConfig struct {
-
 }
 type DefaultMQPushConsumer struct {
        offsetStore           service.OffsetStore //for consumer's offset
        mqClient              service.RocketMqClient
-       rebalance             *service.Rebalance  //Rebalance's impl depend on 
offsetStore
+       rebalance             *service.Rebalance //Rebalance's impl depend on 
offsetStore
        consumeMessageService service.ConsumeMessageService
        ConsumerConfig        *MqConsumerConfig
 }
-
-

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/mq_producer.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_producer.go b/rocketmq-go/mq_producer.go
index 2a50344..3677939 100644
--- a/rocketmq-go/mq_producer.go
+++ b/rocketmq-go/mq_producer.go
@@ -14,19 +14,18 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package rocketmq_go
+package rocketmq
 
-import "github.com/incubator-rocketmq-externals/rocketmq-go/service"
+import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
 
 type RocketMQProducer interface {
 }
 
 type MqProducerConfig struct {
-
 }
 
 type DefaultMQProducer struct {
        producerGroup   string
        ProducerConfig  *MqProducerConfig
        producerService service.ProducerService
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/remoting/communication_mode.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/communication_mode.go 
b/rocketmq-go/remoting/communication_mode.go
new file mode 100644
index 0000000..3380955
--- /dev/null
+++ b/rocketmq-go/remoting/communication_mode.go
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package remoting
+
+// CommunicationMode enumerates the available communication modes.
+type CommunicationMode int
+
+// CommunicationMode values, numbered from 0 in declaration order.
+const (
+       Sync CommunicationMode = iota
+       Async
+       OneWay
+)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/remoting/custom_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/custom_header.go 
b/rocketmq-go/remoting/custom_header.go
index ff1a6d4..40feade 100644
--- a/rocketmq-go/remoting/custom_header.go
+++ b/rocketmq-go/remoting/custom_header.go
@@ -15,6 +15,7 @@
  *  limitations under the License.
  */
 package remoting
+
 type CustomerHeader interface {
        FromMap(headerMap map[string]interface{})
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/remoting/event_executor.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/event_executor.go 
b/rocketmq-go/remoting/event_executor.go
new file mode 100644
index 0000000..38e1ee6
--- /dev/null
+++ b/rocketmq-go/remoting/event_executor.go
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package remoting
+
+import (
+       "fmt"
+       "github.com/golang/glog"
+       "net"
+       "sync"
+)
+
+// Runnable is implemented by anything exposing a parameterless Run method.
+type Runnable interface {
+       Run()
+}
+
+// NetEventType is the kind of connection event carried by a NetEvent.
+type NetEventType int
+
+// Event kinds; NetEventExecutor.run maps each one to the matching
+// ConnEventListener callback.
+const (
+       Connect NetEventType = iota
+       Close
+       Idle
+       Exception // TODO error?
+)
+
+// NetEvent describes one connection event: its kind, the remote address it
+// relates to and the connection involved.
+type NetEvent struct {
+       eType         NetEventType
+       remoteAddress string
+       conn          net.Conn
+}
+
+// NewEventType builds a NetEvent of kind eType for the connection conn to
+// remoteAddr.
+func NewEventType(eType NetEventType, remoteAddr string, conn net.Conn) *NetEvent {
+       event := new(NetEvent)
+       event.eType = eType
+       event.remoteAddress = remoteAddr
+       event.conn = conn
+       return event
+}
+
+// Type returns the kind of the event.
+func (event *NetEvent) Type() NetEventType {
+       return event.eType
+}
+
+// RemoteAddress returns the remote address recorded for the event.
+func (event *NetEvent) RemoteAddress() string {
+       return event.remoteAddress
+}
+
+// Conn returns the connection recorded for the event.
+func (event *NetEvent) Conn() net.Conn {
+       return event.conn
+}
+
+// String renders the event for logging.
+//
+// The event type is an integer and the connection is an interface value, so
+// they are formatted with %d and %v; %s would print "%!s(...)" noise for both.
+func (event *NetEvent) String() string {
+       return fmt.Sprintf("NettyEvent [type=%d, remoteAddr=%s, channel=%v]",
+               event.eType, event.remoteAddress, event.conn)
+}
+
+// NetEventExecutor queues NetEvents and dispatches them to the client's
+// ConnEventListener from a single goroutine started by Start.
+type NetEventExecutor struct {
+       hasNotified bool
+       running     bool
+       stopped     chan int
+       mu          sync.RWMutex // TODO need init?
+       client      *RemotingClient
+
+       eventQueue chan *NetEvent
+       maxSize    int
+}
+
+// NewNetEventExecutor creates an idle executor bound to client. The event
+// queue is buffered for 100 events and maxSize is set to 10000.
+func NewNetEventExecutor(client *RemotingClient) *NetEventExecutor {
+       executor := new(NetEventExecutor)
+       executor.hasNotified = false
+       executor.running = false
+       executor.stopped = make(chan int)
+       executor.client = client
+       executor.eventQueue = make(chan *NetEvent, 100) // TODO confirm size
+       executor.maxSize = 10000
+       return executor
+}
+
+// Start launches the dispatch loop in its own goroutine.
+func (executor *NetEventExecutor) Start() {
+       go executor.run()
+}
+
+// Shutdown signals the dispatch loop to stop. The stopped channel is
+// unbuffered, so this call blocks until the loop receives the signal.
+func (executor *NetEventExecutor) Shutdown() {
+       executor.stopped <- 0
+}
+
+func (executor *NetEventExecutor) PutEvent(event *NetEvent) {
+       if len(executor.eventQueue) <= executor.maxSize {
+               executor.eventQueue <- event //append(executor.eventQueue, 
event)
+       } else {
+               fmt.Sprintf("event queue size[%s] enough, so drop this event 
%s", len(executor.eventQueue), event.String())
+       }
+}
+
+// ServiceName returns the name used for this executor in log messages.
+// (A string function cannot return nil, so a fixed name is returned.)
+func (executor *NetEventExecutor) ServiceName() string {
+       return "NetEventExecutor"
+}
+
+// run is the dispatch loop: it forwards queued events to the client's
+// ConnEventListener until a value arrives on the stopped channel.
+//
+// The loop exits by returning from the stop case instead of re-reading
+// executor.running without the lock, which was a data race with the writes
+// made under executor.mu.
+func (executor *NetEventExecutor) run() {
+       glog.Infof("%s service started", executor.ServiceName())
+
+       executor.mu.Lock()
+       executor.running = true
+       executor.mu.Unlock()
+
+       listener := executor.client.ConnEventListener()
+       for {
+               select {
+               case event := <-executor.eventQueue:
+                       if event == nil || listener == nil {
+                               continue
+                       }
+                       switch event.Type() {
+                       case Connect:
+                               listener.OnConnConnect(event.remoteAddress, event.Conn())
+                       case Close:
+                               listener.OnConnClose(event.remoteAddress, event.Conn())
+                       case Idle:
+                               listener.OnConnIdle(event.remoteAddress, event.Conn())
+                       case Exception:
+                               listener.OnConnException(event.remoteAddress, event.Conn())
+                       }
+               case <-executor.stopped:
+                       executor.mu.Lock()
+                       executor.running = false
+                       executor.mu.Unlock()
+
+                       glog.Infof("%s service exit.", executor.ServiceName())
+                       return
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/remoting/invoke_callback.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/invoke_callback.go 
b/rocketmq-go/remoting/invoke_callback.go
deleted file mode 100644
index 844c304..0000000
--- a/rocketmq-go/remoting/invoke_callback.go
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package remoting
-type ResponseFuture struct {
-}
-type InvokeCallback func(responseFuture *ResponseFuture)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/remoting/remoting_client.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/remoting_client.go 
b/rocketmq-go/remoting/remoting_client.go
index a4ce83a..38685cb 100644
--- a/rocketmq-go/remoting/remoting_client.go
+++ b/rocketmq-go/remoting/remoting_client.go
@@ -14,17 +14,305 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
+
 package remoting
 
-type DefalutRemotingClient struct {
+import (
+       "errors"
+       "fmt"
+       "github.com/golang/glog"
+       "math/rand"
+       "net"
+       "sync"
+       "time"
+)
+
+// ConnEventListener receives connection life-cycle notifications; each method
+// gets the remote address and the connection the event refers to.
+type ConnEventListener interface {
+       OnConnConnect(remoteAddress string, conn net.Conn)
+       OnConnClose(remoteAddress string, conn net.Conn)
+       OnConnIdle(remoteAddress string, conn net.Conn)
+       OnConnException(remoteAddress string, conn net.Conn)
+}
+
+// NetRequestProcessor is a placeholder type with no fields yet.
+type NetRequestProcessor struct {
+}
+
+// NetConfig groups the tunables of a RemotingClient.
+// NOTE(review): the *Millis/*Seconds fields are time.Duration values, so the
+// unit in their names is presumably informational only - confirm with users.
+type NetConfig struct {
+       clientWorkerNumber           int
+       clientCallbackExecutorNumber int
+       clientOneWaySemaphoreValue   int
+       clientAsyncSemaphoreValue    int
+       connectTimeoutMillis         time.Duration
+       channelNotActiveInterval     time.Duration
+
+       clientChannelMaxIdleTimeSeconds    time.Duration
+       clientSocketSndBufSize             int
+       clientSocketRcvBufSize             int
+       clientPooledByteBufAllocatorEnable bool
+       clientCloseSocketIfTimeout         bool
+}
+
+// Pair couples a request processor with an executor service.
+type Pair struct {
+       o1 *NetRequestProcessor
+       o2 *ExecutorService
+}
+
+// RemotingClient holds the connection table, the table of in-flight
+// responses (keyed by request opaque id) and the hooks/listeners used when
+// talking to remote peers.
+type RemotingClient struct {
+       semaphoreOneWay         sync.Mutex // TODO right? use chan?
+       semaphoreAsync          sync.Mutex
+       processorTable          map[int]*Pair
+       netEventExecutor        *NetEventExecutor
+       defaultRequestProcessor *Pair
+
+       config             NetConfig
+       connTable          map[string]net.Conn
+       connTableLock      sync.RWMutex
+       timer              *time.Timer
+       namesrvAddrList    []string
+       namesrvAddrChoosed string
+
+       callBackExecutor  *ExecutorService
+       listener          ConnEventListener
+       rpcHook           RPCHook
+       responseTable     map[int32]*ResponseFuture
+       responseTableLock sync.RWMutex
+}
+
+// ConnHandlerContext is a placeholder type with no fields yet.
+type ConnHandlerContext struct {
+}
+
+// ExecutorService runs submitted callbacks on a goroutine started by run.
+type ExecutorService struct {
+       callBackChannel chan func()
+       quit            chan bool
+}
+
+// submit hands callback to the executor goroutine by sending it on
+// callBackChannel.
+func (exec *ExecutorService) submit(callback func()) {
+       exec.callBackChannel <- callback
+}
+
+// run starts the goroutine that executes submitted callbacks until a value
+// is received on quit.
+//
+// The quit case returns from the goroutine; a bare break only leaves the
+// select, which kept the loop spinning forever and made the quit log
+// unreachable.
+func (exec *ExecutorService) run() {
+       go func() {
+               glog.Info("Callback Executor routing start.")
+               for {
+                       select {
+                       case invoke := <-exec.callBackChannel:
+                               if invoke != nil {
+                                       invoke()
+                               }
+                       case <-exec.quit:
+                               glog.Info("Callback Executor routing quit.")
+                               return
+                       }
+               }
+       }()
+}
+
+// NewRemotingClient creates a client with empty connection and response
+// tables, a 10 second timer and the given configuration.
+//
+// The net event executor is created here as well; leaving it nil made
+// PutNetEvent dereference a nil pointer.
+func NewRemotingClient(cfg NetConfig) *RemotingClient {
+       client := &RemotingClient{
+               config:        cfg,
+               connTable:     make(map[string]net.Conn),
+               timer:         time.NewTimer(10 * time.Second),
+               responseTable: make(map[int32]*ResponseFuture),
+       }
+       client.netEventExecutor = NewNetEventExecutor(client)
+       // java: super(xxxx)
+       return client
+}
+
+// PutNetEvent forwards event to the client's net event executor.
+func (rc *RemotingClient) PutNetEvent(event *NetEvent) {
+       rc.netEventExecutor.PutEvent(event)
+}
+
+// executeInvokeCallback runs the callback of future: on the callback
+// executor when one is configured, otherwise inline on the calling goroutine.
+func (rc *RemotingClient) executeInvokeCallback(future *ResponseFuture) {
+       pool := rc.CallbackExecutor()
+       if pool == nil {
+               future.executeInvokeCallback()
+               return
+       }
+       pool.submit(func() {
+               future.invokeCallback(future)
+       })
+}
+
+// scanResponseTable removes timed-out futures from the response table,
+// releases their waiters and then runs their callbacks asynchronously.
+//
+// The table is only touched while holding responseTableLock, the warning
+// now has a format verb for its argument, and futures without a callback are
+// not handed to executeInvokeCallback (calling a nil callback panics).
+func (rc *RemotingClient) scanResponseTable() {
+       var expired []*ResponseFuture
+
+       rc.responseTableLock.Lock()
+       for opaque, future := range rc.responseTable {
+               // NOTE(review): this adds a time.Duration (nanoseconds) and 1e9 to
+               // beginTimestamp but compares with Unix seconds; the unit of
+               // beginTimestamp is not visible here - confirm and align the clock.
+               if int64(future.beginTimestamp)+int64(future.timeoutMillis)+1e9 <= time.Now().Unix() {
+                       delete(rc.responseTable, opaque)
+                       expired = append(expired, future)
+               }
+       }
+       rc.responseTableLock.Unlock()
+
+       if len(expired) == 0 {
+               return
+       }
+
+       // Release waiters outside the lock so a slow Done cannot stall other
+       // users of the table.
+       for _, future := range expired {
+               future.Done()
+               glog.Warningf("remove timeout request, %s", future.String())
+       }
+
+       go func() {
+               for _, future := range expired {
+                       if future.invokeCallback == nil {
+                               continue
+                       }
+                       rc.executeInvokeCallback(future) // TODO if still failed, how to deal with the message ?
+               }
+       }()
+}
+
+// CallbackExecutor returns the executor used for callbacks; it may be nil.
+func (rc *RemotingClient) CallbackExecutor() *ExecutorService {
+       return rc.callBackExecutor
+}
+
+// RPCHook returns the registered RPC hook; it may be nil.
+func (rc *RemotingClient) RPCHook() RPCHook {
+       return rc.rpcHook
+}
+
+// ConnEventListener returns the connection event listener; it may be nil.
+func (rc *RemotingClient) ConnEventListener() ConnEventListener {
+       return rc.listener
+}
+
+// invokeSync writes request to the connection for addr and waits up to
+// timeout for the matching response.
+//
+// It returns an error when no connection is available, when writing the
+// request fails, or when no response arrives in time. Compared with the
+// previous version it:
+//   - creates the future through NewResponseFuture so its latch is armed
+//     before WaitResponse runs,
+//   - guards the response table with responseTableLock,
+//   - checks the result of conn.Write, and
+//   - no longer calls Error() on a future.err that was never set.
+func (rc *RemotingClient) invokeSync(addr string, request *RemotingCommand,
+       timeout time.Duration) (*RemotingCommand, error) {
+       conn := rc.getAndCreateConn(addr)
+       if conn == nil {
+               rc.CloseConn(addr) // TODO
+               return nil, errors.New(fmt.Sprintf("Connection to %s ERROR!", addr))
+       }
+
+       if rc.rpcHook != nil {
+               rc.rpcHook.DoBeforeRequest(addr, request)
+       }
+       opaque := request.opaque
+
+       future := NewResponseFuture(opaque, timeout, nil)
+       rc.responseTableLock.Lock()
+       rc.responseTable[opaque] = future
+       rc.responseTableLock.Unlock()
+
+       // TODO register listener
+       if _, err := conn.Write(request.encode()); err != nil {
+               future.err = err
+               rc.responseTableLock.Lock()
+               delete(rc.responseTable, opaque)
+               rc.responseTableLock.Unlock()
+               return nil, errors.New(fmt.Sprintf("RemotingSend error: %s", err.Error()))
+       }
+       future.sendRequestOK = true
+
+       response := future.WaitResponse(timeout)
+       if response == nil {
+               // Nobody will complete this future any more; drop it.
+               rc.responseTableLock.Lock()
+               delete(rc.responseTable, opaque)
+               rc.responseTableLock.Unlock()
+               return nil, errors.New(fmt.Sprintf(
+                       "RemotingTimeout error: no response from %s within %s", addr, timeout))
+       }
+
+       if rc.rpcHook != nil {
+               rc.rpcHook.DoBeforeResponse(addr, response)
+       }
+       return response, nil
 }
 
-func (self *DefalutRemotingClient) InvokeSync(addr string, request 
*RemotingCommand, timeoutMillis int64) (remotingCommand *RemotingCommand, err 
error) {
-       return
+// invokeAsync is the asynchronous counterpart of invokeSync. It currently
+// only resolves the connection and runs the before-request hook: the
+// semaphore acquisition is still a TODO, so the future is never registered.
+// An error is returned only when no connection is available for addr.
+func (rc *RemotingClient) invokeAsync(addr string, request *RemotingCommand,
+       timeout time.Duration, callback InvokeCallback) error {
+       conn := rc.getAndCreateConn(addr)
+       if conn == nil {
+               rc.CloseConn(addr) // TODO
+               return errors.New(fmt.Sprintf("Connection to %s ERROR!", addr))
+       }
+
+       // TODO how to confirm conn active?
+       if hook := rc.rpcHook; hook != nil {
+               hook.DoBeforeRequest(addr, request)
+       }
+
+       id := request.opaque
+       // TODO semaphore.tryAcquire...
+       if permitted := false; permitted {
+               //final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
+
+               pending := NewResponseFuture(id, timeout, callback)
+               rc.responseTable[id] = pending
+
+               pending.WaitResponse(timeout) // TODO add listener
+       }
+       return nil
 }
-func (self *DefalutRemotingClient)InvokeAsync(addr string, request 
*RemotingCommand, timeoutMillis int64, invokeCallback InvokeCallback) error {
-       return
+
+// invokeOneWay marks request as one-way after running the before-request
+// hook. Sending is still a TODO. An error is returned only when no
+// connection is available for addr.
+func (rc *RemotingClient) invokeOneWay(addr string, request *RemotingCommand,
+       timeout time.Duration) error {
+       if conn := rc.getAndCreateConn(addr); conn == nil {
+               rc.CloseConn(addr) // TODO
+               return errors.New(fmt.Sprintf("Connection to %s ERROR!", addr))
+       }
+
+       if hook := rc.rpcHook; hook != nil {
+               hook.DoBeforeRequest(addr, request)
+       }
+
+       request.MarkOneWayRpc()
+       // TODO boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
+       return nil
 }
-func (self *DefalutRemotingClient)InvokeOneWay(addr string, request 
*RemotingCommand, timeoutMillis int64) error {
-       return
+
+// initValueIndex returns a pseudo-random index in the range [0, 999).
+//
+// rand.Int never returns a negative number, so the former sign flip was dead
+// code and the doubled "% 999" was redundant; rand.Intn expresses the same
+// range directly.
+func initValueIndex() int {
+       return rand.Intn(999)
+}
+
+// Start is not implemented yet.
+func (rc *RemotingClient) Start() {
+       // TODO
+}
+
+// Shutdown stops the client's timer; releasing other resources is still a
+// TODO.
+func (rc *RemotingClient) Shutdown() {
+       // TODO
+       rc.timer.Stop()
+}
+
+// registerRPCHook installs hk as the client's RPC hook, replacing any
+// previous one.
+func (rc *RemotingClient) registerRPCHook(hk RPCHook) {
+       rc.rpcHook = hk
+}
+
+// CloseConn is not implemented yet; it currently does nothing.
+func (rc *RemotingClient) CloseConn(addr string) {
+       // TODO
+}
+
+// updateNameServerAddressList replaces the stored name server list when
+// addrs is non-empty and differs from it: a different length, or at least one
+// address that the stored list does not contain.
+//
+// The membership test was inverted before, so an identical list triggered an
+// update while a list of the same length with new entries was ignored.
+func (rc *RemotingClient) updateNameServerAddressList(addrs []string) {
+       if len(addrs) == 0 {
+               return
+       }
+
+       old := rc.namesrvAddrList
+       update := old == nil || len(addrs) != len(old)
+       for i := 0; i < len(addrs) && !update; i++ {
+               if !contains(old, addrs[i]) {
+                       update = true
+               }
+       }
+
+       if update {
+               // Keep a private copy so later changes to the caller's slice do
+               // not alter the client's list.
+               rc.namesrvAddrList = append([]string(nil), addrs...) // TODO safe?
+       }
+}
+
+// getAndCreateConn is a stub: it always returns nil, which callers treat as
+// "no connection available".
+func (rc *RemotingClient) getAndCreateConn(addr string) net.Conn {
+       return nil
+}
+
+// getAndCreateNamesrvConn is a stub: it always returns nil.
+func (rc *RemotingClient) getAndCreateNamesrvConn() net.Conn {
+       return nil
+}
+
+// createConn is a stub: it always returns nil.
+func (rc *RemotingClient) createConn(addr string) net.Conn {
+
+       return nil
+}
+
+// RegisterProcessor is not implemented yet; the arguments are ignored.
+func (rc *RemotingClient) RegisterProcessor(requestCode int, processor 
*NetRequestProcessor, executor ExecutorService) {
+       // TODO
+}
+
+// String summarises the client's name server settings for logging.
+// (A string function cannot return nil, so a real description is built.)
+func (rc *RemotingClient) String() string {
+       return fmt.Sprintf("RemotingClient [namesrvAddrList=%v, namesrvAddrChoosed=%s]",
+               rc.namesrvAddrList, rc.namesrvAddrChoosed)
+}
+
+// contains reports whether o is an element of s.
+func contains(s []string, o string) bool { // TODO optimize
+       for i := 0; i < len(s); i++ {
+               if s[i] == o {
+                       return true
+               }
+       }
+       return false
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/remoting/remoting_command.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/remoting_command.go 
b/rocketmq-go/remoting/remoting_command.go
index 25d9645..b57f1e9 100644
--- a/rocketmq-go/remoting/remoting_command.go
+++ b/rocketmq-go/remoting/remoting_command.go
@@ -14,9 +14,168 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
+
 package remoting
 
+// TODO: refactor
+import (
+       "bytes"
+       "encoding/binary"
+       "encoding/json"
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
+       "log"
+       "os"
+       "strconv"
+       "sync"
+       "sync/atomic"
+)
+
+// init is currently a placeholder and performs no initialisation.
+func init() {
+       // TODO
+}
+
+// Configuration keys and flag bit positions used by RemotingCommand.
+const (
+       SerializeTypeProperty = "rocketmq.serialize.type"
+       SerializeTypeEnv      = "ROCKETMQ_SERIALIZE_TYPE"
+       RemotingVersionKey    = "rocketmq.remoting.version" // environment variable read by setCmdVersion
+       rpcType               = 0 // bit position tested by Type: set means response command
+       rpcOneWay             = 1 // bit position set by MarkOneWayRpc
+)
+
+// RemotingCommandType tells whether a command is a request or a response.
+type RemotingCommandType int
+
+const (
+       // ResponseCommand marks a command whose response bit is set.
+       ResponseCommand RemotingCommandType = iota
+       // RqeusetCommand marks a request. The name is misspelled but kept for
+       // compatibility with existing callers.
+       RqeusetCommand
+       // RequestCommand is the correctly spelled alias of RqeusetCommand.
+       RequestCommand = RqeusetCommand
+)
+
+// configVersion caches the version parsed by setCmdVersion; -1 means unset.
+var configVersion int = -1
+
+// requestId is the counter CreateRemotingCommand increments atomically to
+// produce opaque ids.
+var requestId int32
+
+// decodeLock serialises calls to decodeRemoteCommand.
+var decodeLock sync.Mutex
+
+// RemotingCommand is one protocol frame: header fields plus an optional body.
+// NOTE(review): every field is unexported, and encoding/json ignores
+// unexported fields, so the json tags below have no effect on
+// json.Marshal/Unmarshal of this struct.
 type RemotingCommand struct {
+       //header
+       code      int               `json:"code"`
+       language  string            `json:"language"`
+       version   int               `json:"version"`
+       opaque    int32             `json:"opaque"`
+       flag      int               `json:"flag"`
+       remark    string            `json:"remark"`
+       extFields map[string]string `json:"extFields"`
+       header    CustomerHeader    // transient
+       //body
+       body []byte `json:"body,omitempty"`
+}
+
+// NewRemotingCommand creates a command with the given code and custom
+// header, a fresh opaque id and the configured protocol version.
+//
+// The opaque id is taken from the shared requestId counter, exactly as
+// CreateRemotingCommand does; without it every command carried opaque 0 and
+// collided in the response table.
+func NewRemotingCommand(code int, header CustomerHeader) *RemotingCommand {
+       cmd := &RemotingCommand{
+               code:   code,
+               header: header,
+               opaque: atomic.AddInt32(&requestId, 1),
+       }
+       setCmdVersion(cmd)
+       return cmd
+}
+
+// setCmdVersion stamps cmd with the protocol version: the cached
+// configVersion when already known, otherwise the integer stored in the
+// RemotingVersionKey environment variable, which is then cached.
+//
+// A value that is not an integer is logged and ignored; it used to be
+// silently stored (and cached) as version 0.
+func setCmdVersion(cmd *RemotingCommand) {
+       if configVersion >= 0 {
+               cmd.version = configVersion // safety
+               return
+       }
+
+       v := os.Getenv(RemotingVersionKey)
+       if v == "" {
+               return
+       }
+       value, err := strconv.Atoi(v)
+       if err != nil {
+               log.Printf("ignore invalid %s value %q: %v", RemotingVersionKey, v, err)
+               return
+       }
+       cmd.version = value
+       configVersion = value
+}
+
+// encodeHeader returns the frame prefix and header of cmd, without the body:
+//
+//     4 bytes  total length = 4 + len(header) + len(body), big-endian
+//     4 bytes  header length, big-endian
+//     n bytes  JSON header
+//
+// The lengths are written as int32 because binary.Write rejects the
+// platform-sized int (it returned an error and wrote nothing), and the second
+// field now carries the header length instead of the body length.
+func (cmd *RemotingCommand) encodeHeader() []byte {
+       headerData := cmd.buildHeader()
+       headerLen := len(headerData)
+       length := 4 + headerLen + len(cmd.body)
+
+       buf := bytes.NewBuffer(make([]byte, 0, 8+headerLen))
+       // Writing fixed-size integers into a bytes.Buffer cannot fail.
+       binary.Write(buf, binary.BigEndian, int32(length))
+       binary.Write(buf, binary.BigEndian, int32(headerLen))
+       buf.Write(headerData)
+
+       return buf.Bytes()
+}
+
+// buildHeader serialises the header fields of cmd to JSON and returns nil
+// when marshalling fails.
+//
+// encoding/json skips unexported fields, so marshalling cmd directly always
+// produced "{}". The fields are copied into an exported shadow struct that
+// carries the same JSON names. The body is not part of the header; encode
+// appends it after the header bytes.
+func (cmd *RemotingCommand) buildHeader() []byte {
+       wire := struct {
+               Code      int               `json:"code"`
+               Language  string            `json:"language"`
+               Version   int               `json:"version"`
+               Opaque    int32             `json:"opaque"`
+               Flag      int               `json:"flag"`
+               Remark    string            `json:"remark"`
+               ExtFields map[string]string `json:"extFields"`
+       }{
+               Code:      cmd.code,
+               Language:  cmd.language,
+               Version:   cmd.version,
+               Opaque:    cmd.opaque,
+               Flag:      cmd.flag,
+               Remark:    cmd.remark,
+               ExtFields: cmd.extFields,
+       }
+
+       buf, err := json.Marshal(wire)
+       if err != nil {
+               return nil
+       }
+       return buf
 }
 
+// encode returns the complete frame for cmd:
+//
+//     4 bytes  total length = 4 + len(header) + len(body), big-endian
+//     4 bytes  header length, big-endian
+//     n bytes  JSON header
+//     m bytes  body (may be empty)
+//
+// Lengths are written as big-endian int32, matching encodeHeader;
+// binary.Write rejects the platform-sized int used before, so no length
+// prefix was emitted at all.
+func (cmd *RemotingCommand) encode() []byte {
+       headerData := cmd.buildHeader()
 
+       headerLen := len(headerData)
+       length := 4 + headerLen + len(cmd.body)
+
+       buf := bytes.NewBuffer(make([]byte, 0, 4+length))
+       // Writing fixed-size integers into a bytes.Buffer cannot fail.
+       binary.Write(buf, binary.BigEndian, int32(length))
+       binary.Write(buf, binary.BigEndian, int32(headerLen))
+       buf.Write(headerData)
+
+       if cmd.body != nil {
+               buf.Write(cmd.body)
+       }
+
+       return buf.Bytes()
+}
+
+func decodeRemoteCommand(header, body []byte) *RemotingCommand {
+       decodeLock.Lock()
+       defer decodeLock.Unlock()
+
+       cmd := &RemotingCommand{}
+       cmd.extFields = make(map[string]string)
+       err := json.Unmarshal(header, cmd)
+       if err != nil {
+               log.Print(err)
+               return nil
+       }
+       cmd.body = body
+       return cmd
+}
+
+// CreateRemotingCommand builds a version 1 command carrying requestHeader
+// and a new opaque id taken from the shared requestId counter.
+func CreateRemotingCommand(code int, requestHeader *header.SendMessageRequestHeader) *RemotingCommand {
+       return &RemotingCommand{
+               code:    code,
+               header:  requestHeader,
+               version: 1,
+               opaque:  atomic.AddInt32(&requestId, 1), // TODO: safety?
+       }
+}
+
+// SetBody stores body as the command payload; the slice is not copied.
+func (cmd *RemotingCommand) SetBody(body []byte) {
+       cmd.body = body
+}
+
+func (cmd *RemotingCommand) Type() RemotingCommandType {
+       bits := 1 << rpcType
+       if (cmd.flag & bits) == bits {
+               return ResponseCommand
+       }
+       return RqeusetCommand
+}
+
+// MarkOneWayRpc sets the rpcOneWay bit in the command flag.
+func (cmd *RemotingCommand) MarkOneWayRpc() {
+       cmd.flag |= (1 << rpcOneWay)
+}
+
+// String renders the scalar header fields of the command for logging.
+// (A string function cannot return nil, so a real description is built;
+// strconv is used because this file does not import fmt.)
+func (cmd *RemotingCommand) String() string {
+       return "RemotingCommand [code=" + strconv.Itoa(cmd.code) +
+               ", language=" + cmd.language +
+               ", version=" + strconv.Itoa(cmd.version) +
+               ", opaque=" + strconv.Itoa(int(cmd.opaque)) +
+               ", flag=" + strconv.Itoa(cmd.flag) +
+               ", remark=" + cmd.remark + "]"
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/remoting/response_future.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/response_future.go 
b/rocketmq-go/remoting/response_future.go
new file mode 100644
index 0000000..a8c2aec
--- /dev/null
+++ b/rocketmq-go/remoting/response_future.go
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remoting
+
+import (
+       "sync"
+       "time"
+)
+
+// InvokeCallback is the function type invoked with a ResponseFuture once an
+// asynchronous request completes or times out.
+type InvokeCallback func(responseFuture *ResponseFuture)
+
+// ResponseFuture tracks one in-flight request: its opaque id, timeout,
+// optional callback and, once available, the response command.
+// NOTE(review): beginTimestamp is never assigned in the code shown here and
+// its unit is undocumented - confirm before relying on it.
+type ResponseFuture struct {
+       opaque          int32
+       timeoutMillis   time.Duration
+       invokeCallback  InvokeCallback
+       beginTimestamp  int64
+       responseCommand *RemotingCommand
+       sendRequestOK   bool
+       done            chan bool
+       latch           sync.WaitGroup
+       err             error
+}
+
+// NewResponseFuture creates a future for the request identified by opaque.
+// The latch starts at one, so WaitResponse blocks until Done is called or the
+// timeout passes.
+//
+// The done channel is created with room for one signal; it used to be left
+// nil, and sending on a nil channel blocks forever.
+func NewResponseFuture(opaque int32, timeout time.Duration, callback InvokeCallback) *ResponseFuture {
+       future := &ResponseFuture{
+               opaque:         opaque,
+               timeoutMillis:  timeout,
+               invokeCallback: callback,
+               done:           make(chan bool, 1),
+       }
+       future.latch.Add(1)
+       return future
+}
+// SetResponseFuture stores cmd as the response of this future.
+func (future *ResponseFuture) SetResponseFuture(cmd *RemotingCommand) {
+       future.responseCommand = cmd
+}
+
+// Done releases the latch and signals completion on the done channel.
+//
+// The signal is sent without blocking: when the channel is nil, full, or has
+// no receiver, the send is skipped instead of stalling the caller forever.
+// NOTE(review): the latch must still be armed when Done runs; calling Done
+// after the counter reached zero panics inside sync.WaitGroup.
+func (future *ResponseFuture) Done() {
+       future.latch.Done()
+       select {
+       case future.done <- true:
+       default:
+       }
+}
+
+// executeInvokeCallback runs the registered callback, if any, passing the
+// future itself so the callback can inspect the outcome.
+//
+// Futures without a callback are skipped (calling a nil function panics),
+// and the callback no longer receives nil.
+func (future *ResponseFuture) executeInvokeCallback() {
+       if future.invokeCallback == nil {
+               return
+       }
+       future.invokeCallback(future)
+}
+
+// WaitResponse blocks until Done is called or timeout elapses, then returns
+// the stored response command (nil when none was set).
+//
+// The timeout is implemented with a timer instead of decrementing the latch
+// from a sleeping goroutine: that extra decrement raced with Done and made
+// the WaitGroup counter negative, which panics.
+func (future *ResponseFuture) WaitResponse(timeout time.Duration) *RemotingCommand {
+       released := make(chan struct{})
+       go func() {
+               // Ends as soon as Done releases the latch.
+               future.latch.Wait()
+               close(released)
+       }()
+
+       timer := time.NewTimer(timeout)
+       defer timer.Stop()
+
+       select {
+       case <-released:
+       case <-timer.C:
+       }
+       return future.responseCommand
+}
+
+// String describes the future for logging.
+// (A string function cannot return nil; only names already imported by this
+// file are used to build the text.)
+func (future *ResponseFuture) String() string {
+       sent := "false"
+       if future.sendRequestOK {
+               sent = "true"
+       }
+       return "ResponseFuture [sendRequestOK=" + sent +
+               ", timeout=" + future.timeoutMillis.String() + "]"
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/remoting/rpchook.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/rpchook.go b/rocketmq-go/remoting/rpchook.go
new file mode 100644
index 0000000..150ffe0
--- /dev/null
+++ b/rocketmq-go/remoting/rpchook.go
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package remoting
+
+// RPCHook lets callers observe a command together with the remote address:
+// DoBeforeRequest runs before a request is sent and DoBeforeResponse after a
+// response has been received.
+type RPCHook interface {
+       DoBeforeRequest(string, *RemotingCommand)
+       DoBeforeResponse(string, *RemotingCommand)
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/service/client_api.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/client_api.go 
b/rocketmq-go/service/client_api.go
new file mode 100644
index 0000000..ff750ba
--- /dev/null
+++ b/rocketmq-go/service/client_api.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package service
+
+import (
+       "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
+       "github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
+)
+
+// sendSmartMsg is a package-level switch that is currently always true.
+var sendSmartMsg bool = true // TODO get from system env
+
+// TopAddress is a placeholder type with no fields yet.
+type TopAddress struct {
+}
+
+// ClientRemotingProcessor is an empty interface placeholder.
+type ClientRemotingProcessor interface {
+}
+
+// init is currently a placeholder and performs no initialisation.
+func init() {
+       // TODO
+}
+
+// MQClientAPI bundles the remoting client and configuration used to talk to
+// name servers and brokers.
+// NOTE(review): crp is a pointer to an interface type, which is rarely what
+// is wanted in Go - confirm the intent.
+type MQClientAPI struct {
+       remotingClient    *remoting.RemotingClient
+       topAddress        *TopAddress
+       crp               *ClientRemotingProcessor
+       nameServerAddress string
+       config            *config.ClientConfig
+}
+
+// NewMQClientAPI wires a new API object around cfg and processor.
+//
+// The remoting client is built with remoting.NewRemotingClient so that its
+// connection and response tables exist; a zero-value RemotingClient has nil
+// maps and panics on the first write to them. hook is not registered yet.
+func NewMQClientAPI(cfg *config.ClientConfig, processor *ClientRemotingProcessor, hook remoting.RPCHook) *MQClientAPI {
+       api := &MQClientAPI{
+               remotingClient: remoting.NewRemotingClient(remoting.NetConfig{}), //TODO
+               topAddress:     &TopAddress{},                                    // TODO
+               crp:            processor,
+               config:         cfg,
+       }
+
+       // TODO register
+       return api
+}
+
+func (api *MQClientAPI) SendMessage(addr, brokerName string,
+       msg message.Message, requestHeader header.SendMessageRequestHeader, 
timeout int64) *model.SendResult {
+       var request *remoting.RemotingCommand
+       request = remoting.CreateRemotingCommand(model.SendMsg, &requestHeader)
+       request.SetBody(msg.Body)
+       return api.sendMessageSync(addr, brokerName, msg, timeout, request)
+}
+
+// sendMessageSync sends request to addr and panics when no response comes
+// back. Turning the response into a SendResult is still a TODO, so nil is
+// returned on success.
+//
+// The panic message was misspelled and did not identify the peer; it now
+// names the address that failed to answer.
+func (api *MQClientAPI) sendMessageSync(addr, brokerName string,
+       msg message.Message,
+       timeout int64,
+       request *remoting.RemotingCommand) *model.SendResult {
+       response := api.invokeSync(addr, request, timeout)
+       if response == nil {
+               panic("invokeSync panic: no response from " + addr)
+       }
+       return nil
+       // TODO return api.processSendResponse(brokerName, msg, response)
+}
+
+// invokeSync is a stub: it ignores its arguments and always returns nil.
+func (api *MQClientAPI) invokeSync(addr string, cmd *remoting.RemotingCommand, 
timeout int64) *remoting.RemotingCommand {
+       return nil
+}
+
+// processSendResponse is a stub: it ignores its arguments and always returns
+// nil.
+func (api *MQClientAPI) processSendResponse(name string, msg message.Message, 
cmd *remoting.RemotingCommand) *remoting.RemotingCommand {
+       return nil
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/service/client_error_code.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/client_error_code.go 
b/rocketmq-go/service/client_error_code.go
new file mode 100644
index 0000000..d031638
--- /dev/null
+++ b/rocketmq-go/service/client_error_code.go
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package service
+
+// Client-side error codes.
+// NOTE(review): "ConnecBrokerError" looks like a typo for
+// "ConnectBrokerError"; it is an exported name, so check callers before
+// renaming.
+const (
+       ConnecBrokerError        = 10001
+       AccessBrokerTimeoutError = 10002
+       BrokerNotExistError      = 10003
+       NoNameServerError        = 10004
+       NotFoundTopicError       = 10005
+)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/service/consume_messsage_service.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/consume_messsage_service.go 
b/rocketmq-go/service/consume_messsage_service.go
index 77a5d14..d3b28fc 100644
--- a/rocketmq-go/service/consume_messsage_service.go
+++ b/rocketmq-go/service/consume_messsage_service.go
@@ -17,5 +17,4 @@
 package service
 
 type ConsumeMessageService struct {
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/service/mq_client.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/mq_client.go b/rocketmq-go/service/mq_client.go
index f62766e..0cf7df2 100644
--- a/rocketmq-go/service/mq_client.go
+++ b/rocketmq-go/service/mq_client.go
@@ -17,5 +17,4 @@
 package service
 
 type RocketMqClient interface {
-
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/service/producer_service.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/producer_service.go 
b/rocketmq-go/service/producer_service.go
index e3657a9..a684b27 100644
--- a/rocketmq-go/service/producer_service.go
+++ b/rocketmq-go/service/producer_service.go
@@ -16,11 +16,13 @@
  */
 package service
 
-import "github.com/incubator-rocketmq-externals/rocketmq-go/model/config"
+import 
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+
 type ProducerService interface {
 }
+
 type DefaultProducerService struct {
-       producerGroup   string
-       producerConfig  *config.RocketMqProducerConfig
-       mqClient        RocketMqClient
-}
\ No newline at end of file
+       producerGroup  string
+       producerConfig *config.RocketMqProducerConfig
+       mqClient       RocketMqClient
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/service/rebalance_service.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/rebalance_service.go 
b/rocketmq-go/service/rebalance_service.go
index bb97c87..acdcdd6 100644
--- a/rocketmq-go/service/rebalance_service.go
+++ b/rocketmq-go/service/rebalance_service.go
@@ -16,7 +16,7 @@
  */
 package service
 
-import "github.com/incubator-rocketmq-externals/rocketmq-go/model/config"
+import 
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
 
 type Rebalance struct {
        mqClient       RocketMqClient


Reply via email to