http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/response_code.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/response_code.go b/rocketmq-go/model/response_code.go index 957c1e9..ed40a6d 100644 --- a/rocketmq-go/model/response_code.go +++ b/rocketmq-go/model/response_code.go @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package model const (
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/send_result.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/send_result.go b/rocketmq-go/model/send_result.go index b1af4fb..4d3b31f 100644 --- a/rocketmq-go/model/send_result.go +++ b/rocketmq-go/model/send_result.go @@ -52,7 +52,7 @@ func NewSendResult(status SendStatus, msgID, offsetID string, queue *message.Mes } func EncoderSendResultToJson(obj interface{}) string { - return nil // TODO + return "" // TODO } func DecoderSendResultFromJson(json string) *SendResult { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/subscription_data.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/subscription_data.go b/rocketmq-go/model/subscription_data.go new file mode 100644 index 0000000..ce5ae74 --- /dev/null +++ b/rocketmq-go/model/subscription_data.go @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package model + +type SubscriptionData struct { + Topic string + SubString string + ClassFilterMode bool + TagsSet []string + CodeSet []string + SubVersion int64 +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/topic_publishInfo.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/topic_publishInfo.go b/rocketmq-go/model/topic_publishInfo.go index b2f711b..b5f9e37 100644 --- a/rocketmq-go/model/topic_publishInfo.go +++ b/rocketmq-go/model/topic_publishInfo.go @@ -18,59 +18,59 @@ package model import ( - "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message" +//"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message" ) -type TopicPublishInfo struct { - orderTopic bool - havaTopicRouteInfo bool - messageQueueList []*message.MessageQueue - topicRouteData *TopicRouteData -} - -func (info *TopicPublishInfo) SetOrderTopic(b bool) { - info.orderTopic = b -} - -func (info *TopicPublishInfo) Ok() bool { - return false -} - -func (info *TopicPublishInfo) MessageQueueList() []*message.MessageQueue { - return info.messageQueueList -} - -func (info *TopicPublishInfo) HaveTopicRouteInfo() bool { - return info.havaTopicRouteInfo -} - -func (info *TopicPublishInfo) SetHaveTopicRouteInfo(b bool) { - info.havaTopicRouteInfo = b -} - -func (info *TopicPublishInfo) TopicRouteData() *TopicRouteData { - return info.topicRouteData -} - -func (info *TopicPublishInfo) SetTopicRouteData(routeDate *TopicRouteData) { - info.topicRouteData = routeDate -} - -func (info *TopicPublishInfo) SelectOneMessageQueue() *message.MessageQueue { - return nil //TODO -} - -func (info *TopicPublishInfo) selectOneMessageQueueWithBroker(brokerName string) *message.MessageQueue { - if brokerName == "" { - return info.SelectOneMessageQueue() - } - return nil //TODO -} - -func (info *TopicPublishInfo) QueueIdByBroker(brokerName string) int { - return nil //TODO -} - -func (info *TopicPublishInfo) String() string { - return nil -} +//type TopicPublishInfo struct { +// orderTopic bool +// havaTopicRouteInfo bool +// messageQueueList []*message.MessageQueue +// topicRouteData *TopicRouteData +//} +// +//func (info *TopicPublishInfo) SetOrderTopic(b bool) { +// info.orderTopic = b +//} +// +//func (info *TopicPublishInfo) Ok() bool { +// return false +//} +// +//func (info *TopicPublishInfo) MessageQueueList() []*message.MessageQueue { +// return info.messageQueueList +//} +// +//func (info *TopicPublishInfo) HaveTopicRouteInfo() bool { +// return info.havaTopicRouteInfo +//} +// +//func (info *TopicPublishInfo) SetHaveTopicRouteInfo(b bool) { +// info.havaTopicRouteInfo = b +//} +// +//func (info *TopicPublishInfo) TopicRouteData() *TopicRouteData { +// return info.topicRouteData +//} +// +//func (info *TopicPublishInfo) SetTopicRouteData(routeDate *TopicRouteData) { +// info.topicRouteData = routeDate +//} +// +//func (info *TopicPublishInfo) SelectOneMessageQueue() *message.MessageQueue { +// return nil //TODO +//} +// +//func (info *TopicPublishInfo) selectOneMessageQueueWithBroker(brokerName string) *message.MessageQueue { +// if brokerName == "" { +// return info.SelectOneMessageQueue() +// } +// return nil //TODO +//} +// +//func (info *TopicPublishInfo) QueueIdByBroker(brokerName string) int { +// return 0 //TODO +//} +// +//func (info *TopicPublishInfo) String() string { +// return "" +//} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/topic_publish_info.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/topic_publish_info.go b/rocketmq-go/model/topic_publish_info.go new file mode 100644 index 0000000..14ec088 --- /dev/null +++ b/rocketmq-go/model/topic_publish_info.go @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package model + +import ( + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant" + "sync/atomic" +) + +type TopicPublishInfo struct { + OrderTopic bool + HaveTopicRouterInfo bool + MessageQueueList []MessageQueue + TopicRouteDataInstance *TopicRouteData + topicQueueIndex int32 +} + +//private boolean orderTopic = false; +//private boolean haveTopicRouterInfo = false; +//private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); +//private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0); // todo +//private TopicRouteData topicRouteData; + +func (self *TopicPublishInfo) JudgeTopicPublishInfoOk() (bIsTopicOk bool) { + bIsTopicOk = (len(self.MessageQueueList) > 0) + return +} +func (self *TopicPublishInfo) FetchQueueIndex() (index int) { + qLen := len(self.MessageQueueList) + if qLen > 0 { + qIndex := atomic.AddInt32(&self.topicQueueIndex, 1) + qIndex = qIndex % int32(qLen) + index = int(qIndex) + } + return +} +func BuildTopicSubscribeInfoFromRoteData(topic string, topicRouteData *TopicRouteData) (mqList []*MessageQueue) { + mqList = make([]*MessageQueue, 0) + for _, queueData := range topicRouteData.QueueDatas { + if !constant.ReadAble(queueData.Perm) { + continue + } + var i int32 + for i = 0; i < queueData.ReadQueueNums; i++ { + mq := &MessageQueue{ + Topic: topic, + BrokerName: queueData.BrokerName, + QueueId: i, + } + mqList = append(mqList, mq) + } + } + return +} + +func BuildTopicPublishInfoFromTopicRoteData(topic string, topicRouteData *TopicRouteData) (topicPublishInfo *TopicPublishInfo) { + // all order topic is false todo change + topicPublishInfo = &TopicPublishInfo{ + TopicRouteDataInstance: topicRouteData, + OrderTopic: false, + MessageQueueList: []MessageQueue{}} + for _, queueData := range topicRouteData.QueueDatas { + if !constant.WriteAble(queueData.Perm) { + continue + } + for _, brokerData := range topicRouteData.BrokerDatas { + if brokerData.BrokerName == queueData.BrokerName { + if len(brokerData.BrokerAddrs["0"]) == 0 { + break + } + var i int32 + for i = 0; i < queueData.WriteQueueNums; i++ { + messageQueue := MessageQueue{Topic: topic, BrokerName: queueData.BrokerName, QueueId: i} + topicPublishInfo.MessageQueueList = append(topicPublishInfo.MessageQueueList, messageQueue) + topicPublishInfo.HaveTopicRouterInfo = true + } + break + } + } + } + return +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/topic_route_data.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/topic_route_data.go b/rocketmq-go/model/topic_route_data.go index f387529..348479f 100644 --- a/rocketmq-go/model/topic_route_data.go +++ b/rocketmq-go/model/topic_route_data.go @@ -18,108 +18,128 @@ package model import ( - "fmt" - "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message" + //"fmt" + //"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message" + "sync" ) -type BrokerData struct { -} +// +//type BrokerData struct { +//} +// +//type TopicRouteData struct { +// orderTopicConf string +// queueDatas []*message.MessageQueue +// brokerDatas []*BrokerData +// filterServerTable map[string][]string +//} +// +//func NewTopicRouteData() *TopicRouteData { +// return &TopicRouteData{} +//} +// +//func (route *TopicRouteData) CloneTopicRouteData() (clonedRouteData *TopicRouteData) { +// clonedRouteData = &TopicRouteData{ +// route.orderTopicConf, +// route.queueDatas, +// route.brokerDatas, +// route.filterServerTable, +// } +// // TODO: to complete +// return +//} +// +//func (route *TopicRouteData) QueueDatas() []*message.MessageQueue { +// return route.queueDatas +//} +// +//func (route *TopicRouteData) SetQueueDatas(data []*message.MessageQueue) { +// route.queueDatas = data +//} +// +//func (route *TopicRouteData) BrokerDatas() []*BrokerData { +// return route.brokerDatas +//} +// +//func (route *TopicRouteData) SetBrokerDatas(data []*BrokerData) { +// route.brokerDatas = data +//} +// +//func (route *TopicRouteData) FilterServerTable() map[string][]string { +// return route.filterServerTable +//} +// +//func (route *TopicRouteData) SetFilterServerTable(data map[string][]string) { +// route.filterServerTable = data +//} +// +//func (route *TopicRouteData) OrderTopicConf() string { +// return route.orderTopicConf +//} +// +//func (route *TopicRouteData) SetOrderTopicConf(s string) { +// route.orderTopicConf = s +//} +// +//func (route *TopicRouteData) HashCode() (result int) { +// prime := 31 +// result = 1 +// result *= prime +// // TODO +// +// return +//} +// +//func (route *TopicRouteData) Equals(route1 interface{}) bool { +// if route == nil { +// return true +// } +// if route1 == nil { +// return false +// } +// //value, ok := route1.(TopicRouteData) +// //if !ok { +// // return false +// //} +// // TODO +// //if route.brokerDatas == nil && value.brokerDatas != nil || len(route.brokerDatas) != len(value.brokerDatas) { +// // return false +// //} +// // +// //if route.orderTopicConf == "" && value.orderTopicConf != "" || route.orderTopicConf != value.orderTopicConf { +// // return false +// //} +// // +// //if route.queueDatas == nil && value.queueDatas != nil || route.queueDatas != value.queueDatas { +// // return false +// //} +// // +// //if route.filterServerTable == nil && value.filterServerTable != nil || +// // route.filterServerTable != value.filterServerTable { +// // return false +// //} +// return true +//} +// +//func (route *TopicRouteData) String() string { +// return fmt.Sprintf("TopicRouteData [orderTopicConf=%s, queueDatas=%s, brokerDatas=%s, filterServerTable=%s]", +// route.orderTopicConf, route.queueDatas, route.brokerDatas, route.filterServerTable) +//} type TopicRouteData struct { - orderTopicConf string - queueDatas []*message.MessageQueue - brokerDatas []*BrokerData - filterServerTable map[string][]string -} - -func NewTopicRouteData() *TopicRouteData { - return &TopicRouteData{} -} - -func (route *TopicRouteData) CloneTopicRouteData() (clonedRouteData *TopicRouteData) { - clonedRouteData = &TopicRouteData{ - route.orderTopicConf, - route.queueDatas, - route.brokerDatas, - route.filterServerTable, - } - // TODO: to complete - return -} - -func (route *TopicRouteData) QueueDatas() []*message.MessageQueue { - return route.queueDatas -} - -func (route *TopicRouteData) SetQueueDatas(data []*message.MessageQueue) { - route.queueDatas = data -} - -func (route *TopicRouteData) BrokerDatas() []*BrokerData { - return route.brokerDatas -} - -func (route *TopicRouteData) SetBrokerDatas(data []*BrokerData) { - route.brokerDatas = data + OrderTopicConf string + QueueDatas []*QueueData + BrokerDatas []*BrokerData } - -func (route *TopicRouteData) FilterServerTable() map[string][]string { - return route.filterServerTable -} - -func (route *TopicRouteData) SetFilterServerTable(data map[string][]string) { - route.filterServerTable = data -} - -func (route *TopicRouteData) OrderTopicConf() string { - return route.orderTopicConf -} - -func (route *TopicRouteData) SetOrderTopicConf(s string) { - route.orderTopicConf = s +type QueueData struct { + BrokerName string + ReadQueueNums int32 + WriteQueueNums int32 + Perm int32 + TopicSynFlag int32 } - -func (route *TopicRouteData) HashCode() (result int) { - prime := 31 - result = 1 - result *= prime - // TODO - - return -} - -func (route *TopicRouteData) Equals(route1 interface{}) bool { - if route == nil { - return true - } - if route1 == nil { - return false - } - //value, ok := route1.(TopicRouteData) - //if !ok { - // return false - //} - // TODO - //if route.brokerDatas == nil && value.brokerDatas != nil || len(route.brokerDatas) != len(value.brokerDatas) { - // return false - //} - // - //if route.orderTopicConf == "" && value.orderTopicConf != "" || route.orderTopicConf != value.orderTopicConf { - // return false - //} - // - //if route.queueDatas == nil && value.queueDatas != nil || route.queueDatas != value.queueDatas { - // return false - //} - // - //if route.filterServerTable == nil && value.filterServerTable != nil || - // route.filterServerTable != value.filterServerTable { - // return false - //} - return true -} - -func (route *TopicRouteData) String() string { - return fmt.Sprintf("TopicRouteData [orderTopicConf=%s, queueDatas=%s, brokerDatas=%s, filterServerTable=%s]", - route.orderTopicConf, route.queueDatas, route.brokerDatas, route.filterServerTable) +type BrokerData struct { + BrokerName string + BrokerAddrs map[string]string + BrokerAddrsLock sync.RWMutex } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/mq_client_manager.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/mq_client_manager.go b/rocketmq-go/mq_client_manager.go index c07dcfd..731158f 100644 --- a/rocketmq-go/mq_client_manager.go +++ b/rocketmq-go/mq_client_manager.go @@ -16,13 +16,67 @@ */ package rocketmq -import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service" +import ( + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service" + "sync" + "time" +) type MqClientManager struct { clientFactory *ClientFactory rocketMqClient service.RocketMqClient pullMessageController *PullMessageController defaultProducerService RocketMQProducer //for send back message + + rocketMqManagerLock sync.Mutex + //ClientId string + BootTimestamp int64 + + NamesrvLock sync.Mutex + HeartBeatLock sync.Mutex + //rebalanceControllr *RebalanceController +} + +type MqClientConfig struct { +} + +func NewMqClientManager(clientConfig *MqClientConfig) (rocketMqManager *MqClientManager) { + rocketMqManager = &MqClientManager{} + rocketMqManager.BootTimestamp = time.Now().Unix() + rocketMqManager.clientFactory = clientFactoryInit() + //rocketMqManager.rocketMqClient = + //rocketMqManager.pullMessageController = NewPullMessageController(rocketMqManager.mqClient, rocketMqManager.clientFactory) + //rocketMqManager.cleanExpireMsgController = NewCleanExpireMsgController(rocketMqManager.mqClient, rocketMqManager.clientFactory) + //rocketMqManager.rebalanceControllr = NewRebalanceController(rocketMqManager.clientFactory) + + return +} + +func (self *MqClientManager) RegisterProducer(producer *DefaultMQProducer) { + return +} + +func (self *MqClientManager) RegisterConsumer(consumer RocketMQConsumer) { + // todo check config + //if (self.defaultProducerService == nil) { + // self.defaultProducerService = service.NewDefaultProducerService(constant.CLIENT_INNER_PRODUCER_GROUP, mq_config.NewProducerConfig(), self.mqClient) + //} + return +} + +func (self *MqClientManager) Start() { + //self.SendHeartbeatToAllBrokerWithLock()//we should send heartbeat first + self.startAllScheduledTask() +} +func (manager *MqClientManager) startAllScheduledTask() { + +} + +func clientFactoryInit() (clientFactory *ClientFactory) { + clientFactory = &ClientFactory{} + clientFactory.ProducerTable = make(map[string]RocketMQProducer) + clientFactory.ConsumerTable = make(map[string]RocketMQConsumer) + return } type ClientFactory struct { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/mq_consumer.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/mq_consumer.go b/rocketmq-go/mq_consumer.go index 7712ee1..7112537 100644 --- a/rocketmq-go/mq_consumer.go +++ b/rocketmq-go/mq_consumer.go @@ -16,7 +16,10 @@ */ package rocketmq -import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service" +import ( + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service" +) type RocketMQConsumer interface { } @@ -28,5 +31,44 @@ type DefaultMQPushConsumer struct { mqClient service.RocketMqClient rebalance *service.Rebalance //Rebalance's impl depend on offsetStore consumeMessageService service.ConsumeMessageService - ConsumerConfig *MqConsumerConfig + consumerConfig *MqConsumerConfig + + consumerGroup string + //consumeFromWhere string + consumeType string + messageModel string + unitMode bool + + subscription map[string]string //topic|subExpression + subscriptionTag map[string][]string // we use it filter again + // åé çç¥ + pause bool //when reset offset we need pause +} + +func NewDefaultMQPushConsumer(consumerGroup string, mqConsumerConfig *MqConsumerConfig) (defaultMQPushConsumer *DefaultMQPushConsumer) { + defaultMQPushConsumer = &DefaultMQPushConsumer{} + defaultMQPushConsumer.consumerConfig = mqConsumerConfig + return +} + +func (self *DefaultMQPushConsumer) RegisterMessageListener(messageListener model.MessageListener) { + self.consumeMessageService = service.NewConsumeMessageConcurrentlyServiceImpl(messageListener) +} +func (self *DefaultMQPushConsumer) Subscribe(topic string, subExpression string) { + //self.subscription[topic] = subExpression + //if len(subExpression) == 0 || subExpression == "*" { + // return + //} + //tags := strings.Split(subExpression, "||") + //tagsList := []string{} + //for _, tag := range tags { + // t := strings.TrimSpace(tag) + // if len(t) == 0 { + // continue + // } + // tagsList = append(tagsList, t) + //} + //if len(tagsList) > 0 { + // self.subscriptionTag[topic] = tagsList + //} } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/mq_producer.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/mq_producer.go b/rocketmq-go/mq_producer.go index 3677939..d1a011b 100644 --- a/rocketmq-go/mq_producer.go +++ b/rocketmq-go/mq_producer.go @@ -25,7 +25,7 @@ type MqProducerConfig struct { } type DefaultMQProducer struct { - producerGroup string - ProducerConfig *MqProducerConfig - producerService service.ProducerService + producerGroup string + mqProducerConfig *MqProducerConfig + producerService service.ProducerService } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/custom_header.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/custom_header.go b/rocketmq-go/remoting/custom_header.go index 40feade..04a46be 100644 --- a/rocketmq-go/remoting/custom_header.go +++ b/rocketmq-go/remoting/custom_header.go @@ -18,4 +18,5 @@ package remoting type CustomerHeader interface { FromMap(headerMap map[string]interface{}) + //ToMap()(headerMap map[string]interface{}) } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/event_executor.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/event_executor.go b/rocketmq-go/remoting/event_executor.go deleted file mode 100644 index 38e1ee6..0000000 --- a/rocketmq-go/remoting/event_executor.go +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package remoting - -import ( - "fmt" - "github.com/golang/glog" - "net" - "sync" -) - -type Runnable interface { - Run() -} - -type NetEventType int - -const ( - Connect NetEventType = iota - Close - Idle - Exception // TODO error? -) - -type NetEvent struct { - eType NetEventType - remoteAddress string - conn net.Conn -} - -func NewEventType(eType NetEventType, remoteAddr string, conn net.Conn) *NetEvent { - return &NetEvent{eType, remoteAddr, conn} -} - -func (event *NetEvent) Type() NetEventType { - return event.eType -} - -func (event *NetEvent) RemoteAddress() string { - return event.remoteAddress -} - -func (event *NetEvent) Conn() net.Conn { - return event.conn -} - -func (event *NetEvent) String() string { - return fmt.Sprintf("NettyEvent [type=%s, remoteAddr=%s, channel=%s]", - event.eType, event.remoteAddress, event.conn) -} - -type NetEventExecutor struct { - hasNotified bool - running bool - stopped chan int - mu sync.RWMutex // TODO need init? - client *RemotingClient - - eventQueue chan *NetEvent - maxSize int -} - -func NewNetEventExecutor(client *RemotingClient) *NetEventExecutor { - return &NetEventExecutor{ - hasNotified: false, - running: false, - stopped: make(chan int), - client: client, - eventQueue: make(chan *NetEvent, 100), // TODO confirm size - maxSize: 10000, - } -} - -func (executor *NetEventExecutor) Start() { - go executor.run() -} - -func (executor *NetEventExecutor) Shutdown() { - executor.stopped <- 0 -} - -func (executor *NetEventExecutor) PutEvent(event *NetEvent) { - if len(executor.eventQueue) <= executor.maxSize { - executor.eventQueue <- event //append(executor.eventQueue, event) - } else { - fmt.Sprintf("event queue size[%s] enough, so drop this event %s", len(executor.eventQueue), event.String()) - } -} - -func (executor *NetEventExecutor) ServiceName() string { - // TODO - return nil -} - -func (executor *NetEventExecutor) run() { - glog.Infof("%s service started", executor.ServiceName()) - - executor.mu.Lock() - executor.running = true - executor.mu.Unlock() - - listener := executor.client.ConnEventListener() - for executor.running { // TODO optimize - select { - case event := <-executor.eventQueue: - if event != nil && listener != nil { - switch event.Type() { - case Connect: - listener.OnConnConnect(event.remoteAddress, event.Conn()) - case Close: - listener.OnConnClose(event.remoteAddress, event.Conn()) - case Idle: - listener.OnConnIdle(event.remoteAddress, event.Conn()) - case Exception: - listener.OnConnException(event.remoteAddress, event.Conn()) - default: - break - } - } - case <-executor.stopped: - executor.mu.Lock() - executor.running = false - executor.mu.Unlock() - break - } - } - - glog.Infof("%s service exit.", executor.ServiceName()) -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/json_serializable.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/json_serializable.go b/rocketmq-go/remoting/json_serializable.go new file mode 100644 index 0000000..c2c5ea0 --- /dev/null +++ b/rocketmq-go/remoting/json_serializable.go @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package remoting + +import ( + "encoding/json" +) + +type JsonSerializer struct { +} + +func (self *JsonSerializer) EncodeHeaderData(command *RemotingCommand) []byte { + buf, err := json.Marshal(command) + if err != nil { + return nil + } + return buf +} +func (self *JsonSerializer) DecodeRemoteCommand(header, body []byte) *RemotingCommand { + cmd := &RemotingCommand{} + cmd.ExtFields = make(map[string]interface{}) + err := json.Unmarshal(header, cmd) + if err != nil { + return nil + } + cmd.Body = body + return cmd +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/remoting_client.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/remoting_client.go b/rocketmq-go/remoting/remoting_client.go index 38685cb..206fdcf 100644 --- a/rocketmq-go/remoting/remoting_client.go +++ b/rocketmq-go/remoting/remoting_client.go @@ -14,305 +14,357 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package remoting import ( + "bytes" + "encoding/binary" "errors" - "fmt" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util" "github.com/golang/glog" "math/rand" "net" + "strconv" + "strings" "sync" "time" ) -type ConnEventListener interface { - OnConnConnect(remoteAddress string, conn net.Conn) - OnConnClose(remoteAddress string, conn net.Conn) - OnConnIdle(remoteAddress string, conn net.Conn) - OnConnException(remoteAddress string, conn net.Conn) +type RemotingClient interface { + InvokeSync(addr string, request *RemotingCommand, timeoutMillis int64) (remotingCommand *RemotingCommand, err error) + InvokeAsync(addr string, request *RemotingCommand, timeoutMillis int64, invokeCallback InvokeCallback) error + InvokeOneWay(addr string, request *RemotingCommand, timeoutMillis int64) error } - -type NetRequestProcessor struct { +type DefalutRemotingClient struct { + clientId string + clientConfig *config.ClientConfig + + connTable map[string]net.Conn + connTableLock sync.RWMutex + + responseTable util.ConcurrentMap //map[int32]*ResponseFuture + processorTable util.ConcurrentMap //map[int]ClientRequestProcessor //requestCode|ClientRequestProcessor + // protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable = + //new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64); + namesrvAddrList []string + namesrvAddrSelectedAddr string + namesrvAddrSelectedIndex int //how to chose. done + namesvrLockRW sync.RWMutex // + clientRequestProcessor ClientRequestProcessor //mange register the processor here + serializerHandler SerializerHandler //rocketmq encode decode } -type NetConfig struct { - clientWorkerNumber int - clientCallbackExecutorNumber int - clientOneWaySemaphoreValue int - clientAsyncSemaphoreValue int - connectTimeoutMillis time.Duration - channelNotActiveInterval time.Duration - - clientChannelMaxIdleTimeSeconds time.Duration - clientSocketSndBufSize int - clientSocketRcvBufSize int - clientPooledByteBufAllocatorEnable bool - clientCloseSocketIfTimeout bool +func RemotingClientInit(clientConfig *config.ClientConfig, clientRequestProcessor ClientRequestProcessor) (client *DefalutRemotingClient) { + client = &DefalutRemotingClient{} + client.connTable = map[string]net.Conn{} + client.responseTable = util.New() + client.clientConfig = clientConfig + + client.namesrvAddrList = strings.Split(clientConfig.NameServerAddress(), ";") + client.namesrvAddrSelectedIndex = -1 + client.clientRequestProcessor = clientRequestProcessor + client.serializerHandler = NewSerializerHandler() + return } -type Pair struct { - o1 *NetRequestProcessor - o2 *ExecutorService +func (self *DefalutRemotingClient) InvokeSync(addr string, request *RemotingCommand, timeoutMillis int64) (remotingCommand *RemotingCommand, err error) { + var conn net.Conn + conn, err = self.GetOrCreateConn(addr) + response := &ResponseFuture{ + SendRequestOK: false, + Opaque: request.Opaque, + TimeoutMillis: timeoutMillis, + BeginTimestamp: time.Now().Unix(), + Done: make(chan bool), + } + header := self.serializerHandler.EncodeHeader(request) + body := request.Body + self.SetResponse(request.Opaque, response) + err = self.sendRequest(header, body, conn, addr) + if err != nil { + glog.Error(err) + return + } + select { + case <-response.Done: + remotingCommand = response.ResponseCommand + return + case <-time.After(time.Duration(timeoutMillis) * time.Millisecond): + err = errors.New("invoke sync timeout:" + strconv.FormatInt(timeoutMillis, 10) + " Millisecond") + return + } } - -type RemotingClient struct { - semaphoreOneWay sync.Mutex // TODO right? use chan? - semaphoreAsync sync.Mutex - processorTable map[int]*Pair - netEventExecutor *NetEventExecutor - defaultRequestProcessor *Pair - - config NetConfig - connTable map[string]net.Conn - connTableLock sync.RWMutex - timer *time.Timer - namesrvAddrList []string - namesrvAddrChoosed string - - callBackExecutor *ExecutorService - listener ConnEventListener - rpcHook RPCHook - responseTable map[int32]*ResponseFuture - responseTableLock sync.RWMutex +func (self *DefalutRemotingClient) InvokeAsync(addr string, request *RemotingCommand, timeoutMillis int64, invokeCallback InvokeCallback) error { + conn, err := self.GetOrCreateConn(addr) + if err != nil { + return err + } + response := &ResponseFuture{ + SendRequestOK: false, + Opaque: request.Opaque, + TimeoutMillis: timeoutMillis, + BeginTimestamp: time.Now().Unix(), + InvokeCallback: invokeCallback, + } + self.SetResponse(request.Opaque, response) + header := self.serializerHandler.EncodeHeader(request) + body := request.Body + err = self.sendRequest(header, body, conn, addr) + if err != nil { + glog.Error(err) + return err + } + return err } - -type ConnHandlerContext struct { +func (self *DefalutRemotingClient) InvokeOneWay(addr string, request *RemotingCommand, timeoutMillis int64) error { + conn, err := self.GetOrCreateConn(addr) + if err != nil { + return err + } + header := self.serializerHandler.EncodeHeader(request) + body := request.Body + err = self.sendRequest(header, body, conn, addr) + if err != nil { + glog.Error(err) + return err + } + return err } -type ExecutorService struct { - callBackChannel chan func() - quit chan bool +func (self *DefalutRemotingClient) sendRequest(header, body []byte, conn net.Conn, addr string) error { + var requestBytes []byte + requestBytes = append(requestBytes, header...) + if body != nil && len(body) > 0 { + requestBytes = append(requestBytes, body...) + } + _, err := conn.Write(requestBytes) + if err != nil { + glog.Error(err) + if len(addr) > 0 { + self.ReleaseConn(addr, conn) + } + return err + } + return nil } - -func (exec *ExecutorService) submit(callback func()) { - exec.callBackChannel <- callback +func (self *DefalutRemotingClient) GetNamesrvAddrList() []string { + return self.namesrvAddrList } -func (exec *ExecutorService) run() { - go func() { - glog.Info("Callback Executor routing start.") - for { - select { - case invoke := <-exec.callBackChannel: - invoke() - case <-exec.quit: - break - } - } - glog.Info("Callback Executor routing quit.") - }() +func (self *DefalutRemotingClient) SetResponse(index int32, response *ResponseFuture) { + self.responseTable.Set(strconv.Itoa(int(index)), response) } - -func NewRemotingClient(cfg NetConfig) *RemotingClient { - client := &RemotingClient{ - config: cfg, - connTable: make(map[string]net.Conn), - timer: time.NewTimer(10 * time.Second), - responseTable: make(map[int32]*ResponseFuture), +func (self *DefalutRemotingClient) getResponse(index int32) (response *ResponseFuture, err error) { + obj, ok := self.responseTable.Get(strconv.Itoa(int(index))) + if !ok { + err = errors.New("get conn from responseTable error") + return } - // java: super(xxxx) - return client + response = obj.(*ResponseFuture) + return } - -func (rc *RemotingClient) PutNetEvent(event *NetEvent) { - rc.netEventExecutor.PutEvent(event) +func (self *DefalutRemotingClient) removeResponse(index int32) { + self.responseTable.Remove(strconv.Itoa(int(index))) } - -func (rc *RemotingClient) executeInvokeCallback(future *ResponseFuture) { - executor := rc.CallbackExecutor() - if executor != nil { - executor.submit(func() { - future.invokeCallback(future) - }) +func (self *DefalutRemotingClient) GetOrCreateConn(address string) (conn net.Conn, err error) { + if len(address) == 0 { + conn, err = self.getNamesvrConn() return } - future.executeInvokeCallback() -} - -// check timeout future -func (rc *RemotingClient) scanResponseTable() { - rfMap := make(map[int]*ResponseFuture) - for k, future := range rc.responseTable { // TODO safety? - if int64(future.beginTimestamp)+int64(future.timeoutMillis)+1e9 <= time.Now().Unix() { - future.Done() - delete(rc.responseTable, k) - rfMap[int(k)] = future - glog.Warningf("remove timeout request, ", future.String()) - } + conn = self.GetConn(address) + if conn != nil { + return } - - go func() { - for _, future := range rfMap { - rc.executeInvokeCallback(future) // TODO if still failed, how to deal with the message ? - } - }() + conn, err = self.CreateConn(address) + return } - -func (rc *RemotingClient) CallbackExecutor() *ExecutorService { - return rc.callBackExecutor -} - -func (rc *RemotingClient) RPCHook() RPCHook { - return rc.rpcHook +func (self *DefalutRemotingClient) GetConn(address string) (conn net.Conn) { + self.connTableLock.RLock() + conn = self.connTable[address] + self.connTableLock.RUnlock() + return } - -func (rc *RemotingClient) ConnEventListener() ConnEventListener { - return rc.listener -} - -func (rc *RemotingClient) invokeSync(addr string, request *RemotingCommand, - timeout time.Duration) (*RemotingCommand, error) { - conn := rc.getAndCreateConn(addr) +func (self *DefalutRemotingClient) CreateConn(address string) (conn net.Conn, err error) { + defer self.connTableLock.Unlock() + self.connTableLock.Lock() + conn = self.connTable[address] if conn != nil { - if rc.rpcHook != nil { - rc.rpcHook.DoBeforeRequest(addr, request) - } - opaque := request.opaque - //defer delete(rc.responseTable, opaque) TODO should in listener - - future := &ResponseFuture{ - opaque: opaque, - timeoutMillis: timeout, - } - rc.responseTable[opaque] = future - - conn.Write(request.encode()) // TODO register listener - - response := future.WaitResponse(timeout) - if response == nil { - if future.sendRequestOK { - return nil, errors.New(fmt.Sprintf("RemotingTimeout error: %s", future.err.Error())) - } else { - return nil, errors.New(fmt.Sprintf("RemotingSend error: %s", future.err.Error())) - } - } - - if rc.rpcHook != nil { - rc.rpcHook.DoBeforeResponse(addr, response) - } - return response, nil - } else { - rc.CloseConn(addr) // TODO - return nil, errors.New(fmt.Sprintf("Connection to %s ERROR!", addr)) + return } + conn, err = self.createAndHandleTcpConn(address) + self.connTable[address] = conn + return } -func (rc *RemotingClient) invokeAsync(addr string, request *RemotingCommand, - timeout time.Duration, callback InvokeCallback) error { - conn := rc.getAndCreateConn(addr) - if conn != nil { // TODO how to confirm conn active? - if rc.rpcHook != nil { - rc.rpcHook.DoBeforeRequest(addr, request) +func (self *DefalutRemotingClient) getNamesvrConn() (conn net.Conn, err error) { + self.namesvrLockRW.RLock() + address := self.namesrvAddrSelectedAddr + self.namesvrLockRW.RUnlock() + if len(address) != 0 { + conn = self.GetConn(address) + if conn != nil { + return } + } - opaque := request.opaque - acquired := false // TODO semaphore.tryAcquire... - if acquired { - //final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); - - future := NewResponseFuture(opaque, timeout, callback) - rc.responseTable[opaque] = future - - future.WaitResponse(timeout) // TODO add listener + defer self.namesvrLockRW.Unlock() + self.namesvrLockRW.Lock() + //already connected by another write lock owner + address = self.namesrvAddrSelectedAddr + if len(address) != 0 { + conn = self.GetConn(address) + if conn != nil { + return } - return nil - } else { - rc.CloseConn(addr) // TODO - return errors.New(fmt.Sprintf("Connection to %s ERROR!", addr)) } -} -func (rc *RemotingClient) invokeOneWay(addr string, request *RemotingCommand, - timeout time.Duration) error { - conn := rc.getAndCreateConn(addr) - if conn != nil { - if rc.rpcHook != nil { - rc.rpcHook.DoBeforeRequest(addr, request) + addressCount := len(self.namesrvAddrList) + if self.namesrvAddrSelectedIndex < 0 { + self.namesrvAddrSelectedIndex = rand.Intn(addressCount) + } + for i := 1; i <= addressCount; i++ { + selectedIndex := (self.namesrvAddrSelectedIndex + i) % addressCount + selectAddress := self.namesrvAddrList[selectedIndex] + if len(selectAddress) == 0 { + continue + } + conn, err = self.CreateConn(selectAddress) + if err == nil { + self.namesrvAddrSelectedAddr = selectAddress + self.namesrvAddrSelectedIndex = selectedIndex + return } - - request.MarkOneWayRpc() - // TODO boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); - return nil - } else { - rc.CloseConn(addr) // TODO - return errors.New(fmt.Sprintf("Connection to %s ERROR!", addr)) } + err = errors.New("all namesvrAddress can't use!,address:" + self.clientConfig.NameServerAddress()) + return } - -func initValueIndex() int { - r := rand.Int() - if r < 0 { // math.Abs para is float64 - r = -r +func (self *DefalutRemotingClient) createAndHandleTcpConn(address string) (conn net.Conn, err error) { + conn, err = net.Dial("tcp", address) + if err != nil { + glog.Error(err) + return nil, err } - return r % 999 % 999 + go self.handlerReceiveLoop(conn, address) //handlerè¿æ¥ å¤çè¿ä¸ªè¿æ¥è¿åçç»æ + return } - -func (rc *RemotingClient) Start() { - // TODO -} - -func (rc *RemotingClient) Shutdown() { - // TODO - rc.timer.Stop() -} - -func (rc *RemotingClient) registerRPCHook(hk RPCHook) { - rc.rpcHook = hk -} - -func (rc *RemotingClient) CloseConn(addr string) { - // TODO +func (self *DefalutRemotingClient) ReleaseConn(addr string, conn net.Conn) { + defer self.connTableLock.Unlock() + conn.Close() + self.connTableLock.Lock() + delete(self.connTable, addr) } -func (rc *RemotingClient) updateNameServerAddressList(addrs []string) { - old, update := rc.namesrvAddrList, false - - if addrs != nil && len(addrs) > 0 { - if old == nil || len(addrs) != len(old) { - update = true - } else { - for i := 0; i < len(addrs) && !update; i++ { - if contains(old, addrs[i]) { - update = true +func (self *DefalutRemotingClient) handlerReceiveLoop(conn net.Conn, addr string) (err error) { + defer func() { + //when for is break releaseConn + glog.Error(err, addr) + self.ReleaseConn(addr, conn) + }() + b := make([]byte, 1024) + var length, headerLength, bodyLength int32 + var buf = bytes.NewBuffer([]byte{}) + var header, body []byte + var readTotalLengthFlag = true //readLen when true,read data when false + for { + var n int + n, err = conn.Read(b) + if err != nil { + return + } + _, err = buf.Write(b[:n]) + if err != nil { + return + } + for { + if readTotalLengthFlag { + //we read 4 bytes of allDataLength + if buf.Len() >= 4 { + err = binary.Read(buf, binary.BigEndian, &length) + if err != nil { + return + } + readTotalLengthFlag = false //now turn to read data + } else { + break //wait bytes we not got + } + } + if !readTotalLengthFlag { + if buf.Len() < int(length) { + // judge all data received.if not,loop to wait + break } } + //now all data received, we can read totalLen again + readTotalLengthFlag = true + + //get the data,and handler it + //header len + err = binary.Read(buf, binary.BigEndian, &headerLength) + var realHeaderLen = (headerLength & 0x00ffffff) + //headerData the first ff is about serializable type + var headerSerializableType = byte(headerLength >> 24) + header = make([]byte, realHeaderLen) + _, err = buf.Read(header) + bodyLength = length - 4 - realHeaderLen + body = make([]byte, int(bodyLength)) + if bodyLength == 0 { + // no body + } else { + _, err = buf.Read(body) + } + go self.handlerReceivedMessage(conn, headerSerializableType, header, body) } } - - if update { - rc.namesrvAddrList = addrs // TODO safe? - } - } - -func (rc *RemotingClient) getAndCreateConn(addr string) net.Conn { - return nil -} - -func (rc *RemotingClient) getAndCreateNamesrvConn() net.Conn { - return nil -} - -func (rc *RemotingClient) createConn(addr string) net.Conn { - - return nil +func (self *DefalutRemotingClient) handlerReceivedMessage(conn net.Conn, headerSerializableType byte, headBytes []byte, bodyBytes []byte) { + cmd := self.serializerHandler.DecodeRemoteCommand(headerSerializableType, headBytes, bodyBytes) + if cmd.IsResponseType() { + self.handlerResponse(cmd) + return + } + go self.handlerRequest(conn, cmd) } - -func (rc *RemotingClient) RegisterProcessor(requestCode int, processor *NetRequestProcessor, executor ExecutorService) { - // TODO +func (self *DefalutRemotingClient) handlerRequest(conn net.Conn, cmd *RemotingCommand) { + responseCommand := self.clientRequestProcessor(cmd) + if responseCommand == nil { + return + } + responseCommand.Opaque = cmd.Opaque + responseCommand.MarkResponseType() + header := self.serializerHandler.EncodeHeader(responseCommand) + body := responseCommand.Body + err := self.sendRequest(header, body, conn, "") + if err != nil { + glog.Error(err) + } } +func (self *DefalutRemotingClient) handlerResponse(cmd *RemotingCommand) { + response, err := self.getResponse(cmd.Opaque) + self.removeResponse(cmd.Opaque) + if err != nil { + return + } + response.ResponseCommand = cmd + if response.InvokeCallback != nil { + response.InvokeCallback(response) + } -func (rc *RemotingClient) String() string { - return nil // TODO + if response.Done != nil { + response.Done <- true + } } -func contains(s []string, o string) bool { // TODO optimize - for _, v := range s { - if o == v { - return true +func (self *DefalutRemotingClient) ClearExpireResponse() { + for seq, responseObj := range self.responseTable.Items() { + response := responseObj.(*ResponseFuture) + if (response.BeginTimestamp + 30) <= time.Now().Unix() { + //30 mins expire + self.responseTable.Remove(seq) + if response.InvokeCallback != nil { + response.InvokeCallback(nil) + glog.Warningf("remove time out request %v", response) + } } } - return false } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/remoting_command.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/remoting_command.go b/rocketmq-go/remoting/remoting_command.go index b57f1e9..a8361bd 100644 --- a/rocketmq-go/remoting/remoting_command.go +++ b/rocketmq-go/remoting/remoting_command.go @@ -14,168 +14,56 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package remoting -// TODO: refactor import ( - "bytes" - "encoding/binary" - "encoding/json" - "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header" - "log" - "os" - "strconv" - "sync" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util/structs" "sync/atomic" ) -func init() { - // TODO -} - -const ( - SerializeTypeProperty = "rocketmq.serialize.type" - SerializeTypeEnv = "ROCKETMQ_SERIALIZE_TYPE" - RemotingVersionKey = "rocketmq.remoting.version" - rpcType = 0 // 0, request command - rpcOneWay = 1 // 0, RPC -) - -type RemotingCommandType int +var opaque int32 -const ( - ResponseCommand RemotingCommandType = iota - RqeusetCommand -) +var RPC_TYPE int = 0 // 0, REQUEST_COMMAND +var RPC_ONEWAY int = 1 // 0, RPC -var configVersion int = -1 -var requestId int32 -var decodeLock sync.Mutex +//var RESPONSE_TYPE int= 1 << RPC_TYPE +var RESPONSE_TYPE int = 1 type RemotingCommand struct { //header - code int `json:"code"` - language string `json:"language"` - version int `json:"version"` - opaque int32 `json:"opaque"` - flag int `json:"flag"` - remark string `json:"remark"` - extFields map[string]string `json:"extFields"` - header CustomerHeader // transient - //body - body []byte `json:"body,omitempty"` + Code int16 `json:"code"` + Language string `json:"language"` //int 8 + Version int16 `json:"version"` + Opaque int32 `json:"opaque"` + Flag int `json:"flag"` + Remark string `json:"remark"` + ExtFields map[string]interface{} `json:"extFields"` //java's ExtFields and customHeader is use this key word + Body []byte `json:"body,omitempty"` } -func NewRemotingCommand(code int, header CustomerHeader) *RemotingCommand { - cmd := &RemotingCommand{ - code: code, - header: header, - } - setCmdVersion(cmd) - return cmd +func NewRemotingCommand(commandCode int16, customerHeader CustomerHeader) *RemotingCommand { + return NewRemotingCommandWithBody(commandCode, customerHeader, nil) } -func setCmdVersion(cmd *RemotingCommand) { - if configVersion >= 0 { - cmd.version = configVersion // safety - } else if v := os.Getenv(RemotingVersionKey); v != "" { - value, err := strconv.Atoi(v) - if err != nil { - // TODO log - } - cmd.version = value - configVersion = value +func NewRemotingCommandWithBody(commandCode int16, customerHeader CustomerHeader, body []byte) *RemotingCommand { + remotingCommand := new(RemotingCommand) + remotingCommand.Code = commandCode + currOpaque := atomic.AddInt32(&opaque, 1) + remotingCommand.Opaque = currOpaque + remotingCommand.Flag = constant.REMOTING_COMMAND_FLAG + remotingCommand.Language = constant.REMOTING_COMMAND_LANGUAGE + remotingCommand.Version = constant.REMOTING_COMMAND_VERSION + if customerHeader != nil { + remotingCommand.ExtFields = structs.Map(customerHeader) } + remotingCommand.Body = body + return remotingCommand } -func (cmd *RemotingCommand) encodeHeader() []byte { - length := 4 - headerData := cmd.buildHeader() - length += len(headerData) - - if cmd.body != nil { - length += len(cmd.body) - } - - buf := bytes.NewBuffer([]byte{}) - binary.Write(buf, binary.BigEndian, length) - binary.Write(buf, binary.BigEndian, len(cmd.body)) - buf.Write(headerData) - - return buf.Bytes() +func (self *RemotingCommand) IsResponseType() bool { + return self.Flag&(RESPONSE_TYPE) == RESPONSE_TYPE } - -func (cmd *RemotingCommand) buildHeader() []byte { - buf, err := json.Marshal(cmd) - if err != nil { - return nil - } - return buf -} - -func (cmd *RemotingCommand) encode() []byte { - length := 4 - - headerData := cmd.buildHeader() - length += len(headerData) - - if cmd.body != nil { - length += len(cmd.body) - } - - buf := bytes.NewBuffer([]byte{}) - binary.Write(buf, binary.LittleEndian, length) - binary.Write(buf, binary.LittleEndian, len(cmd.body)) - buf.Write(headerData) - - if cmd.body != nil { - buf.Write(cmd.body) - } - - return buf.Bytes() -} - -func decodeRemoteCommand(header, body []byte) *RemotingCommand { - decodeLock.Lock() - defer decodeLock.Unlock() - - cmd := &RemotingCommand{} - cmd.extFields = make(map[string]string) - err := json.Unmarshal(header, cmd) - if err != nil { - log.Print(err) - return nil - } - cmd.body = body - return cmd -} - -func CreateRemotingCommand(code int, requestHeader *header.SendMessageRequestHeader) *RemotingCommand { - cmd := &RemotingCommand{} - cmd.code = code - cmd.header = requestHeader - cmd.version = 1 - cmd.opaque = atomic.AddInt32(&requestId, 1) // TODO: safety? - return cmd -} - -func (cmd *RemotingCommand) SetBody(body []byte) { - cmd.body = body -} - -func (cmd *RemotingCommand) Type() RemotingCommandType { - bits := 1 << rpcType - if (cmd.flag & bits) == bits { - return ResponseCommand - } - return RqeusetCommand -} - -func (cmd *RemotingCommand) MarkOneWayRpc() { - cmd.flag |= (1 << rpcOneWay) -} - -func (cmd *RemotingCommand) String() string { - return nil // TODO +func (self *RemotingCommand) MarkResponseType() { + self.Flag = (self.Flag | RESPONSE_TYPE) } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/request_code.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/request_code.go b/rocketmq-go/remoting/request_code.go new file mode 100644 index 0000000..52965d5 --- /dev/null +++ b/rocketmq-go/remoting/request_code.go @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package remoting + +const ( + SEND_MESSAGE = 10 + PULL_MESSAGE = 11 + QUERY_MESSAGE = 12 + QUERY_BROKER_OFFSET = 13 + QUERY_CONSUMER_OFFSET = 14 + UPDATE_CONSUMER_OFFSET = 15 + UPDATE_AND_CREATE_TOPIC = 17 + GET_ALL_TOPIC_CONFIG = 21 + GET_TOPIC_CONFIG_LIST = 22 + GET_TOPIC_NAME_LIST = 23 + UPDATE_BROKER_CONFIG = 25 + GET_BROKER_CONFIG = 26 + TRIGGER_DELETE_FILES = 27 + GET_BROKER_RUNTIME_INFO = 28 + SEARCH_OFFSET_BY_TIMESTAMP = 29 + GET_MAX_OFFSET = 30 + GET_MIN_OFFSET = 31 + GET_EARLIEST_MSG_STORETIME = 32 + VIEW_MESSAGE_BY_ID = 33 + HEART_BEAT = 34 + UNREGISTER_CLIENT = 35 + CONSUMER_SEND_MSG_BACK = 36 + END_TRANSACTION = 37 + GET_CONSUMER_LIST_BY_GROUP = 38 + CHECK_TRANSACTION_STATE = 39 + NOTIFY_CONSUMER_IDS_CHANGED = 40 + LOCK_BATCH_MQ = 41 + UNLOCK_BATCH_MQ = 42 + GET_ALL_CONSUMER_OFFSET = 43 + GET_ALL_DELAY_OFFSET = 45 + PUT_KV_CONFIG = 100 + GET_KV_CONFIG = 101 + DELETE_KV_CONFIG = 102 + REGISTER_BROKER = 103 + UNREGISTER_BROKER = 104 + GET_ROUTEINTO_BY_TOPIC = 105 + GET_BROKER_CLUSTER_INFO = 106 + UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200 + GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201 + GET_TOPIC_STATS_INFO = 202 + GET_CONSUMER_CONNECTION_LIST = 203 + GET_PRODUCER_CONNECTION_LIST = 204 + WIPE_WRITE_PERM_OF_BROKER = 205 + + GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206 + DELETE_SUBSCRIPTIONGROUP = 207 + GET_CONSUME_STATS = 208 + SUSPEND_CONSUMER = 209 + RESUME_CONSUMER = 210 + RESET_CONSUMER_OFFSET_IN_CONSUMER = 211 + RESET_CONSUMER_OFFSET_IN_BROKER = 212 + ADJUST_CONSUMER_THREAD_POOL = 213 + WHO_CONSUME_THE_MESSAGE = 214 + + DELETE_TOPIC_IN_BROKER = 215 + DELETE_TOPIC_IN_NAMESRV = 216 + GET_KV_CONFIG_BY_VALUE = 217 + DELETE_KV_CONFIG_BY_VALUE = 218 + GET_KVLIST_BY_NAMESPACE = 219 + + RESET_CONSUMER_CLIENT_OFFSET = 220 + GET_CONSUMER_STATUS_FROM_CLIENT = 221 + INVOKE_BROKER_TO_RESET_OFFSET = 222 + INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223 + + QUERY_TOPIC_CONSUME_BY_WHO = 300 + + GET_TOPICS_BY_CLUSTER = 224 + + REGISTER_FILTER_SERVER = 301 + REGISTER_MESSAGE_FILTER_CLASS = 302 + QUERY_CONSUME_TIME_SPAN = 303 + GET_SYSTEM_TOPIC_LIST_FROM_NS = 304 + GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305 + + CLEAN_EXPIRED_CONSUMEQUEUE = 306 + + GET_CONSUMER_RUNNING_INFO = 307 + + QUERY_CORRECTION_OFFSET = 308 + + CONSUME_MESSAGE_DIRECTLY = 309 + + SEND_MESSAGE_V2 = 310 + + GET_UNIT_TOPIC_LIST = 311 + GET_HAS_UNIT_SUB_TOPIC_LIST = 312 + GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313 + CLONE_GROUP_OFFSET = 314 + + VIEW_BROKER_STATS_DATA = 315 +) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/request_processor.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/request_processor.go b/rocketmq-go/remoting/request_processor.go new file mode 100644 index 0000000..eec8cd8 --- /dev/null +++ b/rocketmq-go/remoting/request_processor.go @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package remoting + +type ClientRequestProcessor func(remotingCommand *RemotingCommand) (responseCommand *RemotingCommand) + +//CHECK_TRANSACTION_STATE +//NOTIFY_CONSUMER_IDS_CHANGED +//RESET_CONSUMER_CLIENT_OFFSET +//GET_CONSUMER_STATUS_FROM_CLIENT +//GET_CONSUMER_RUNNING_INFO +//CONSUME_MESSAGE_DIRECTLY http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/response_code.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/response_code.go b/rocketmq-go/remoting/response_code.go new file mode 100644 index 0000000..6a49c77 --- /dev/null +++ b/rocketmq-go/remoting/response_code.go @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package remoting + +const ( + SUCCESS = 0 + SYSTEM_ERROR = 1 + SYSTEM_BUSY = 2 + REQUEST_CODE_NOT_SUPPORTED = 3 + TRANSACTION_FAILED = 4 + FLUSH_DISK_TIMEOUT = 10 + SLAVE_NOT_AVAILABLE = 11 + FLUSH_SLAVE_TIMEOUT = 12 + MESSAGE_ILLEGAL = 13 + SERVICE_NOT_AVAILABLE = 14 + VERSION_NOT_SUPPORTED = 15 + NO_PERMISSION = 16 + TOPIC_NOT_EXIST = 17 + TOPIC_EXIST_ALREADY = 18 + PULL_NOT_FOUND = 19 + PULL_RETRY_IMMEDIATELY = 20 + PULL_OFFSET_MOVED = 21 + QUERY_NOT_FOUND = 22 + SUBSCRIPTION_PARSE_FAILED = 23 + SUBSCRIPTION_NOT_EXIST = 24 + SUBSCRIPTION_NOT_LATEST = 25 + SUBSCRIPTION_GROUP_NOT_EXIST = 26 + TRANSACTION_SHOULD_COMMIT = 200 + TRANSACTION_SHOULD_ROLLBACK = 201 + TRANSACTION_STATE_UNKNOW = 202 + TRANSACTION_STATE_GROUP_WRONG = 203 + NO_BUYER_ID = 204 + + NOT_IN_CURRENT_UNIT = 205 + + CONSUMER_NOT_ONLINE = 206 + + CONSUME_MSG_TIMEOUT = 207 +) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/response_future.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/response_future.go b/rocketmq-go/remoting/response_future.go index a8c2aec..1bece6d 100644 --- a/rocketmq-go/remoting/response_future.go +++ b/rocketmq-go/remoting/response_future.go @@ -16,57 +16,14 @@ */ package remoting -import ( - "sync" - "time" -) - -type InvokeCallback func(responseFuture *ResponseFuture) - type ResponseFuture struct { - opaque int32 - timeoutMillis time.Duration - invokeCallback InvokeCallback - beginTimestamp int64 - responseCommand *RemotingCommand - sendRequestOK bool - done chan bool - latch sync.WaitGroup - err error -} - -func NewResponseFuture(opaque int32, timeout time.Duration, callback InvokeCallback) *ResponseFuture { - future := &ResponseFuture{ - opaque: opaque, - timeoutMillis: timeout, - invokeCallback: callback, - latch: sync.WaitGroup{}, - } - future.latch.Add(1) - return future -} -func (future *ResponseFuture) SetResponseFuture(cmd *RemotingCommand) { - future.responseCommand = cmd -} - -func (future *ResponseFuture) Done() { - future.latch.Done() - future.done <- true -} - -func (future *ResponseFuture) executeInvokeCallback() { - future.invokeCallback(nil) // TODO -} - -func (future *ResponseFuture) WaitResponse(timeout time.Duration) *RemotingCommand { - go func() { // TODO optimize - time.Sleep(timeout) - future.latch.Add(-1) // TODO whats happened when counter less than 0 - }() - future.latch.Wait() - return future.responseCommand -} - -func (future *ResponseFuture) String() string { - return nil + ResponseCommand *RemotingCommand + SendRequestOK bool + Rrr error + Opaque int32 + TimeoutMillis int64 + InvokeCallback InvokeCallback + BeginTimestamp int64 + Done chan bool } +type InvokeCallback func(responseFuture *ResponseFuture) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/rocketmq_serializable.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/rocketmq_serializable.go b/rocketmq-go/remoting/rocketmq_serializable.go new file mode 100644 index 0000000..22dc41f --- /dev/null +++ b/rocketmq-go/remoting/rocketmq_serializable.go @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package remoting + +import ( + "bytes" + "encoding/binary" + "fmt" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant" +) + +type RocketMqSerializer struct { +} + +func (self *RocketMqSerializer) EncodeHeaderData(cmd *RemotingCommand) []byte { + var ( + remarkBytes []byte + remarkBytesLen int + extFieldsBytes []byte + extFieldsBytesLen int + ) + remarkBytesLen = 0 + if len(cmd.Remark) > 0 { + remarkBytes = []byte(cmd.Remark) + remarkBytesLen = len(remarkBytes) + } + if cmd.ExtFields != nil { + extFieldsBytes = rocketMqCustomHeaderSerialize(cmd.ExtFields) + extFieldsBytesLen = len(extFieldsBytes) + } + buf := bytes.NewBuffer([]byte{}) + binary.Write(buf, binary.BigEndian, int16(cmd.Code)) //code(~32767) 2 + binary.Write(buf, binary.BigEndian, int8(0)) //JAVA + binary.Write(buf, binary.BigEndian, int16(cmd.Version)) //2 + binary.Write(buf, binary.BigEndian, int32(cmd.Opaque)) //opaque 4 + binary.Write(buf, binary.BigEndian, int32(cmd.Flag)) //4 + binary.Write(buf, binary.BigEndian, int32(remarkBytesLen)) //4 + if remarkBytesLen > 0 { + buf.Write(remarkBytes) + } + binary.Write(buf, binary.BigEndian, int32(extFieldsBytesLen)) //4 + if extFieldsBytesLen > 0 { + buf.Write(extFieldsBytes) + } + fmt.Println(buf.Bytes()) + return buf.Bytes() +} + +func (self *RocketMqSerializer) DecodeRemoteCommand(headerArray, body []byte) (cmd *RemotingCommand) { + cmd = &RemotingCommand{} + buf := bytes.NewBuffer(headerArray) + // int code(~32767) + binary.Read(buf, binary.BigEndian, &cmd.Code) + // LanguageCode language + var LanguageCodeNope byte + binary.Read(buf, binary.BigEndian, &LanguageCodeNope) + cmd.Language = constant.REMOTING_COMMAND_LANGUAGE //todo use code from remote + // int version(~32767) + binary.Read(buf, binary.BigEndian, &cmd.Version) + // int opaque + binary.Read(buf, binary.BigEndian, &cmd.Opaque) + // int flag + binary.Read(buf, binary.BigEndian, &cmd.Flag) + // String remark + var remarkLen, extFieldsLen int32 + binary.Read(buf, binary.BigEndian, &remarkLen) + if remarkLen > 0 { + var remarkData = make([]byte, remarkLen) + binary.Read(buf, binary.BigEndian, &remarkData) + cmd.Remark = string(remarkData) + } + //map ext + // HashMap<String, String> extFields + binary.Read(buf, binary.BigEndian, &extFieldsLen) + if extFieldsLen > 0 { + var extFieldsData = make([]byte, extFieldsLen) + binary.Read(buf, binary.BigEndian, &extFieldsData) + extFiledMap := customHeaderDeserialize(extFieldsData) + cmd.ExtFields = extFiledMap + } + cmd.Body = body + return +} + +func rocketMqCustomHeaderSerialize(extFiled map[string]interface{}) (byteData []byte) { + buf := bytes.NewBuffer([]byte{}) + for key, value := range extFiled { + keyBytes := []byte(fmt.Sprintf("%v", key)) + valueBytes := []byte(fmt.Sprintf("%v", value)) + binary.Write(buf, binary.BigEndian, int16(len(keyBytes))) + buf.Write(keyBytes) + binary.Write(buf, binary.BigEndian, int32(len(valueBytes))) + buf.Write(valueBytes) + } + byteData = buf.Bytes() + return +} + +func customHeaderDeserialize(extFiledDataBytes []byte) (extFiledMap map[string]interface{}) { + extFiledMap = make(map[string]interface{}) + buf := bytes.NewBuffer(extFiledDataBytes) + for buf.Len() > 0 { + var key = getItemFormExtFiledDataBytes(buf, "key") + var value = getItemFormExtFiledDataBytes(buf, "value") + extFiledMap[key] = value + } + return +} +func getItemFormExtFiledDataBytes(buff *bytes.Buffer, itemType string) (item string) { + if itemType == "key" { + var len int16 + binary.Read(buff, binary.BigEndian, &len) + var data = make([]byte, len) + binary.Read(buff, binary.BigEndian, &data) + item = string(data) + } + if itemType == "value" { + var len int32 + binary.Read(buff, binary.BigEndian, &len) + var data = make([]byte, len) + binary.Read(buff, binary.BigEndian, &data) + item = string(data) + } + return +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/serializable.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/serializable.go b/rocketmq-go/remoting/serializable.go new file mode 100644 index 0000000..24420cd --- /dev/null +++ b/rocketmq-go/remoting/serializable.go @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package remoting + +import ( + "bytes" + "encoding/binary" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant" + "github.com/golang/glog" +) + +type SerializerHandler struct { + serializer Serializer //which serializer this client use, depend on constant.USE_HEADER_SERIALIZETYPE +} + +type Serializer interface { + EncodeHeaderData(request *RemotingCommand) []byte + DecodeRemoteCommand(header, body []byte) *RemotingCommand +} + +var JSON_SERIALIZER = &JsonSerializer{} +var ROCKETMQ_SERIALIZER = &RocketMqSerializer{} + +func NewSerializerHandler() SerializerHandler { + serializerHandler := SerializerHandler{} + switch constant.USE_HEADER_SERIALIZETYPE { + case constant.JSON_SERIALIZE: + serializerHandler.serializer = JSON_SERIALIZER + break + + case constant.ROCKETMQ_SERIALIZE: + serializerHandler.serializer = ROCKETMQ_SERIALIZER + break + default: + panic("illeage serializer type") + } + return serializerHandler +} +func (self *SerializerHandler) EncodeHeader(request *RemotingCommand) []byte { + length := 4 + headerData := self.serializer.EncodeHeaderData(request) + length += len(headerData) + if request.Body != nil { + length += len(request.Body) + } + buf := bytes.NewBuffer([]byte{}) + binary.Write(buf, binary.BigEndian, int32(length)) // len + binary.Write(buf, binary.BigEndian, int32(len(headerData)|(int(constant.USE_HEADER_SERIALIZETYPE)<<24))) // header len + buf.Write(headerData) + return buf.Bytes() +} + +func (self *SerializerHandler) DecodeRemoteCommand(headerSerializableType byte, header, body []byte) *RemotingCommand { + var serializer Serializer + switch headerSerializableType { + case constant.JSON_SERIALIZE: + serializer = JSON_SERIALIZER + break + case constant.ROCKETMQ_SERIALIZE: + serializer = ROCKETMQ_SERIALIZER + break + default: + glog.Error("Unknow headerSerializableType", headerSerializableType) + } + return serializer.DecodeRemoteCommand(header, body) +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/service/client_api.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/service/client_api.go b/rocketmq-go/service/client_api.go index ff750ba..6901f4c 100644 --- a/rocketmq-go/service/client_api.go +++ b/rocketmq-go/service/client_api.go @@ -20,7 +20,6 @@ package service import ( "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model" "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config" - "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header" "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message" "github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting" ) @@ -45,25 +44,25 @@ type MQClientAPI struct { config *config.ClientConfig } -func NewMQClientAPI(cfg *config.ClientConfig, processor *ClientRemotingProcessor, hook remoting.RPCHook) *MQClientAPI { - api := &MQClientAPI{ - remotingClient: &remoting.RemotingClient{}, //TODO - topAddress: &TopAddress{}, // TODO - crp: processor, - config: cfg, - } - - // TODO register - return api -} - -func (api *MQClientAPI) SendMessage(addr, brokerName string, - msg message.Message, requestHeader header.SendMessageRequestHeader, timeout int64) *model.SendResult { - var request *remoting.RemotingCommand - request = remoting.CreateRemotingCommand(model.SendMsg, &requestHeader) - request.SetBody(msg.Body) - return api.sendMessageSync(addr, brokerName, msg, timeout, request) -} +//func NewMQClientAPI(cfg *config.ClientConfig, processor *ClientRemotingProcessor, hook remoting.RPCHook) *MQClientAPI { +// api := &MQClientAPI{ +// remotingClient: &remoting.RemotingClient{}, //TODO +// topAddress: &TopAddress{}, // TODO +// crp: processor, +// config: cfg, +// } +// +// // TODO register +// return api +//} +// +//func (api *MQClientAPI) SendMessage(addr, brokerName string, +// msg message.Message, requestHeader header.SendMessageRequestHeader, timeout int64) *model.SendResult { +// var request *remoting.RemotingCommand +// request = remoting.CreateRemotingCommand(model.SendMsg, &requestHeader) +// request.SetBody(msg.Body) +// return api.sendMessageSync(addr, brokerName, msg, timeout, request) +//} func (api *MQClientAPI) sendMessageSync(addr, brokerName string, msg message.Message, http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/service/consume_message_service.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/service/consume_message_service.go b/rocketmq-go/service/consume_message_service.go new file mode 100644 index 0000000..09be61c --- /dev/null +++ b/rocketmq-go/service/consume_message_service.go @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package service + +import ( + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant" + "github.com/golang/glog" + "time" +) + +type ConsumeMessageService interface { + //ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName); + + Init(consumerGroup string, mqClient RocketMqClient, offsetStore OffsetStore, defaultProducerService *DefaultProducerService, consumerConfig *config.RocketMqConsumerConfig) + SubmitConsumeRequest(msgs []model.MessageExt, processQueue *model.ProcessQueue, + messageQueue *model.MessageQueue, dispathToConsume bool) + SendMessageBack(messageExt *model.MessageExt, delayLayLevel int, brokerName string) (err error) + ConsumeMessageDirectly(messageExt *model.MessageExt, brokerName string) (consumeMessageDirectlyResult model.ConsumeMessageDirectlyResult, err error) +} + +type ConsumeMessageConcurrentlyServiceImpl struct { + consumerGroup string + messageListener model.MessageListener + //sendMessageBackProducerService SendMessageBackProducerService //for send retry Message + offsetStore OffsetStore + consumerConfig *config.RocketMqConsumerConfig +} + +func NewConsumeMessageConcurrentlyServiceImpl(messageListener model.MessageListener) (consumeService ConsumeMessageService) { + //consumeService = &ConsumeMessageConcurrentlyServiceImpl{messageListener:messageListener, sendMessageBackProducerService:&SendMessageBackProducerServiceImpl{}} + return +} + +func (self *ConsumeMessageConcurrentlyServiceImpl) Init(consumerGroup string, mqClient RocketMqClient, offsetStore OffsetStore, defaultProducerService *DefaultProducerService, consumerConfig *config.RocketMqConsumerConfig) { + self.consumerGroup = consumerGroup + self.offsetStore = offsetStore + //self.sendMessageBackProducerService.InitSendMessageBackProducerService(consumerGroup, mqClient,defaultProducerService,consumerConfig) + self.consumerConfig = consumerConfig +} + +func (self *ConsumeMessageConcurrentlyServiceImpl) SubmitConsumeRequest(msgs []model.MessageExt, processQueue *model.ProcessQueue, messageQueue *model.MessageQueue, dispathToConsume bool) { + msgsLen := len(msgs) + for i := 0; i < msgsLen; { + begin := i + end := i + self.consumerConfig.ConsumeMessageBatchMaxSize + if end > msgsLen { + end = msgsLen + } + go func() { + glog.V(2).Infof("look slice begin %d end %d msgsLen %d", begin, end, msgsLen) + batchMsgs := transformMessageToConsume(self.consumerGroup, msgs[begin:end]) + consumeState := self.messageListener(batchMsgs) + self.processConsumeResult(consumeState, batchMsgs, messageQueue, processQueue) + }() + i = end + } + return +} + +func (self *ConsumeMessageConcurrentlyServiceImpl) SendMessageBack(messageExt *model.MessageExt, delayLayLevel int, brokerName string) (err error) { + //err = self.sendMessageBackProducerService.SendMessageBack(messageExt, 0, brokerName) + return +} + +func (self *ConsumeMessageConcurrentlyServiceImpl) ConsumeMessageDirectly(messageExt *model.MessageExt, brokerName string) (consumeMessageDirectlyResult model.ConsumeMessageDirectlyResult, err error) { + start := time.Now().UnixNano() / 1000000 + consumeResult := self.messageListener([]model.MessageExt{*messageExt}) + consumeMessageDirectlyResult.AutoCommit = true + consumeMessageDirectlyResult.Order = false + consumeMessageDirectlyResult.SpentTimeMills = time.Now().UnixNano()/1000000 - start + if consumeResult.ConsumeConcurrentlyStatus == "CONSUME_SUCCESS" && consumeResult.AckIndex >= 0 { + consumeMessageDirectlyResult.ConsumeResult = "CR_SUCCESS" + + } else { + consumeMessageDirectlyResult.ConsumeResult = "CR_THROW_EXCEPTION" + } + return +} + +func (self *ConsumeMessageConcurrentlyServiceImpl) processConsumeResult(result model.ConsumeConcurrentlyResult, msgs []model.MessageExt, messageQueue *model.MessageQueue, processQueue *model.ProcessQueue) { + if processQueue.IsDropped() { + glog.Warning("processQueue is dropped without process consume result. ", msgs) + return + } + if len(msgs) == 0 { + return + } + ackIndex := result.AckIndex + if model.CONSUME_SUCCESS == result.ConsumeConcurrentlyStatus { + if ackIndex >= len(msgs) { + ackIndex = len(msgs) - 1 + } else { + if result.AckIndex < 0 { + ackIndex = -1 + } + } + } + var failedMessages []model.MessageExt + successMessages := []model.MessageExt{} + if ackIndex >= 0 { + successMessages = msgs[:ackIndex+1] + } + for i := ackIndex + 1; i < len(msgs); i++ { + err := self.SendMessageBack(&msgs[i], 0, messageQueue.BrokerName) + if err != nil { + msgs[i].ReconsumeTimes = msgs[i].ReconsumeTimes + 1 + failedMessages = append(failedMessages, msgs[i]) + } else { + successMessages = append(successMessages, msgs[i]) + } + } + if len(failedMessages) > 0 { + self.SubmitConsumeRequest(failedMessages, processQueue, messageQueue, true) + } + //commitOffset := processQueue.RemoveMessage(successMessages) + //if (commitOffset > 0 && ! processQueue.IsDropped()) { + // self.offsetStore.UpdateOffset(messageQueue, commitOffset, true) + //} + +} + +func transformMessageToConsume(consumerGroup string, msgs []model.MessageExt) []model.MessageExt { + retryTopicName := constant.RETRY_GROUP_TOPIC_PREFIX + consumerGroup + + for _, msg := range msgs { + //reset retry topic name + if msg.Message.Topic == retryTopicName { + retryTopic := msg.Properties[constant.PROPERTY_RETRY_TOPIC] + if len(retryTopic) > 0 { + msg.Message.Topic = retryTopic + } + } + //set consume start time + msg.SetConsumeStartTime() + } + return msgs +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/service/consume_messsage_service.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/service/consume_messsage_service.go b/rocketmq-go/service/consume_messsage_service.go deleted file mode 100644 index d3b28fc..0000000 --- a/rocketmq-go/service/consume_messsage_service.go +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package service - -type ConsumeMessageService struct { -}
