http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/response_code.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/response_code.go 
b/rocketmq-go/model/response_code.go
index 957c1e9..ed40a6d 100644
--- a/rocketmq-go/model/response_code.go
+++ b/rocketmq-go/model/response_code.go
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
 package model
 
 const (

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/send_result.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/send_result.go b/rocketmq-go/model/send_result.go
index b1af4fb..4d3b31f 100644
--- a/rocketmq-go/model/send_result.go
+++ b/rocketmq-go/model/send_result.go
@@ -52,7 +52,7 @@ func NewSendResult(status SendStatus, msgID, offsetID string, 
queue *message.Mes
 }
 
 func EncoderSendResultToJson(obj interface{}) string {
-       return nil // TODO
+       return "" // TODO
 }
 
 func DecoderSendResultFromJson(json string) *SendResult {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/subscription_data.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/subscription_data.go 
b/rocketmq-go/model/subscription_data.go
new file mode 100644
index 0000000..ce5ae74
--- /dev/null
+++ b/rocketmq-go/model/subscription_data.go
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package model
+
+type SubscriptionData struct {
+       Topic           string
+       SubString       string
+       ClassFilterMode bool
+       TagsSet         []string
+       CodeSet         []string
+       SubVersion      int64
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/topic_publishInfo.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/topic_publishInfo.go 
b/rocketmq-go/model/topic_publishInfo.go
index b2f711b..b5f9e37 100644
--- a/rocketmq-go/model/topic_publishInfo.go
+++ b/rocketmq-go/model/topic_publishInfo.go
@@ -18,59 +18,59 @@
 package model
 
 import (
-       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
+//"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
 )
 
-type TopicPublishInfo struct {
-       orderTopic         bool
-       havaTopicRouteInfo bool
-       messageQueueList   []*message.MessageQueue
-       topicRouteData     *TopicRouteData
-}
-
-func (info *TopicPublishInfo) SetOrderTopic(b bool) {
-       info.orderTopic = b
-}
-
-func (info *TopicPublishInfo) Ok() bool {
-       return false
-}
-
-func (info *TopicPublishInfo) MessageQueueList() []*message.MessageQueue {
-       return info.messageQueueList
-}
-
-func (info *TopicPublishInfo) HaveTopicRouteInfo() bool {
-       return info.havaTopicRouteInfo
-}
-
-func (info *TopicPublishInfo) SetHaveTopicRouteInfo(b bool) {
-       info.havaTopicRouteInfo = b
-}
-
-func (info *TopicPublishInfo) TopicRouteData() *TopicRouteData {
-       return info.topicRouteData
-}
-
-func (info *TopicPublishInfo) SetTopicRouteData(routeDate *TopicRouteData) {
-       info.topicRouteData = routeDate
-}
-
-func (info *TopicPublishInfo) SelectOneMessageQueue() *message.MessageQueue {
-       return nil //TODO
-}
-
-func (info *TopicPublishInfo) selectOneMessageQueueWithBroker(brokerName 
string) *message.MessageQueue {
-       if brokerName == "" {
-               return info.SelectOneMessageQueue()
-       }
-       return nil //TODO
-}
-
-func (info *TopicPublishInfo) QueueIdByBroker(brokerName string) int {
-       return nil //TODO
-}
-
-func (info *TopicPublishInfo) String() string {
-       return nil
-}
+//type TopicPublishInfo struct {
+//     orderTopic         bool
+//     havaTopicRouteInfo bool
+//     messageQueueList   []*message.MessageQueue
+//     topicRouteData     *TopicRouteData
+//}
+//
+//func (info *TopicPublishInfo) SetOrderTopic(b bool) {
+//     info.orderTopic = b
+//}
+//
+//func (info *TopicPublishInfo) Ok() bool {
+//     return false
+//}
+//
+//func (info *TopicPublishInfo) MessageQueueList() []*message.MessageQueue {
+//     return info.messageQueueList
+//}
+//
+//func (info *TopicPublishInfo) HaveTopicRouteInfo() bool {
+//     return info.havaTopicRouteInfo
+//}
+//
+//func (info *TopicPublishInfo) SetHaveTopicRouteInfo(b bool) {
+//     info.havaTopicRouteInfo = b
+//}
+//
+//func (info *TopicPublishInfo) TopicRouteData() *TopicRouteData {
+//     return info.topicRouteData
+//}
+//
+//func (info *TopicPublishInfo) SetTopicRouteData(routeDate *TopicRouteData) {
+//     info.topicRouteData = routeDate
+//}
+//
+//func (info *TopicPublishInfo) SelectOneMessageQueue() *message.MessageQueue {
+//     return nil //TODO
+//}
+//
+//func (info *TopicPublishInfo) selectOneMessageQueueWithBroker(brokerName 
string) *message.MessageQueue {
+//     if brokerName == "" {
+//             return info.SelectOneMessageQueue()
+//     }
+//     return nil //TODO
+//}
+//
+//func (info *TopicPublishInfo) QueueIdByBroker(brokerName string) int {
+//     return 0 //TODO
+//}
+//
+//func (info *TopicPublishInfo) String() string {
+//     return ""
+//}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/topic_publish_info.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/topic_publish_info.go 
b/rocketmq-go/model/topic_publish_info.go
new file mode 100644
index 0000000..14ec088
--- /dev/null
+++ b/rocketmq-go/model/topic_publish_info.go
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package model
+
+import (
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+       "sync/atomic"
+)
+
+type TopicPublishInfo struct {
+       OrderTopic             bool
+       HaveTopicRouterInfo    bool
+       MessageQueueList       []MessageQueue
+       TopicRouteDataInstance *TopicRouteData
+       topicQueueIndex        int32
+}
+
+//private boolean orderTopic = false;
+//private boolean haveTopicRouterInfo = false;
+//private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
+//private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0); 
// todo
+//private TopicRouteData topicRouteData;
+
+func (self *TopicPublishInfo) JudgeTopicPublishInfoOk() (bIsTopicOk bool) {
+       bIsTopicOk = (len(self.MessageQueueList) > 0)
+       return
+}
+func (self *TopicPublishInfo) FetchQueueIndex() (index int) {
+       qLen := len(self.MessageQueueList)
+       if qLen > 0 {
+               qIndex := atomic.AddInt32(&self.topicQueueIndex, 1)
+               qIndex = qIndex % int32(qLen)
+               index = int(qIndex)
+       }
+       return
+}
+func BuildTopicSubscribeInfoFromRoteData(topic string, topicRouteData 
*TopicRouteData) (mqList []*MessageQueue) {
+       mqList = make([]*MessageQueue, 0)
+       for _, queueData := range topicRouteData.QueueDatas {
+               if !constant.ReadAble(queueData.Perm) {
+                       continue
+               }
+               var i int32
+               for i = 0; i < queueData.ReadQueueNums; i++ {
+                       mq := &MessageQueue{
+                               Topic:      topic,
+                               BrokerName: queueData.BrokerName,
+                               QueueId:    i,
+                       }
+                       mqList = append(mqList, mq)
+               }
+       }
+       return
+}
+
+func BuildTopicPublishInfoFromTopicRoteData(topic string, topicRouteData 
*TopicRouteData) (topicPublishInfo *TopicPublishInfo) {
+       // all order topic is false  todo change
+       topicPublishInfo = &TopicPublishInfo{
+               TopicRouteDataInstance: topicRouteData,
+               OrderTopic:             false,
+               MessageQueueList:       []MessageQueue{}}
+       for _, queueData := range topicRouteData.QueueDatas {
+               if !constant.WriteAble(queueData.Perm) {
+                       continue
+               }
+               for _, brokerData := range topicRouteData.BrokerDatas {
+                       if brokerData.BrokerName == queueData.BrokerName {
+                               if len(brokerData.BrokerAddrs["0"]) == 0 {
+                                       break
+                               }
+                               var i int32
+                               for i = 0; i < queueData.WriteQueueNums; i++ {
+                                       messageQueue := MessageQueue{Topic: 
topic, BrokerName: queueData.BrokerName, QueueId: i}
+                                       topicPublishInfo.MessageQueueList = 
append(topicPublishInfo.MessageQueueList, messageQueue)
+                                       topicPublishInfo.HaveTopicRouterInfo = 
true
+                               }
+                               break
+                       }
+               }
+       }
+       return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/topic_route_data.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/topic_route_data.go 
b/rocketmq-go/model/topic_route_data.go
index f387529..348479f 100644
--- a/rocketmq-go/model/topic_route_data.go
+++ b/rocketmq-go/model/topic_route_data.go
@@ -18,108 +18,128 @@
 package model
 
 import (
-       "fmt"
-       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
+       //"fmt"
+       
//"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
+       "sync"
 )
 
-type BrokerData struct {
-}
+//
+//type BrokerData struct {
+//}
+//
+//type TopicRouteData struct {
+//     orderTopicConf    string
+//     queueDatas        []*message.MessageQueue
+//     brokerDatas       []*BrokerData
+//     filterServerTable map[string][]string
+//}
+//
+//func NewTopicRouteData() *TopicRouteData {
+//     return &TopicRouteData{}
+//}
+//
+//func (route *TopicRouteData) CloneTopicRouteData() (clonedRouteData 
*TopicRouteData) {
+//     clonedRouteData = &TopicRouteData{
+//             route.orderTopicConf,
+//             route.queueDatas,
+//             route.brokerDatas,
+//             route.filterServerTable,
+//     }
+//     // TODO: to complete
+//     return
+//}
+//
+//func (route *TopicRouteData) QueueDatas() []*message.MessageQueue {
+//     return route.queueDatas
+//}
+//
+//func (route *TopicRouteData) SetQueueDatas(data []*message.MessageQueue) {
+//     route.queueDatas = data
+//}
+//
+//func (route *TopicRouteData) BrokerDatas() []*BrokerData {
+//     return route.brokerDatas
+//}
+//
+//func (route *TopicRouteData) SetBrokerDatas(data []*BrokerData) {
+//     route.brokerDatas = data
+//}
+//
+//func (route *TopicRouteData) FilterServerTable() map[string][]string {
+//     return route.filterServerTable
+//}
+//
+//func (route *TopicRouteData) SetFilterServerTable(data map[string][]string) {
+//     route.filterServerTable = data
+//}
+//
+//func (route *TopicRouteData) OrderTopicConf() string {
+//     return route.orderTopicConf
+//}
+//
+//func (route *TopicRouteData) SetOrderTopicConf(s string) {
+//     route.orderTopicConf = s
+//}
+//
+//func (route *TopicRouteData) HashCode() (result int) {
+//     prime := 31
+//     result = 1
+//     result *= prime
+//     // TODO
+//
+//     return
+//}
+//
+//func (route *TopicRouteData) Equals(route1 interface{}) bool {
+//     if route == nil {
+//             return true
+//     }
+//     if route1 == nil {
+//             return false
+//     }
+//     //value, ok := route1.(TopicRouteData)
+//     //if !ok {
+//     //      return false
+//     //}
+//     // TODO
+//     //if route.brokerDatas == nil && value.brokerDatas != nil || 
len(route.brokerDatas) != len(value.brokerDatas) {
+//     //      return false
+//     //}
+//     //
+//     //if route.orderTopicConf == "" && value.orderTopicConf != "" || 
route.orderTopicConf != value.orderTopicConf {
+//     //      return false
+//     //}
+//     //
+//     //if route.queueDatas == nil && value.queueDatas != nil || 
route.queueDatas != value.queueDatas {
+//     //      return false
+//     //}
+//     //
+//     //if route.filterServerTable == nil && value.filterServerTable != nil ||
+//     //      route.filterServerTable != value.filterServerTable {
+//     //      return false
+//     //}
+//     return true
+//}
+//
+//func (route *TopicRouteData) String() string {
+//     return fmt.Sprintf("TopicRouteData [orderTopicConf=%s, queueDatas=%s, 
brokerDatas=%s, filterServerTable=%s]",
+//             route.orderTopicConf, route.queueDatas, route.brokerDatas, 
route.filterServerTable)
+//}
 
 type TopicRouteData struct {
-       orderTopicConf    string
-       queueDatas        []*message.MessageQueue
-       brokerDatas       []*BrokerData
-       filterServerTable map[string][]string
-}
-
-func NewTopicRouteData() *TopicRouteData {
-       return &TopicRouteData{}
-}
-
-func (route *TopicRouteData) CloneTopicRouteData() (clonedRouteData 
*TopicRouteData) {
-       clonedRouteData = &TopicRouteData{
-               route.orderTopicConf,
-               route.queueDatas,
-               route.brokerDatas,
-               route.filterServerTable,
-       }
-       // TODO: to complete
-       return
-}
-
-func (route *TopicRouteData) QueueDatas() []*message.MessageQueue {
-       return route.queueDatas
-}
-
-func (route *TopicRouteData) SetQueueDatas(data []*message.MessageQueue) {
-       route.queueDatas = data
-}
-
-func (route *TopicRouteData) BrokerDatas() []*BrokerData {
-       return route.brokerDatas
-}
-
-func (route *TopicRouteData) SetBrokerDatas(data []*BrokerData) {
-       route.brokerDatas = data
+       OrderTopicConf string
+       QueueDatas     []*QueueData
+       BrokerDatas    []*BrokerData
 }
-
-func (route *TopicRouteData) FilterServerTable() map[string][]string {
-       return route.filterServerTable
-}
-
-func (route *TopicRouteData) SetFilterServerTable(data map[string][]string) {
-       route.filterServerTable = data
-}
-
-func (route *TopicRouteData) OrderTopicConf() string {
-       return route.orderTopicConf
-}
-
-func (route *TopicRouteData) SetOrderTopicConf(s string) {
-       route.orderTopicConf = s
+type QueueData struct {
+       BrokerName     string
+       ReadQueueNums  int32
+       WriteQueueNums int32
+       Perm           int32
+       TopicSynFlag   int32
 }
-
-func (route *TopicRouteData) HashCode() (result int) {
-       prime := 31
-       result = 1
-       result *= prime
-       // TODO
-
-       return
-}
-
-func (route *TopicRouteData) Equals(route1 interface{}) bool {
-       if route == nil {
-               return true
-       }
-       if route1 == nil {
-               return false
-       }
-       //value, ok := route1.(TopicRouteData)
-       //if !ok {
-       //      return false
-       //}
-       // TODO
-       //if route.brokerDatas == nil && value.brokerDatas != nil || 
len(route.brokerDatas) != len(value.brokerDatas) {
-       //      return false
-       //}
-       //
-       //if route.orderTopicConf == "" && value.orderTopicConf != "" || 
route.orderTopicConf != value.orderTopicConf {
-       //      return false
-       //}
-       //
-       //if route.queueDatas == nil && value.queueDatas != nil || 
route.queueDatas != value.queueDatas {
-       //      return false
-       //}
-       //
-       //if route.filterServerTable == nil && value.filterServerTable != nil ||
-       //      route.filterServerTable != value.filterServerTable {
-       //      return false
-       //}
-       return true
-}
-
-func (route *TopicRouteData) String() string {
-       return fmt.Sprintf("TopicRouteData [orderTopicConf=%s, queueDatas=%s, 
brokerDatas=%s, filterServerTable=%s]",
-               route.orderTopicConf, route.queueDatas, route.brokerDatas, 
route.filterServerTable)
+type BrokerData struct {
+       BrokerName      string
+       BrokerAddrs     map[string]string
+       BrokerAddrsLock sync.RWMutex
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/mq_client_manager.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_client_manager.go b/rocketmq-go/mq_client_manager.go
index c07dcfd..731158f 100644
--- a/rocketmq-go/mq_client_manager.go
+++ b/rocketmq-go/mq_client_manager.go
@@ -16,13 +16,67 @@
  */
 package rocketmq
 
-import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+import (
+       "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+       "sync"
+       "time"
+)
 
 type MqClientManager struct {
        clientFactory          *ClientFactory
        rocketMqClient         service.RocketMqClient
        pullMessageController  *PullMessageController
        defaultProducerService RocketMQProducer //for send back message
+
+       rocketMqManagerLock sync.Mutex
+       //ClientId            string
+       BootTimestamp int64
+
+       NamesrvLock   sync.Mutex
+       HeartBeatLock sync.Mutex
+       //rebalanceControllr       *RebalanceController
+}
+
+type MqClientConfig struct {
+}
+
+func NewMqClientManager(clientConfig *MqClientConfig) (rocketMqManager 
*MqClientManager) {
+       rocketMqManager = &MqClientManager{}
+       rocketMqManager.BootTimestamp = time.Now().Unix()
+       rocketMqManager.clientFactory = clientFactoryInit()
+       //rocketMqManager.rocketMqClient =
+       //rocketMqManager.pullMessageController = 
NewPullMessageController(rocketMqManager.mqClient, 
rocketMqManager.clientFactory)
+       //rocketMqManager.cleanExpireMsgController = 
NewCleanExpireMsgController(rocketMqManager.mqClient, 
rocketMqManager.clientFactory)
+       //rocketMqManager.rebalanceControllr = 
NewRebalanceController(rocketMqManager.clientFactory)
+
+       return
+}
+
+func (self *MqClientManager) RegisterProducer(producer *DefaultMQProducer) {
+       return
+}
+
+func (self *MqClientManager) RegisterConsumer(consumer RocketMQConsumer) {
+       // todo check config
+       //if (self.defaultProducerService == nil) {
+       //      self.defaultProducerService = 
service.NewDefaultProducerService(constant.CLIENT_INNER_PRODUCER_GROUP, 
mq_config.NewProducerConfig(), self.mqClient)
+       //}
+       return
+}
+
+func (self *MqClientManager) Start() {
+       //self.SendHeartbeatToAllBrokerWithLock()//we should send heartbeat 
first
+       self.startAllScheduledTask()
+}
+func (manager *MqClientManager) startAllScheduledTask() {
+
+}
+
+func clientFactoryInit() (clientFactory *ClientFactory) {
+       clientFactory = &ClientFactory{}
+       clientFactory.ProducerTable = make(map[string]RocketMQProducer)
+       clientFactory.ConsumerTable = make(map[string]RocketMQConsumer)
+       return
 }
 
 type ClientFactory struct {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/mq_consumer.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_consumer.go b/rocketmq-go/mq_consumer.go
index 7712ee1..7112537 100644
--- a/rocketmq-go/mq_consumer.go
+++ b/rocketmq-go/mq_consumer.go
@@ -16,7 +16,10 @@
  */
 package rocketmq
 
-import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+import (
+       "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+       "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+)
 
 type RocketMQConsumer interface {
 }
@@ -28,5 +31,44 @@ type DefaultMQPushConsumer struct {
        mqClient              service.RocketMqClient
        rebalance             *service.Rebalance //Rebalance's impl depend on 
offsetStore
        consumeMessageService service.ConsumeMessageService
-       ConsumerConfig        *MqConsumerConfig
+       consumerConfig        *MqConsumerConfig
+
+       consumerGroup string
+       //consumeFromWhere      string
+       consumeType  string
+       messageModel string
+       unitMode     bool
+
+       subscription    map[string]string   //topic|subExpression
+       subscriptionTag map[string][]string // we use it filter again
+       // 分配策略
+       pause bool //when reset offset we need pause
+}
+
+func NewDefaultMQPushConsumer(consumerGroup string, mqConsumerConfig 
*MqConsumerConfig) (defaultMQPushConsumer *DefaultMQPushConsumer) {
+       defaultMQPushConsumer = &DefaultMQPushConsumer{}
+       defaultMQPushConsumer.consumerConfig = mqConsumerConfig
+       return
+}
+
+func (self *DefaultMQPushConsumer) RegisterMessageListener(messageListener 
model.MessageListener) {
+       self.consumeMessageService = 
service.NewConsumeMessageConcurrentlyServiceImpl(messageListener)
+}
+func (self *DefaultMQPushConsumer) Subscribe(topic string, subExpression 
string) {
+       //self.subscription[topic] = subExpression
+       //if len(subExpression) == 0 || subExpression == "*" {
+       //      return
+       //}
+       //tags := strings.Split(subExpression, "||")
+       //tagsList := []string{}
+       //for _, tag := range tags {
+       //      t := strings.TrimSpace(tag)
+       //      if len(t) == 0 {
+       //              continue
+       //      }
+       //      tagsList = append(tagsList, t)
+       //}
+       //if len(tagsList) > 0 {
+       //      self.subscriptionTag[topic] = tagsList
+       //}
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/mq_producer.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_producer.go b/rocketmq-go/mq_producer.go
index 3677939..d1a011b 100644
--- a/rocketmq-go/mq_producer.go
+++ b/rocketmq-go/mq_producer.go
@@ -25,7 +25,7 @@ type MqProducerConfig struct {
 }
 
 type DefaultMQProducer struct {
-       producerGroup   string
-       ProducerConfig  *MqProducerConfig
-       producerService service.ProducerService
+       producerGroup    string
+       mqProducerConfig *MqProducerConfig
+       producerService  service.ProducerService
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/custom_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/custom_header.go 
b/rocketmq-go/remoting/custom_header.go
index 40feade..04a46be 100644
--- a/rocketmq-go/remoting/custom_header.go
+++ b/rocketmq-go/remoting/custom_header.go
@@ -18,4 +18,5 @@ package remoting
 
 type CustomerHeader interface {
        FromMap(headerMap map[string]interface{})
+       //ToMap()(headerMap map[string]interface{})
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/event_executor.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/event_executor.go 
b/rocketmq-go/remoting/event_executor.go
deleted file mode 100644
index 38e1ee6..0000000
--- a/rocketmq-go/remoting/event_executor.go
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package remoting
-
-import (
-       "fmt"
-       "github.com/golang/glog"
-       "net"
-       "sync"
-)
-
-type Runnable interface {
-       Run()
-}
-
-type NetEventType int
-
-const (
-       Connect NetEventType = iota
-       Close
-       Idle
-       Exception // TODO error?
-)
-
-type NetEvent struct {
-       eType         NetEventType
-       remoteAddress string
-       conn          net.Conn
-}
-
-func NewEventType(eType NetEventType, remoteAddr string, conn net.Conn) 
*NetEvent {
-       return &NetEvent{eType, remoteAddr, conn}
-}
-
-func (event *NetEvent) Type() NetEventType {
-       return event.eType
-}
-
-func (event *NetEvent) RemoteAddress() string {
-       return event.remoteAddress
-}
-
-func (event *NetEvent) Conn() net.Conn {
-       return event.conn
-}
-
-func (event *NetEvent) String() string {
-       return fmt.Sprintf("NettyEvent [type=%s, remoteAddr=%s, channel=%s]",
-               event.eType, event.remoteAddress, event.conn)
-}
-
-type NetEventExecutor struct {
-       hasNotified bool
-       running     bool
-       stopped     chan int
-       mu          sync.RWMutex // TODO need init?
-       client      *RemotingClient
-
-       eventQueue chan *NetEvent
-       maxSize    int
-}
-
-func NewNetEventExecutor(client *RemotingClient) *NetEventExecutor {
-       return &NetEventExecutor{
-               hasNotified: false,
-               running:     false,
-               stopped:     make(chan int),
-               client:      client,
-               eventQueue:  make(chan *NetEvent, 100), // TODO confirm size
-               maxSize:     10000,
-       }
-}
-
-func (executor *NetEventExecutor) Start() {
-       go executor.run()
-}
-
-func (executor *NetEventExecutor) Shutdown() {
-       executor.stopped <- 0
-}
-
-func (executor *NetEventExecutor) PutEvent(event *NetEvent) {
-       if len(executor.eventQueue) <= executor.maxSize {
-               executor.eventQueue <- event //append(executor.eventQueue, 
event)
-       } else {
-               fmt.Sprintf("event queue size[%s] enough, so drop this event 
%s", len(executor.eventQueue), event.String())
-       }
-}
-
-func (executor *NetEventExecutor) ServiceName() string {
-       // TODO
-       return nil
-}
-
-func (executor *NetEventExecutor) run() {
-       glog.Infof("%s service started", executor.ServiceName())
-
-       executor.mu.Lock()
-       executor.running = true
-       executor.mu.Unlock()
-
-       listener := executor.client.ConnEventListener()
-       for executor.running { // TODO optimize
-               select {
-               case event := <-executor.eventQueue:
-                       if event != nil && listener != nil {
-                               switch event.Type() {
-                               case Connect:
-                                       
listener.OnConnConnect(event.remoteAddress, event.Conn())
-                               case Close:
-                                       
listener.OnConnClose(event.remoteAddress, event.Conn())
-                               case Idle:
-                                       
listener.OnConnIdle(event.remoteAddress, event.Conn())
-                               case Exception:
-                                       
listener.OnConnException(event.remoteAddress, event.Conn())
-                               default:
-                                       break
-                               }
-                       }
-               case <-executor.stopped:
-                       executor.mu.Lock()
-                       executor.running = false
-                       executor.mu.Unlock()
-                       break
-               }
-       }
-
-       glog.Infof("%s service exit.", executor.ServiceName())
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/json_serializable.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/json_serializable.go 
b/rocketmq-go/remoting/json_serializable.go
new file mode 100644
index 0000000..c2c5ea0
--- /dev/null
+++ b/rocketmq-go/remoting/json_serializable.go
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remoting
+
+import (
+       "encoding/json"
+)
+
+type JsonSerializer struct {
+}
+
+func (self *JsonSerializer) EncodeHeaderData(command *RemotingCommand) []byte {
+       buf, err := json.Marshal(command)
+       if err != nil {
+               return nil
+       }
+       return buf
+}
+func (self *JsonSerializer) DecodeRemoteCommand(header, body []byte) 
*RemotingCommand {
+       cmd := &RemotingCommand{}
+       cmd.ExtFields = make(map[string]interface{})
+       err := json.Unmarshal(header, cmd)
+       if err != nil {
+               return nil
+       }
+       cmd.Body = body
+       return cmd
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/remoting_client.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/remoting_client.go 
b/rocketmq-go/remoting/remoting_client.go
index 38685cb..206fdcf 100644
--- a/rocketmq-go/remoting/remoting_client.go
+++ b/rocketmq-go/remoting/remoting_client.go
@@ -14,305 +14,357 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-
 package remoting
 
 import (
+       "bytes"
+       "encoding/binary"
        "errors"
-       "fmt"
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+       "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
        "github.com/golang/glog"
        "math/rand"
        "net"
+       "strconv"
+       "strings"
        "sync"
        "time"
 )
 
-type ConnEventListener interface {
-       OnConnConnect(remoteAddress string, conn net.Conn)
-       OnConnClose(remoteAddress string, conn net.Conn)
-       OnConnIdle(remoteAddress string, conn net.Conn)
-       OnConnException(remoteAddress string, conn net.Conn)
+type RemotingClient interface {
+       InvokeSync(addr string, request *RemotingCommand, timeoutMillis int64) 
(remotingCommand *RemotingCommand, err error)
+       InvokeAsync(addr string, request *RemotingCommand, timeoutMillis int64, 
invokeCallback InvokeCallback) error
+       InvokeOneWay(addr string, request *RemotingCommand, timeoutMillis 
int64) error
 }
-
-type NetRequestProcessor struct {
+type DefalutRemotingClient struct {
+       clientId     string
+       clientConfig *config.ClientConfig
+
+       connTable     map[string]net.Conn
+       connTableLock sync.RWMutex
+
+       responseTable  util.ConcurrentMap //map[int32]*ResponseFuture
+       processorTable util.ConcurrentMap //map[int]ClientRequestProcessor 
//requestCode|ClientRequestProcessor
+       //      protected final HashMap<Integer/* request code */, 
Pair<NettyRequestProcessor, ExecutorService>> processorTable =
+       //new HashMap<Integer, Pair<NettyRequestProcessor, 
ExecutorService>>(64);
+       namesrvAddrList          []string
+       namesrvAddrSelectedAddr  string
+       namesrvAddrSelectedIndex int                    //how to chose. done
+       namesvrLockRW            sync.RWMutex           //
+       clientRequestProcessor   ClientRequestProcessor //mange register the 
processor here
+       serializerHandler        SerializerHandler      //rocketmq encode decode
 }
 
-type NetConfig struct {
-       clientWorkerNumber           int
-       clientCallbackExecutorNumber int
-       clientOneWaySemaphoreValue   int
-       clientAsyncSemaphoreValue    int
-       connectTimeoutMillis         time.Duration
-       channelNotActiveInterval     time.Duration
-
-       clientChannelMaxIdleTimeSeconds    time.Duration
-       clientSocketSndBufSize             int
-       clientSocketRcvBufSize             int
-       clientPooledByteBufAllocatorEnable bool
-       clientCloseSocketIfTimeout         bool
+func RemotingClientInit(clientConfig *config.ClientConfig, 
clientRequestProcessor ClientRequestProcessor) (client *DefalutRemotingClient) {
+       client = &DefalutRemotingClient{}
+       client.connTable = map[string]net.Conn{}
+       client.responseTable = util.New()
+       client.clientConfig = clientConfig
+
+       client.namesrvAddrList = 
strings.Split(clientConfig.NameServerAddress(), ";")
+       client.namesrvAddrSelectedIndex = -1
+       client.clientRequestProcessor = clientRequestProcessor
+       client.serializerHandler = NewSerializerHandler()
+       return
 }
 
-type Pair struct {
-       o1 *NetRequestProcessor
-       o2 *ExecutorService
+func (self *DefalutRemotingClient) InvokeSync(addr string, request 
*RemotingCommand, timeoutMillis int64) (remotingCommand *RemotingCommand, err 
error) {
+       var conn net.Conn
+       conn, err = self.GetOrCreateConn(addr)
+       response := &ResponseFuture{
+               SendRequestOK:  false,
+               Opaque:         request.Opaque,
+               TimeoutMillis:  timeoutMillis,
+               BeginTimestamp: time.Now().Unix(),
+               Done:           make(chan bool),
+       }
+       header := self.serializerHandler.EncodeHeader(request)
+       body := request.Body
+       self.SetResponse(request.Opaque, response)
+       err = self.sendRequest(header, body, conn, addr)
+       if err != nil {
+               glog.Error(err)
+               return
+       }
+       select {
+       case <-response.Done:
+               remotingCommand = response.ResponseCommand
+               return
+       case <-time.After(time.Duration(timeoutMillis) * time.Millisecond):
+               err = errors.New("invoke sync timeout:" + 
strconv.FormatInt(timeoutMillis, 10) + " Millisecond")
+               return
+       }
 }
-
-type RemotingClient struct {
-       semaphoreOneWay         sync.Mutex // TODO right? use chan?
-       semaphoreAsync          sync.Mutex
-       processorTable          map[int]*Pair
-       netEventExecutor        *NetEventExecutor
-       defaultRequestProcessor *Pair
-
-       config             NetConfig
-       connTable          map[string]net.Conn
-       connTableLock      sync.RWMutex
-       timer              *time.Timer
-       namesrvAddrList    []string
-       namesrvAddrChoosed string
-
-       callBackExecutor  *ExecutorService
-       listener          ConnEventListener
-       rpcHook           RPCHook
-       responseTable     map[int32]*ResponseFuture
-       responseTableLock sync.RWMutex
+func (self *DefalutRemotingClient) InvokeAsync(addr string, request 
*RemotingCommand, timeoutMillis int64, invokeCallback InvokeCallback) error {
+       conn, err := self.GetOrCreateConn(addr)
+       if err != nil {
+               return err
+       }
+       response := &ResponseFuture{
+               SendRequestOK:  false,
+               Opaque:         request.Opaque,
+               TimeoutMillis:  timeoutMillis,
+               BeginTimestamp: time.Now().Unix(),
+               InvokeCallback: invokeCallback,
+       }
+       self.SetResponse(request.Opaque, response)
+       header := self.serializerHandler.EncodeHeader(request)
+       body := request.Body
+       err = self.sendRequest(header, body, conn, addr)
+       if err != nil {
+               glog.Error(err)
+               return err
+       }
+       return err
 }
-
-type ConnHandlerContext struct {
+func (self *DefalutRemotingClient) InvokeOneWay(addr string, request 
*RemotingCommand, timeoutMillis int64) error {
+       conn, err := self.GetOrCreateConn(addr)
+       if err != nil {
+               return err
+       }
+       header := self.serializerHandler.EncodeHeader(request)
+       body := request.Body
+       err = self.sendRequest(header, body, conn, addr)
+       if err != nil {
+               glog.Error(err)
+               return err
+       }
+       return err
 }
 
-type ExecutorService struct {
-       callBackChannel chan func()
-       quit            chan bool
+func (self *DefalutRemotingClient) sendRequest(header, body []byte, conn 
net.Conn, addr string) error {
+       var requestBytes []byte
+       requestBytes = append(requestBytes, header...)
+       if body != nil && len(body) > 0 {
+               requestBytes = append(requestBytes, body...)
+       }
+       _, err := conn.Write(requestBytes)
+       if err != nil {
+               glog.Error(err)
+               if len(addr) > 0 {
+                       self.ReleaseConn(addr, conn)
+               }
+               return err
+       }
+       return nil
 }
-
-func (exec *ExecutorService) submit(callback func()) {
-       exec.callBackChannel <- callback
+func (self *DefalutRemotingClient) GetNamesrvAddrList() []string {
+       return self.namesrvAddrList
 }
 
-func (exec *ExecutorService) run() {
-       go func() {
-               glog.Info("Callback Executor routing start.")
-               for {
-                       select {
-                       case invoke := <-exec.callBackChannel:
-                               invoke()
-                       case <-exec.quit:
-                               break
-                       }
-               }
-               glog.Info("Callback Executor routing quit.")
-       }()
+func (self *DefalutRemotingClient) SetResponse(index int32, response 
*ResponseFuture) {
+       self.responseTable.Set(strconv.Itoa(int(index)), response)
 }
-
-func NewRemotingClient(cfg NetConfig) *RemotingClient {
-       client := &RemotingClient{
-               config:        cfg,
-               connTable:     make(map[string]net.Conn),
-               timer:         time.NewTimer(10 * time.Second),
-               responseTable: make(map[int32]*ResponseFuture),
+func (self *DefalutRemotingClient) getResponse(index int32) (response 
*ResponseFuture, err error) {
+       obj, ok := self.responseTable.Get(strconv.Itoa(int(index)))
+       if !ok {
+               err = errors.New("get conn from responseTable error")
+               return
        }
-       // java: super(xxxx)
-       return client
+       response = obj.(*ResponseFuture)
+       return
 }
-
-func (rc *RemotingClient) PutNetEvent(event *NetEvent) {
-       rc.netEventExecutor.PutEvent(event)
+func (self *DefalutRemotingClient) removeResponse(index int32) {
+       self.responseTable.Remove(strconv.Itoa(int(index)))
 }
-
-func (rc *RemotingClient) executeInvokeCallback(future *ResponseFuture) {
-       executor := rc.CallbackExecutor()
-       if executor != nil {
-               executor.submit(func() {
-                       future.invokeCallback(future)
-               })
+func (self *DefalutRemotingClient) GetOrCreateConn(address string) (conn 
net.Conn, err error) {
+       if len(address) == 0 {
+               conn, err = self.getNamesvrConn()
                return
        }
-       future.executeInvokeCallback()
-}
-
-// check timeout future
-func (rc *RemotingClient) scanResponseTable() {
-       rfMap := make(map[int]*ResponseFuture)
-       for k, future := range rc.responseTable { // TODO safety?
-               if int64(future.beginTimestamp)+int64(future.timeoutMillis)+1e9 
<= time.Now().Unix() {
-                       future.Done()
-                       delete(rc.responseTable, k)
-                       rfMap[int(k)] = future
-                       glog.Warningf("remove timeout request, ", 
future.String())
-               }
+       conn = self.GetConn(address)
+       if conn != nil {
+               return
        }
-
-       go func() {
-               for _, future := range rfMap {
-                       rc.executeInvokeCallback(future) // TODO if still 
failed, how to deal with the message ?
-               }
-       }()
+       conn, err = self.CreateConn(address)
+       return
 }
-
-func (rc *RemotingClient) CallbackExecutor() *ExecutorService {
-       return rc.callBackExecutor
-}
-
-func (rc *RemotingClient) RPCHook() RPCHook {
-       return rc.rpcHook
+func (self *DefalutRemotingClient) GetConn(address string) (conn net.Conn) {
+       self.connTableLock.RLock()
+       conn = self.connTable[address]
+       self.connTableLock.RUnlock()
+       return
 }
-
-func (rc *RemotingClient) ConnEventListener() ConnEventListener {
-       return rc.listener
-}
-
-func (rc *RemotingClient) invokeSync(addr string, request *RemotingCommand,
-       timeout time.Duration) (*RemotingCommand, error) {
-       conn := rc.getAndCreateConn(addr)
+func (self *DefalutRemotingClient) CreateConn(address string) (conn net.Conn, 
err error) {
+       defer self.connTableLock.Unlock()
+       self.connTableLock.Lock()
+       conn = self.connTable[address]
        if conn != nil {
-               if rc.rpcHook != nil {
-                       rc.rpcHook.DoBeforeRequest(addr, request)
-               }
-               opaque := request.opaque
-               //defer delete(rc.responseTable, opaque) TODO should in listener
-
-               future := &ResponseFuture{
-                       opaque:        opaque,
-                       timeoutMillis: timeout,
-               }
-               rc.responseTable[opaque] = future
-
-               conn.Write(request.encode()) // TODO register listener
-
-               response := future.WaitResponse(timeout)
-               if response == nil {
-                       if future.sendRequestOK {
-                               return nil, 
errors.New(fmt.Sprintf("RemotingTimeout error: %s", future.err.Error()))
-                       } else {
-                               return nil, 
errors.New(fmt.Sprintf("RemotingSend error: %s", future.err.Error()))
-                       }
-               }
-
-               if rc.rpcHook != nil {
-                       rc.rpcHook.DoBeforeResponse(addr, response)
-               }
-               return response, nil
-       } else {
-               rc.CloseConn(addr) // TODO
-               return nil, errors.New(fmt.Sprintf("Connection to %s ERROR!", 
addr))
+               return
        }
+       conn, err = self.createAndHandleTcpConn(address)
+       self.connTable[address] = conn
+       return
 }
 
-func (rc *RemotingClient) invokeAsync(addr string, request *RemotingCommand,
-       timeout time.Duration, callback InvokeCallback) error {
-       conn := rc.getAndCreateConn(addr)
-       if conn != nil { // TODO how to confirm conn active?
-               if rc.rpcHook != nil {
-                       rc.rpcHook.DoBeforeRequest(addr, request)
+func (self *DefalutRemotingClient) getNamesvrConn() (conn net.Conn, err error) 
{
+       self.namesvrLockRW.RLock()
+       address := self.namesrvAddrSelectedAddr
+       self.namesvrLockRW.RUnlock()
+       if len(address) != 0 {
+               conn = self.GetConn(address)
+               if conn != nil {
+                       return
                }
+       }
 
-               opaque := request.opaque
-               acquired := false // TODO semaphore.tryAcquire...
-               if acquired {
-                       //final SemaphoreReleaseOnlyOnce once = new 
SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
-
-                       future := NewResponseFuture(opaque, timeout, callback)
-                       rc.responseTable[opaque] = future
-
-                       future.WaitResponse(timeout) // TODO add listener
+       defer self.namesvrLockRW.Unlock()
+       self.namesvrLockRW.Lock()
+       //already connected by another write lock owner
+       address = self.namesrvAddrSelectedAddr
+       if len(address) != 0 {
+               conn = self.GetConn(address)
+               if conn != nil {
+                       return
                }
-               return nil
-       } else {
-               rc.CloseConn(addr) // TODO
-               return errors.New(fmt.Sprintf("Connection to %s ERROR!", addr))
        }
-}
 
-func (rc *RemotingClient) invokeOneWay(addr string, request *RemotingCommand,
-       timeout time.Duration) error {
-       conn := rc.getAndCreateConn(addr)
-       if conn != nil {
-               if rc.rpcHook != nil {
-                       rc.rpcHook.DoBeforeRequest(addr, request)
+       addressCount := len(self.namesrvAddrList)
+       if self.namesrvAddrSelectedIndex < 0 {
+               self.namesrvAddrSelectedIndex = rand.Intn(addressCount)
+       }
+       for i := 1; i <= addressCount; i++ {
+               selectedIndex := (self.namesrvAddrSelectedIndex + i) % 
addressCount
+               selectAddress := self.namesrvAddrList[selectedIndex]
+               if len(selectAddress) == 0 {
+                       continue
+               }
+               conn, err = self.CreateConn(selectAddress)
+               if err == nil {
+                       self.namesrvAddrSelectedAddr = selectAddress
+                       self.namesrvAddrSelectedIndex = selectedIndex
+                       return
                }
-
-               request.MarkOneWayRpc()
-               // TODO boolean acquired = 
this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
-               return nil
-       } else {
-               rc.CloseConn(addr) // TODO
-               return errors.New(fmt.Sprintf("Connection to %s ERROR!", addr))
        }
+       err = errors.New("all namesvrAddress can't use!,address:" + 
self.clientConfig.NameServerAddress())
+       return
 }
-
-func initValueIndex() int {
-       r := rand.Int()
-       if r < 0 { // math.Abs para is float64
-               r = -r
+func (self *DefalutRemotingClient) createAndHandleTcpConn(address string) 
(conn net.Conn, err error) {
+       conn, err = net.Dial("tcp", address)
+       if err != nil {
+               glog.Error(err)
+               return nil, err
        }
-       return r % 999 % 999
+       go self.handlerReceiveLoop(conn, address) //handler连接 
处理这个连接返回的结果
+       return
 }
-
-func (rc *RemotingClient) Start() {
-       // TODO
-}
-
-func (rc *RemotingClient) Shutdown() {
-       // TODO
-       rc.timer.Stop()
-}
-
-func (rc *RemotingClient) registerRPCHook(hk RPCHook) {
-       rc.rpcHook = hk
-}
-
-func (rc *RemotingClient) CloseConn(addr string) {
-       // TODO
+func (self *DefalutRemotingClient) ReleaseConn(addr string, conn net.Conn) {
+       defer self.connTableLock.Unlock()
+       conn.Close()
+       self.connTableLock.Lock()
+       delete(self.connTable, addr)
 }
 
-func (rc *RemotingClient) updateNameServerAddressList(addrs []string) {
-       old, update := rc.namesrvAddrList, false
-
-       if addrs != nil && len(addrs) > 0 {
-               if old == nil || len(addrs) != len(old) {
-                       update = true
-               } else {
-                       for i := 0; i < len(addrs) && !update; i++ {
-                               if contains(old, addrs[i]) {
-                                       update = true
+func (self *DefalutRemotingClient) handlerReceiveLoop(conn net.Conn, addr 
string) (err error) {
+       defer func() {
+               //when for is break releaseConn
+               glog.Error(err, addr)
+               self.ReleaseConn(addr, conn)
+       }()
+       b := make([]byte, 1024)
+       var length, headerLength, bodyLength int32
+       var buf = bytes.NewBuffer([]byte{})
+       var header, body []byte
+       var readTotalLengthFlag = true //readLen when true,read data when false
+       for {
+               var n int
+               n, err = conn.Read(b)
+               if err != nil {
+                       return
+               }
+               _, err = buf.Write(b[:n])
+               if err != nil {
+                       return
+               }
+               for {
+                       if readTotalLengthFlag {
+                               //we read 4 bytes of allDataLength
+                               if buf.Len() >= 4 {
+                                       err = binary.Read(buf, 
binary.BigEndian, &length)
+                                       if err != nil {
+                                               return
+                                       }
+                                       readTotalLengthFlag = false //now turn 
to read data
+                               } else {
+                                       break //wait bytes we not got
+                               }
+                       }
+                       if !readTotalLengthFlag {
+                               if buf.Len() < int(length) {
+                                       // judge all data received.if not,loop 
to wait
+                                       break
                                }
                        }
+                       //now all data received, we can read totalLen again
+                       readTotalLengthFlag = true
+
+                       //get the data,and handler it
+                       //header len
+                       err = binary.Read(buf, binary.BigEndian, &headerLength)
+                       var realHeaderLen = (headerLength & 0x00ffffff)
+                       //headerData the first ff is about serializable type
+                       var headerSerializableType = byte(headerLength >> 24)
+                       header = make([]byte, realHeaderLen)
+                       _, err = buf.Read(header)
+                       bodyLength = length - 4 - realHeaderLen
+                       body = make([]byte, int(bodyLength))
+                       if bodyLength == 0 {
+                               // no body
+                       } else {
+                               _, err = buf.Read(body)
+                       }
+                       go self.handlerReceivedMessage(conn, 
headerSerializableType, header, body)
                }
        }
-
-       if update {
-               rc.namesrvAddrList = addrs // TODO safe?
-       }
-
 }
-
-func (rc *RemotingClient) getAndCreateConn(addr string) net.Conn {
-       return nil
-}
-
-func (rc *RemotingClient) getAndCreateNamesrvConn() net.Conn {
-       return nil
-}
-
-func (rc *RemotingClient) createConn(addr string) net.Conn {
-
-       return nil
+func (self *DefalutRemotingClient) handlerReceivedMessage(conn net.Conn, 
headerSerializableType byte, headBytes []byte, bodyBytes []byte) {
+       cmd := 
self.serializerHandler.DecodeRemoteCommand(headerSerializableType, headBytes, 
bodyBytes)
+       if cmd.IsResponseType() {
+               self.handlerResponse(cmd)
+               return
+       }
+       go self.handlerRequest(conn, cmd)
 }
-
-func (rc *RemotingClient) RegisterProcessor(requestCode int, processor 
*NetRequestProcessor, executor ExecutorService) {
-       // TODO
+func (self *DefalutRemotingClient) handlerRequest(conn net.Conn, cmd 
*RemotingCommand) {
+       responseCommand := self.clientRequestProcessor(cmd)
+       if responseCommand == nil {
+               return
+       }
+       responseCommand.Opaque = cmd.Opaque
+       responseCommand.MarkResponseType()
+       header := self.serializerHandler.EncodeHeader(responseCommand)
+       body := responseCommand.Body
+       err := self.sendRequest(header, body, conn, "")
+       if err != nil {
+               glog.Error(err)
+       }
 }
+func (self *DefalutRemotingClient) handlerResponse(cmd *RemotingCommand) {
+       response, err := self.getResponse(cmd.Opaque)
+       self.removeResponse(cmd.Opaque)
+       if err != nil {
+               return
+       }
+       response.ResponseCommand = cmd
+       if response.InvokeCallback != nil {
+               response.InvokeCallback(response)
+       }
 
-func (rc *RemotingClient) String() string {
-       return nil // TODO
+       if response.Done != nil {
+               response.Done <- true
+       }
 }
 
-func contains(s []string, o string) bool { // TODO optimize
-       for _, v := range s {
-               if o == v {
-                       return true
+func (self *DefalutRemotingClient) ClearExpireResponse() {
+       for seq, responseObj := range self.responseTable.Items() {
+               response := responseObj.(*ResponseFuture)
+               if (response.BeginTimestamp + 30) <= time.Now().Unix() {
+                       //30 mins expire
+                       self.responseTable.Remove(seq)
+                       if response.InvokeCallback != nil {
+                               response.InvokeCallback(nil)
+                               glog.Warningf("remove time out request %v", 
response)
+                       }
                }
        }
-       return false
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/remoting_command.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/remoting_command.go 
b/rocketmq-go/remoting/remoting_command.go
index b57f1e9..a8361bd 100644
--- a/rocketmq-go/remoting/remoting_command.go
+++ b/rocketmq-go/remoting/remoting_command.go
@@ -14,168 +14,56 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-
 package remoting
 
-// TODO: refactor
 import (
-       "bytes"
-       "encoding/binary"
-       "encoding/json"
-       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
-       "log"
-       "os"
-       "strconv"
-       "sync"
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util/structs"
        "sync/atomic"
 )
 
-func init() {
-       // TODO
-}
-
-const (
-       SerializeTypeProperty = "rocketmq.serialize.type"
-       SerializeTypeEnv      = "ROCKETMQ_SERIALIZE_TYPE"
-       RemotingVersionKey    = "rocketmq.remoting.version"
-       rpcType               = 0 // 0, request command
-       rpcOneWay             = 1 // 0, RPC
-)
-
-type RemotingCommandType int
+var opaque int32
 
-const (
-       ResponseCommand RemotingCommandType = iota
-       RqeusetCommand
-)
+var RPC_TYPE int = 0   // 0, REQUEST_COMMAND
+var RPC_ONEWAY int = 1 // 0, RPC
 
-var configVersion int = -1
-var requestId int32
-var decodeLock sync.Mutex
+//var RESPONSE_TYPE int= 1 << RPC_TYPE
+var RESPONSE_TYPE int = 1
 
 type RemotingCommand struct {
        //header
-       code      int               `json:"code"`
-       language  string            `json:"language"`
-       version   int               `json:"version"`
-       opaque    int32             `json:"opaque"`
-       flag      int               `json:"flag"`
-       remark    string            `json:"remark"`
-       extFields map[string]string `json:"extFields"`
-       header    CustomerHeader    // transient
-       //body
-       body []byte `json:"body,omitempty"`
+       Code      int16                  `json:"code"`
+       Language  string                 `json:"language"` //int 8
+       Version   int16                  `json:"version"`
+       Opaque    int32                  `json:"opaque"`
+       Flag      int                    `json:"flag"`
+       Remark    string                 `json:"remark"`
+       ExtFields map[string]interface{} `json:"extFields"` //java's ExtFields 
and customHeader is use this key word
+       Body      []byte                 `json:"body,omitempty"`
 }
 
-func NewRemotingCommand(code int, header CustomerHeader) *RemotingCommand {
-       cmd := &RemotingCommand{
-               code:   code,
-               header: header,
-       }
-       setCmdVersion(cmd)
-       return cmd
+func NewRemotingCommand(commandCode int16, customerHeader CustomerHeader) 
*RemotingCommand {
+       return NewRemotingCommandWithBody(commandCode, customerHeader, nil)
 }
 
-func setCmdVersion(cmd *RemotingCommand) {
-       if configVersion >= 0 {
-               cmd.version = configVersion // safety
-       } else if v := os.Getenv(RemotingVersionKey); v != "" {
-               value, err := strconv.Atoi(v)
-               if err != nil {
-                       // TODO log
-               }
-               cmd.version = value
-               configVersion = value
+func NewRemotingCommandWithBody(commandCode int16, customerHeader 
CustomerHeader, body []byte) *RemotingCommand {
+       remotingCommand := new(RemotingCommand)
+       remotingCommand.Code = commandCode
+       currOpaque := atomic.AddInt32(&opaque, 1)
+       remotingCommand.Opaque = currOpaque
+       remotingCommand.Flag = constant.REMOTING_COMMAND_FLAG
+       remotingCommand.Language = constant.REMOTING_COMMAND_LANGUAGE
+       remotingCommand.Version = constant.REMOTING_COMMAND_VERSION
+       if customerHeader != nil {
+               remotingCommand.ExtFields = structs.Map(customerHeader)
        }
+       remotingCommand.Body = body
+       return remotingCommand
 }
 
-func (cmd *RemotingCommand) encodeHeader() []byte {
-       length := 4
-       headerData := cmd.buildHeader()
-       length += len(headerData)
-
-       if cmd.body != nil {
-               length += len(cmd.body)
-       }
-
-       buf := bytes.NewBuffer([]byte{})
-       binary.Write(buf, binary.BigEndian, length)
-       binary.Write(buf, binary.BigEndian, len(cmd.body))
-       buf.Write(headerData)
-
-       return buf.Bytes()
+func (self *RemotingCommand) IsResponseType() bool {
+       return self.Flag&(RESPONSE_TYPE) == RESPONSE_TYPE
 }
-
-func (cmd *RemotingCommand) buildHeader() []byte {
-       buf, err := json.Marshal(cmd)
-       if err != nil {
-               return nil
-       }
-       return buf
-}
-
-func (cmd *RemotingCommand) encode() []byte {
-       length := 4
-
-       headerData := cmd.buildHeader()
-       length += len(headerData)
-
-       if cmd.body != nil {
-               length += len(cmd.body)
-       }
-
-       buf := bytes.NewBuffer([]byte{})
-       binary.Write(buf, binary.LittleEndian, length)
-       binary.Write(buf, binary.LittleEndian, len(cmd.body))
-       buf.Write(headerData)
-
-       if cmd.body != nil {
-               buf.Write(cmd.body)
-       }
-
-       return buf.Bytes()
-}
-
-func decodeRemoteCommand(header, body []byte) *RemotingCommand {
-       decodeLock.Lock()
-       defer decodeLock.Unlock()
-
-       cmd := &RemotingCommand{}
-       cmd.extFields = make(map[string]string)
-       err := json.Unmarshal(header, cmd)
-       if err != nil {
-               log.Print(err)
-               return nil
-       }
-       cmd.body = body
-       return cmd
-}
-
-func CreateRemotingCommand(code int, requestHeader 
*header.SendMessageRequestHeader) *RemotingCommand {
-       cmd := &RemotingCommand{}
-       cmd.code = code
-       cmd.header = requestHeader
-       cmd.version = 1
-       cmd.opaque = atomic.AddInt32(&requestId, 1) // TODO: safety?
-       return cmd
-}
-
-func (cmd *RemotingCommand) SetBody(body []byte) {
-       cmd.body = body
-}
-
-func (cmd *RemotingCommand) Type() RemotingCommandType {
-       bits := 1 << rpcType
-       if (cmd.flag & bits) == bits {
-               return ResponseCommand
-       }
-       return RqeusetCommand
-}
-
-func (cmd *RemotingCommand) MarkOneWayRpc() {
-       cmd.flag |= (1 << rpcOneWay)
-}
-
-func (cmd *RemotingCommand) String() string {
-       return nil // TODO
+func (self *RemotingCommand) MarkResponseType() {
+       self.Flag = (self.Flag | RESPONSE_TYPE)
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/request_code.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/request_code.go 
b/rocketmq-go/remoting/request_code.go
new file mode 100644
index 0000000..52965d5
--- /dev/null
+++ b/rocketmq-go/remoting/request_code.go
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remoting
+
+const (
+       SEND_MESSAGE                        = 10
+       PULL_MESSAGE                        = 11
+       QUERY_MESSAGE                       = 12
+       QUERY_BROKER_OFFSET                 = 13
+       QUERY_CONSUMER_OFFSET               = 14
+       UPDATE_CONSUMER_OFFSET              = 15
+       UPDATE_AND_CREATE_TOPIC             = 17
+       GET_ALL_TOPIC_CONFIG                = 21
+       GET_TOPIC_CONFIG_LIST               = 22
+       GET_TOPIC_NAME_LIST                 = 23
+       UPDATE_BROKER_CONFIG                = 25
+       GET_BROKER_CONFIG                   = 26
+       TRIGGER_DELETE_FILES                = 27
+       GET_BROKER_RUNTIME_INFO             = 28
+       SEARCH_OFFSET_BY_TIMESTAMP          = 29
+       GET_MAX_OFFSET                      = 30
+       GET_MIN_OFFSET                      = 31
+       GET_EARLIEST_MSG_STORETIME          = 32
+       VIEW_MESSAGE_BY_ID                  = 33
+       HEART_BEAT                          = 34
+       UNREGISTER_CLIENT                   = 35
+       CONSUMER_SEND_MSG_BACK              = 36
+       END_TRANSACTION                     = 37
+       GET_CONSUMER_LIST_BY_GROUP          = 38
+       CHECK_TRANSACTION_STATE             = 39
+       NOTIFY_CONSUMER_IDS_CHANGED         = 40
+       LOCK_BATCH_MQ                       = 41
+       UNLOCK_BATCH_MQ                     = 42
+       GET_ALL_CONSUMER_OFFSET             = 43
+       GET_ALL_DELAY_OFFSET                = 45
+       PUT_KV_CONFIG                       = 100
+       GET_KV_CONFIG                       = 101
+       DELETE_KV_CONFIG                    = 102
+       REGISTER_BROKER                     = 103
+       UNREGISTER_BROKER                   = 104
+       GET_ROUTEINTO_BY_TOPIC              = 105
+       GET_BROKER_CLUSTER_INFO             = 106
+       UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200
+       GET_ALL_SUBSCRIPTIONGROUP_CONFIG    = 201
+       GET_TOPIC_STATS_INFO                = 202
+       GET_CONSUMER_CONNECTION_LIST        = 203
+       GET_PRODUCER_CONNECTION_LIST        = 204
+       WIPE_WRITE_PERM_OF_BROKER           = 205
+
+       GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206
+       DELETE_SUBSCRIPTIONGROUP           = 207
+       GET_CONSUME_STATS                  = 208
+       SUSPEND_CONSUMER                   = 209
+       RESUME_CONSUMER                    = 210
+       RESET_CONSUMER_OFFSET_IN_CONSUMER  = 211
+       RESET_CONSUMER_OFFSET_IN_BROKER    = 212
+       ADJUST_CONSUMER_THREAD_POOL        = 213
+       WHO_CONSUME_THE_MESSAGE            = 214
+
+       DELETE_TOPIC_IN_BROKER    = 215
+       DELETE_TOPIC_IN_NAMESRV   = 216
+       GET_KV_CONFIG_BY_VALUE    = 217
+       DELETE_KV_CONFIG_BY_VALUE = 218
+       GET_KVLIST_BY_NAMESPACE   = 219
+
+       RESET_CONSUMER_CLIENT_OFFSET         = 220
+       GET_CONSUMER_STATUS_FROM_CLIENT      = 221
+       INVOKE_BROKER_TO_RESET_OFFSET        = 222
+       INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223
+
+       QUERY_TOPIC_CONSUME_BY_WHO = 300
+
+       GET_TOPICS_BY_CLUSTER = 224
+
+       REGISTER_FILTER_SERVER            = 301
+       REGISTER_MESSAGE_FILTER_CLASS     = 302
+       QUERY_CONSUME_TIME_SPAN           = 303
+       GET_SYSTEM_TOPIC_LIST_FROM_NS     = 304
+       GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305
+
+       CLEAN_EXPIRED_CONSUMEQUEUE = 306
+
+       GET_CONSUMER_RUNNING_INFO = 307
+
+       QUERY_CORRECTION_OFFSET = 308
+
+       CONSUME_MESSAGE_DIRECTLY = 309
+
+       SEND_MESSAGE_V2 = 310
+
+       GET_UNIT_TOPIC_LIST                = 311
+       GET_HAS_UNIT_SUB_TOPIC_LIST        = 312
+       GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313
+       CLONE_GROUP_OFFSET                 = 314
+
+       VIEW_BROKER_STATS_DATA = 315
+)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/request_processor.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/request_processor.go 
b/rocketmq-go/remoting/request_processor.go
new file mode 100644
index 0000000..eec8cd8
--- /dev/null
+++ b/rocketmq-go/remoting/request_processor.go
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remoting
+
+type ClientRequestProcessor func(remotingCommand *RemotingCommand) 
(responseCommand *RemotingCommand)
+
+//CHECK_TRANSACTION_STATE
+//NOTIFY_CONSUMER_IDS_CHANGED
+//RESET_CONSUMER_CLIENT_OFFSET
+//GET_CONSUMER_STATUS_FROM_CLIENT
+//GET_CONSUMER_RUNNING_INFO
+//CONSUME_MESSAGE_DIRECTLY

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/response_code.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/response_code.go 
b/rocketmq-go/remoting/response_code.go
new file mode 100644
index 0000000..6a49c77
--- /dev/null
+++ b/rocketmq-go/remoting/response_code.go
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remoting
+
+const (
+       SUCCESS                       = 0
+       SYSTEM_ERROR                  = 1
+       SYSTEM_BUSY                   = 2
+       REQUEST_CODE_NOT_SUPPORTED    = 3
+       TRANSACTION_FAILED            = 4
+       FLUSH_DISK_TIMEOUT            = 10
+       SLAVE_NOT_AVAILABLE           = 11
+       FLUSH_SLAVE_TIMEOUT           = 12
+       MESSAGE_ILLEGAL               = 13
+       SERVICE_NOT_AVAILABLE         = 14
+       VERSION_NOT_SUPPORTED         = 15
+       NO_PERMISSION                 = 16
+       TOPIC_NOT_EXIST               = 17
+       TOPIC_EXIST_ALREADY           = 18
+       PULL_NOT_FOUND                = 19
+       PULL_RETRY_IMMEDIATELY        = 20
+       PULL_OFFSET_MOVED             = 21
+       QUERY_NOT_FOUND               = 22
+       SUBSCRIPTION_PARSE_FAILED     = 23
+       SUBSCRIPTION_NOT_EXIST        = 24
+       SUBSCRIPTION_NOT_LATEST       = 25
+       SUBSCRIPTION_GROUP_NOT_EXIST  = 26
+       TRANSACTION_SHOULD_COMMIT     = 200
+       TRANSACTION_SHOULD_ROLLBACK   = 201
+       TRANSACTION_STATE_UNKNOW      = 202
+       TRANSACTION_STATE_GROUP_WRONG = 203
+       NO_BUYER_ID                   = 204
+
+       NOT_IN_CURRENT_UNIT = 205
+
+       CONSUMER_NOT_ONLINE = 206
+
+       CONSUME_MSG_TIMEOUT = 207
+)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/response_future.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/response_future.go 
b/rocketmq-go/remoting/response_future.go
index a8c2aec..1bece6d 100644
--- a/rocketmq-go/remoting/response_future.go
+++ b/rocketmq-go/remoting/response_future.go
@@ -16,57 +16,14 @@
  */
 package remoting
 
-import (
-       "sync"
-       "time"
-)
-
-type InvokeCallback func(responseFuture *ResponseFuture)
-
 type ResponseFuture struct {
-       opaque          int32
-       timeoutMillis   time.Duration
-       invokeCallback  InvokeCallback
-       beginTimestamp  int64
-       responseCommand *RemotingCommand
-       sendRequestOK   bool
-       done            chan bool
-       latch           sync.WaitGroup
-       err             error
-}
-
-func NewResponseFuture(opaque int32, timeout time.Duration, callback 
InvokeCallback) *ResponseFuture {
-       future := &ResponseFuture{
-               opaque:         opaque,
-               timeoutMillis:  timeout,
-               invokeCallback: callback,
-               latch:          sync.WaitGroup{},
-       }
-       future.latch.Add(1)
-       return future
-}
-func (future *ResponseFuture) SetResponseFuture(cmd *RemotingCommand) {
-       future.responseCommand = cmd
-}
-
-func (future *ResponseFuture) Done() {
-       future.latch.Done()
-       future.done <- true
-}
-
-func (future *ResponseFuture) executeInvokeCallback() {
-       future.invokeCallback(nil) // TODO
-}
-
-func (future *ResponseFuture) WaitResponse(timeout time.Duration) 
*RemotingCommand {
-       go func() { // TODO optimize
-               time.Sleep(timeout)
-               future.latch.Add(-1) // TODO whats happened when counter less 
than 0
-       }()
-       future.latch.Wait()
-       return future.responseCommand
-}
-
-func (future *ResponseFuture) String() string {
-       return nil
+       ResponseCommand *RemotingCommand
+       SendRequestOK   bool
+       Rrr             error
+       Opaque          int32
+       TimeoutMillis   int64
+       InvokeCallback  InvokeCallback
+       BeginTimestamp  int64
+       Done            chan bool
 }
+type InvokeCallback func(responseFuture *ResponseFuture)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/rocketmq_serializable.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/rocketmq_serializable.go 
b/rocketmq-go/remoting/rocketmq_serializable.go
new file mode 100644
index 0000000..22dc41f
--- /dev/null
+++ b/rocketmq-go/remoting/rocketmq_serializable.go
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remoting
+
+import (
+       "bytes"
+       "encoding/binary"
+       "fmt"
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+)
+
+type RocketMqSerializer struct {
+}
+
+func (self *RocketMqSerializer) EncodeHeaderData(cmd *RemotingCommand) []byte {
+       var (
+               remarkBytes       []byte
+               remarkBytesLen    int
+               extFieldsBytes    []byte
+               extFieldsBytesLen int
+       )
+       remarkBytesLen = 0
+       if len(cmd.Remark) > 0 {
+               remarkBytes = []byte(cmd.Remark)
+               remarkBytesLen = len(remarkBytes)
+       }
+       if cmd.ExtFields != nil {
+               extFieldsBytes = rocketMqCustomHeaderSerialize(cmd.ExtFields)
+               extFieldsBytesLen = len(extFieldsBytes)
+       }
+       buf := bytes.NewBuffer([]byte{})
+       binary.Write(buf, binary.BigEndian, int16(cmd.Code))       
//code(~32767) 2
+       binary.Write(buf, binary.BigEndian, int8(0))               //JAVA
+       binary.Write(buf, binary.BigEndian, int16(cmd.Version))    //2
+       binary.Write(buf, binary.BigEndian, int32(cmd.Opaque))     //opaque 4
+       binary.Write(buf, binary.BigEndian, int32(cmd.Flag))       //4
+       binary.Write(buf, binary.BigEndian, int32(remarkBytesLen)) //4
+       if remarkBytesLen > 0 {
+               buf.Write(remarkBytes)
+       }
+       binary.Write(buf, binary.BigEndian, int32(extFieldsBytesLen)) //4
+       if extFieldsBytesLen > 0 {
+               buf.Write(extFieldsBytes)
+       }
+       fmt.Println(buf.Bytes())
+       return buf.Bytes()
+}
+
+func (self *RocketMqSerializer) DecodeRemoteCommand(headerArray, body []byte) 
(cmd *RemotingCommand) {
+       cmd = &RemotingCommand{}
+       buf := bytes.NewBuffer(headerArray)
+       // int code(~32767)
+       binary.Read(buf, binary.BigEndian, &cmd.Code)
+       // LanguageCode language
+       var LanguageCodeNope byte
+       binary.Read(buf, binary.BigEndian, &LanguageCodeNope)
+       cmd.Language = constant.REMOTING_COMMAND_LANGUAGE //todo use code from 
remote
+       // int version(~32767)
+       binary.Read(buf, binary.BigEndian, &cmd.Version)
+       // int opaque
+       binary.Read(buf, binary.BigEndian, &cmd.Opaque)
+       // int flag
+       binary.Read(buf, binary.BigEndian, &cmd.Flag)
+       // String remark
+       var remarkLen, extFieldsLen int32
+       binary.Read(buf, binary.BigEndian, &remarkLen)
+       if remarkLen > 0 {
+               var remarkData = make([]byte, remarkLen)
+               binary.Read(buf, binary.BigEndian, &remarkData)
+               cmd.Remark = string(remarkData)
+       }
+       //map ext
+       // HashMap<String, String> extFields
+       binary.Read(buf, binary.BigEndian, &extFieldsLen)
+       if extFieldsLen > 0 {
+               var extFieldsData = make([]byte, extFieldsLen)
+               binary.Read(buf, binary.BigEndian, &extFieldsData)
+               extFiledMap := customHeaderDeserialize(extFieldsData)
+               cmd.ExtFields = extFiledMap
+       }
+       cmd.Body = body
+       return
+}
+
+func rocketMqCustomHeaderSerialize(extFiled map[string]interface{}) (byteData 
[]byte) {
+       buf := bytes.NewBuffer([]byte{})
+       for key, value := range extFiled {
+               keyBytes := []byte(fmt.Sprintf("%v", key))
+               valueBytes := []byte(fmt.Sprintf("%v", value))
+               binary.Write(buf, binary.BigEndian, int16(len(keyBytes)))
+               buf.Write(keyBytes)
+               binary.Write(buf, binary.BigEndian, int32(len(valueBytes)))
+               buf.Write(valueBytes)
+       }
+       byteData = buf.Bytes()
+       return
+}
+
+func customHeaderDeserialize(extFiledDataBytes []byte) (extFiledMap 
map[string]interface{}) {
+       extFiledMap = make(map[string]interface{})
+       buf := bytes.NewBuffer(extFiledDataBytes)
+       for buf.Len() > 0 {
+               var key = getItemFormExtFiledDataBytes(buf, "key")
+               var value = getItemFormExtFiledDataBytes(buf, "value")
+               extFiledMap[key] = value
+       }
+       return
+}
+func getItemFormExtFiledDataBytes(buff *bytes.Buffer, itemType string) (item 
string) {
+       if itemType == "key" {
+               var len int16
+               binary.Read(buff, binary.BigEndian, &len)
+               var data = make([]byte, len)
+               binary.Read(buff, binary.BigEndian, &data)
+               item = string(data)
+       }
+       if itemType == "value" {
+               var len int32
+               binary.Read(buff, binary.BigEndian, &len)
+               var data = make([]byte, len)
+               binary.Read(buff, binary.BigEndian, &data)
+               item = string(data)
+       }
+       return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/serializable.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/serializable.go 
b/rocketmq-go/remoting/serializable.go
new file mode 100644
index 0000000..24420cd
--- /dev/null
+++ b/rocketmq-go/remoting/serializable.go
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remoting
+
+import (
+       "bytes"
+       "encoding/binary"
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+       "github.com/golang/glog"
+)
+
+type SerializerHandler struct {
+       serializer Serializer //which serializer this client use, depend on  
constant.USE_HEADER_SERIALIZETYPE
+}
+
+type Serializer interface {
+       EncodeHeaderData(request *RemotingCommand) []byte
+       DecodeRemoteCommand(header, body []byte) *RemotingCommand
+}
+
+var JSON_SERIALIZER = &JsonSerializer{}
+var ROCKETMQ_SERIALIZER = &RocketMqSerializer{}
+
+func NewSerializerHandler() SerializerHandler {
+       serializerHandler := SerializerHandler{}
+       switch constant.USE_HEADER_SERIALIZETYPE {
+       case constant.JSON_SERIALIZE:
+               serializerHandler.serializer = JSON_SERIALIZER
+               break
+
+       case constant.ROCKETMQ_SERIALIZE:
+               serializerHandler.serializer = ROCKETMQ_SERIALIZER
+               break
+       default:
+               panic("illeage serializer type")
+       }
+       return serializerHandler
+}
+func (self *SerializerHandler) EncodeHeader(request *RemotingCommand) []byte {
+       length := 4
+       headerData := self.serializer.EncodeHeaderData(request)
+       length += len(headerData)
+       if request.Body != nil {
+               length += len(request.Body)
+       }
+       buf := bytes.NewBuffer([]byte{})
+       binary.Write(buf, binary.BigEndian, int32(length))                      
                                 // len
+       binary.Write(buf, binary.BigEndian, 
int32(len(headerData)|(int(constant.USE_HEADER_SERIALIZETYPE)<<24))) // header 
len
+       buf.Write(headerData)
+       return buf.Bytes()
+}
+
+func (self *SerializerHandler) DecodeRemoteCommand(headerSerializableType 
byte, header, body []byte) *RemotingCommand {
+       var serializer Serializer
+       switch headerSerializableType {
+       case constant.JSON_SERIALIZE:
+               serializer = JSON_SERIALIZER
+               break
+       case constant.ROCKETMQ_SERIALIZE:
+               serializer = ROCKETMQ_SERIALIZER
+               break
+       default:
+               glog.Error("Unknow headerSerializableType", 
headerSerializableType)
+       }
+       return serializer.DecodeRemoteCommand(header, body)
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/service/client_api.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/client_api.go 
b/rocketmq-go/service/client_api.go
index ff750ba..6901f4c 100644
--- a/rocketmq-go/service/client_api.go
+++ b/rocketmq-go/service/client_api.go
@@ -20,7 +20,6 @@ package service
 import (
        "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
        
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
-       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
        
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
        "github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
 )
@@ -45,25 +44,25 @@ type MQClientAPI struct {
        config            *config.ClientConfig
 }
 
-func NewMQClientAPI(cfg *config.ClientConfig, processor 
*ClientRemotingProcessor, hook remoting.RPCHook) *MQClientAPI {
-       api := &MQClientAPI{
-               remotingClient: &remoting.RemotingClient{}, //TODO
-               topAddress:     &TopAddress{},              // TODO
-               crp:            processor,
-               config:         cfg,
-       }
-
-       // TODO register
-       return api
-}
-
-func (api *MQClientAPI) SendMessage(addr, brokerName string,
-       msg message.Message, requestHeader header.SendMessageRequestHeader, 
timeout int64) *model.SendResult {
-       var request *remoting.RemotingCommand
-       request = remoting.CreateRemotingCommand(model.SendMsg, &requestHeader)
-       request.SetBody(msg.Body)
-       return api.sendMessageSync(addr, brokerName, msg, timeout, request)
-}
+//func NewMQClientAPI(cfg *config.ClientConfig, processor 
*ClientRemotingProcessor, hook remoting.RPCHook) *MQClientAPI {
+//     api := &MQClientAPI{
+//             remotingClient: &remoting.RemotingClient{}, //TODO
+//             topAddress:     &TopAddress{},              // TODO
+//             crp:            processor,
+//             config:         cfg,
+//     }
+//
+//     // TODO register
+//     return api
+//}
+//
+//func (api *MQClientAPI) SendMessage(addr, brokerName string,
+//     msg message.Message, requestHeader header.SendMessageRequestHeader, 
timeout int64) *model.SendResult {
+//     var request *remoting.RemotingCommand
+//     request = remoting.CreateRemotingCommand(model.SendMsg, &requestHeader)
+//     request.SetBody(msg.Body)
+//     return api.sendMessageSync(addr, brokerName, msg, timeout, request)
+//}
 
 func (api *MQClientAPI) sendMessageSync(addr, brokerName string,
        msg message.Message,

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/service/consume_message_service.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/consume_message_service.go 
b/rocketmq-go/service/consume_message_service.go
new file mode 100644
index 0000000..09be61c
--- /dev/null
+++ b/rocketmq-go/service/consume_message_service.go
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package service
+
+import (
+       "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+       "github.com/golang/glog"
+       "time"
+)
+
+type ConsumeMessageService interface {
+       //ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt 
msg, final String brokerName);
+
+       Init(consumerGroup string, mqClient RocketMqClient, offsetStore 
OffsetStore, defaultProducerService *DefaultProducerService, consumerConfig 
*config.RocketMqConsumerConfig)
+       SubmitConsumeRequest(msgs []model.MessageExt, processQueue 
*model.ProcessQueue,
+               messageQueue *model.MessageQueue, dispathToConsume bool)
+       SendMessageBack(messageExt *model.MessageExt, delayLayLevel int, 
brokerName string) (err error)
+       ConsumeMessageDirectly(messageExt *model.MessageExt, brokerName string) 
(consumeMessageDirectlyResult model.ConsumeMessageDirectlyResult, err error)
+}
+
+type ConsumeMessageConcurrentlyServiceImpl struct {
+       consumerGroup   string
+       messageListener model.MessageListener
+       //sendMessageBackProducerService SendMessageBackProducerService //for 
send retry Message
+       offsetStore    OffsetStore
+       consumerConfig *config.RocketMqConsumerConfig
+}
+
+func NewConsumeMessageConcurrentlyServiceImpl(messageListener 
model.MessageListener) (consumeService ConsumeMessageService) {
+       //consumeService = 
&ConsumeMessageConcurrentlyServiceImpl{messageListener:messageListener, 
sendMessageBackProducerService:&SendMessageBackProducerServiceImpl{}}
+       return
+}
+
+func (self *ConsumeMessageConcurrentlyServiceImpl) Init(consumerGroup string, 
mqClient RocketMqClient, offsetStore OffsetStore, defaultProducerService 
*DefaultProducerService, consumerConfig *config.RocketMqConsumerConfig) {
+       self.consumerGroup = consumerGroup
+       self.offsetStore = offsetStore
+       
//self.sendMessageBackProducerService.InitSendMessageBackProducerService(consumerGroup,
 mqClient,defaultProducerService,consumerConfig)
+       self.consumerConfig = consumerConfig
+}
+
+func (self *ConsumeMessageConcurrentlyServiceImpl) SubmitConsumeRequest(msgs 
[]model.MessageExt, processQueue *model.ProcessQueue, messageQueue 
*model.MessageQueue, dispathToConsume bool) {
+       msgsLen := len(msgs)
+       for i := 0; i < msgsLen; {
+               begin := i
+               end := i + self.consumerConfig.ConsumeMessageBatchMaxSize
+               if end > msgsLen {
+                       end = msgsLen
+               }
+               go func() {
+                       glog.V(2).Infof("look slice begin %d end %d msgsLen 
%d", begin, end, msgsLen)
+                       batchMsgs := 
transformMessageToConsume(self.consumerGroup, msgs[begin:end])
+                       consumeState := self.messageListener(batchMsgs)
+                       self.processConsumeResult(consumeState, batchMsgs, 
messageQueue, processQueue)
+               }()
+               i = end
+       }
+       return
+}
+
+func (self *ConsumeMessageConcurrentlyServiceImpl) SendMessageBack(messageExt 
*model.MessageExt, delayLayLevel int, brokerName string) (err error) {
+       //err = self.sendMessageBackProducerService.SendMessageBack(messageExt, 
0, brokerName)
+       return
+}
+
+func (self *ConsumeMessageConcurrentlyServiceImpl) 
ConsumeMessageDirectly(messageExt *model.MessageExt, brokerName string) 
(consumeMessageDirectlyResult model.ConsumeMessageDirectlyResult, err error) {
+       start := time.Now().UnixNano() / 1000000
+       consumeResult := self.messageListener([]model.MessageExt{*messageExt})
+       consumeMessageDirectlyResult.AutoCommit = true
+       consumeMessageDirectlyResult.Order = false
+       consumeMessageDirectlyResult.SpentTimeMills = 
time.Now().UnixNano()/1000000 - start
+       if consumeResult.ConsumeConcurrentlyStatus == "CONSUME_SUCCESS" && 
consumeResult.AckIndex >= 0 {
+               consumeMessageDirectlyResult.ConsumeResult = "CR_SUCCESS"
+
+       } else {
+               consumeMessageDirectlyResult.ConsumeResult = 
"CR_THROW_EXCEPTION"
+       }
+       return
+}
+
+func (self *ConsumeMessageConcurrentlyServiceImpl) processConsumeResult(result 
model.ConsumeConcurrentlyResult, msgs []model.MessageExt, messageQueue 
*model.MessageQueue, processQueue *model.ProcessQueue) {
+       if processQueue.IsDropped() {
+               glog.Warning("processQueue is dropped without process consume 
result. ", msgs)
+               return
+       }
+       if len(msgs) == 0 {
+               return
+       }
+       ackIndex := result.AckIndex
+       if model.CONSUME_SUCCESS == result.ConsumeConcurrentlyStatus {
+               if ackIndex >= len(msgs) {
+                       ackIndex = len(msgs) - 1
+               } else {
+                       if result.AckIndex < 0 {
+                               ackIndex = -1
+                       }
+               }
+       }
+       var failedMessages []model.MessageExt
+       successMessages := []model.MessageExt{}
+       if ackIndex >= 0 {
+               successMessages = msgs[:ackIndex+1]
+       }
+       for i := ackIndex + 1; i < len(msgs); i++ {
+               err := self.SendMessageBack(&msgs[i], 0, 
messageQueue.BrokerName)
+               if err != nil {
+                       msgs[i].ReconsumeTimes = msgs[i].ReconsumeTimes + 1
+                       failedMessages = append(failedMessages, msgs[i])
+               } else {
+                       successMessages = append(successMessages, msgs[i])
+               }
+       }
+       if len(failedMessages) > 0 {
+               self.SubmitConsumeRequest(failedMessages, processQueue, 
messageQueue, true)
+       }
+       //commitOffset := processQueue.RemoveMessage(successMessages)
+       //if (commitOffset > 0 && ! processQueue.IsDropped()) {
+       //      self.offsetStore.UpdateOffset(messageQueue, commitOffset, true)
+       //}
+
+}
+
+func transformMessageToConsume(consumerGroup string, msgs []model.MessageExt) 
[]model.MessageExt {
+       retryTopicName := constant.RETRY_GROUP_TOPIC_PREFIX + consumerGroup
+
+       for _, msg := range msgs {
+               //reset retry topic name
+               if msg.Message.Topic == retryTopicName {
+                       retryTopic := 
msg.Properties[constant.PROPERTY_RETRY_TOPIC]
+                       if len(retryTopic) > 0 {
+                               msg.Message.Topic = retryTopic
+                       }
+               }
+               //set consume start time
+               msg.SetConsumeStartTime()
+       }
+       return msgs
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/service/consume_messsage_service.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/consume_messsage_service.go 
b/rocketmq-go/service/consume_messsage_service.go
deleted file mode 100644
index d3b28fc..0000000
--- a/rocketmq-go/service/consume_messsage_service.go
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package service
-
-type ConsumeMessageService struct {
-}

Reply via email to