Repository: incubator-rocketmq-externals Updated Branches: refs/heads/master 8ca160f94 -> 42fc22353
[ROCKETMQ-182] RocketMQ Go SDK implementation, closes apache/incubator-rocketmq-externals#13 Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/42fc2235 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/42fc2235 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/42fc2235 Branch: refs/heads/master Commit: 42fc2235371873acf1c09c7f49df9d929865c5c9 Parents: 8ca160f Author: wenfengwang <[email protected]> Authored: Thu Apr 27 20:43:15 2017 +0800 Committer: yukon <[email protected]> Committed: Thu Apr 27 20:43:15 2017 +0800 ---------------------------------------------------------------------- rocketmq-go/.gitignore | 1 + rocketmq-go/README.md | 4 + rocketmq-go/example/consumer_example.go | 2 +- rocketmq-go/example/producer_example.go | 2 +- rocketmq-go/model/config/client_config.go | 169 +++++++++++ rocketmq-go/model/config/consumer_config.go | 1 - rocketmq-go/model/config/producer_config.go | 2 +- rocketmq-go/model/config/rocketmq_config.go | 3 +- rocketmq-go/model/constant/message_constant.go | 17 -- .../model/header/pull_message_request_header.go | 4 +- .../model/header/send_message_request_header.go | 37 +++ .../header/send_message_response_header.go | 2 +- rocketmq-go/model/message/message.go | 264 ++++++++++++++++ rocketmq-go/model/message/message_constant.go | 98 ++++++ rocketmq-go/model/message/message_queue.go | 84 ++++++ rocketmq-go/model/pull_result.go | 79 +++++ rocketmq-go/model/query_result.go | 48 +++ rocketmq-go/model/request_code.go | 181 +++++++++++ rocketmq-go/model/response_code.go | 67 ++++ rocketmq-go/model/send_result.go | 105 +++++++ rocketmq-go/model/topic_publishInfo.go | 76 +++++ rocketmq-go/model/topic_route_data.go | 125 ++++++++ rocketmq-go/mq_client_manager.go | 6 +- rocketmq-go/mq_consumer.go | 9 +- rocketmq-go/mq_producer.go | 7 +- rocketmq-go/remoting/communication_mode.go | 26 ++ rocketmq-go/remoting/custom_header.go | 1 + rocketmq-go/remoting/event_executor.go | 144 +++++++++ rocketmq-go/remoting/invoke_callback.go | 20 -- rocketmq-go/remoting/remoting_client.go | 302 ++++++++++++++++++- rocketmq-go/remoting/remoting_command.go | 159 ++++++++++ rocketmq-go/remoting/response_future.go | 72 +++++ rocketmq-go/remoting/rpchook.go | 23 ++ rocketmq-go/service/client_api.go | 86 ++++++ rocketmq-go/service/client_error_code.go | 26 ++ rocketmq-go/service/consume_messsage_service.go | 1 - rocketmq-go/service/mq_client.go | 3 +- rocketmq-go/service/producer_service.go | 12 +- rocketmq-go/service/rebalance_service.go | 2 +- 39 files changed, 2195 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/.gitignore ---------------------------------------------------------------------- diff --git a/rocketmq-go/.gitignore b/rocketmq-go/.gitignore index bc74c0f..37a9a6f 100644 --- a/rocketmq-go/.gitignore +++ b/rocketmq-go/.gitignore @@ -7,3 +7,4 @@ coverage.out tags temp_parser_file y.output +.DS_Store http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/README.md ---------------------------------------------------------------------- diff --git a/rocketmq-go/README.md b/rocketmq-go/README.md new file mode 100644 index 0000000..900e918 --- /dev/null +++ b/rocketmq-go/README.md @@ -0,0 +1,4 @@ +# RocketMQ Go SDK +some code refer to below repos: +* https://github.com/didapinchegit/go_rocket_mq +* https://github.com/sevennt/go_rocket_mq/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/example/consumer_example.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/example/consumer_example.go b/rocketmq-go/example/consumer_example.go index 3595798..5c0044f 100644 --- a/rocketmq-go/example/consumer_example.go +++ b/rocketmq-go/example/consumer_example.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package example +package main func main() { } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/example/producer_example.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/example/producer_example.go b/rocketmq-go/example/producer_example.go index b64cdcd..bda2941 100644 --- a/rocketmq-go/example/producer_example.go +++ b/rocketmq-go/example/producer_example.go @@ -14,4 +14,4 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package example +package main http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/config/client_config.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/config/client_config.go b/rocketmq-go/model/config/client_config.go new file mode 100644 index 0000000..d9ad88c --- /dev/null +++ b/rocketmq-go/model/config/client_config.go @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "bytes" + "time" +) + +// client common config +type ClientConfig struct { + nameServerAddress string + clientIP string + instanceName string + clientCallbackExecutorThreads int // TODO: clientCallbackExecutorThreads + // Pulling topic information interval from the named server + pullNameServerInterval time.Duration // default 30 + // Heartbeat interval in microseconds with message broker + heartbeatBrokerInterval time.Duration // default 30 + // Offset persistent interval for consumer + persistConsumerOffsetInterval time.Duration // default 5 + unitMode bool + unitName string + vipChannelEnabled bool +} + +func NewClientConfig() *ClientConfig { + return &ClientConfig{ + unitMode: false, + pullNameServerInterval: time.Second * 30, + heartbeatBrokerInterval: time.Second * 30, + persistConsumerOffsetInterval: time.Second * 30, + } +} + +func (config *ClientConfig) BuildMQClientId() string { + var buffer bytes.Buffer + buffer.WriteString(config.clientIP) + buffer.WriteString("@") + buffer.WriteString(config.instanceName) + if config.unitName != "" { + buffer.WriteString("@") + buffer.WriteString(config.unitName) + } + return buffer.String() +} + +func (config *ClientConfig) ChangeInstanceNameToPID() { + // TODO +} + +func (config *ClientConfig) ResetClientConfig(cfg *ClientConfig) { + // TODO +} + +func (config *ClientConfig) CloneClientConfig() *ClientConfig { + return &ClientConfig{ + nameServerAddress: config.nameServerAddress, + clientIP: config.clientIP, + instanceName: config.instanceName, + clientCallbackExecutorThreads: config.clientCallbackExecutorThreads, + pullNameServerInterval: config.pullNameServerInterval, + heartbeatBrokerInterval: config.heartbeatBrokerInterval, + persistConsumerOffsetInterval: config.persistConsumerOffsetInterval, + unitMode: config.unitMode, + unitName: config.unitName, + vipChannelEnabled: config.vipChannelEnabled, + } +} + +func (config *ClientConfig) ClientIP() string { + return config.clientIP +} + +func (config *ClientConfig) SetClientIP(s string) { + config.clientIP = s +} + +func (config *ClientConfig) InstanceName() string { + return config.instanceName +} + +func (config *ClientConfig) SetInstanceName(s string) { + config.instanceName = s +} + +func (config *ClientConfig) NameServerAddress() string { + return config.nameServerAddress +} + +func (config *ClientConfig) SetNameServerAddress(s string) { + config.nameServerAddress = s +} + +func (config *ClientConfig) ClientCallbackExecutorThreads() int { + return config.clientCallbackExecutorThreads +} + +func (config *ClientConfig) SetClientCallbackExecutorThreads(threads int) { + config.clientCallbackExecutorThreads = threads +} + +func (config *ClientConfig) PullNameServerInteval() time.Duration { + return config.pullNameServerInterval +} + +func (config *ClientConfig) SetPullNameServerInteval(interval time.Duration) { + config.pullNameServerInterval = interval +} + +func (config *ClientConfig) HeartbeatBrokerInterval() time.Duration { + return config.heartbeatBrokerInterval +} + +func (config *ClientConfig) SetHeartbeatBrokerInterval(interval time.Duration) { + config.heartbeatBrokerInterval = interval +} + +func (config *ClientConfig) PersistConsumerOffsetInterval() time.Duration { + return config.persistConsumerOffsetInterval +} + +func (config *ClientConfig) SetPersistConsumerOffsetInterval(interval time.Duration) { + config.persistConsumerOffsetInterval = interval +} + +func (config *ClientConfig) UnitName() string { + return config.unitName +} + +func (config *ClientConfig) SetUnitName(name string) { + config.unitName = name +} + +func (config *ClientConfig) UnitMode() bool { + return config.unitMode +} + +func (config *ClientConfig) SetUnitMode(mode bool) { + config.unitMode = mode +} + +func (config *ClientConfig) VipChannelEnabled() bool { + return config.vipChannelEnabled +} + +func (config *ClientConfig) SetVipChannelEnabled(enable bool) { + config.vipChannelEnabled = enable +} + +func (config *ClientConfig) String() string { + //TODO + return "" +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/config/consumer_config.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/config/consumer_config.go b/rocketmq-go/model/config/consumer_config.go index 4622dae..a37eaa0 100644 --- a/rocketmq-go/model/config/consumer_config.go +++ b/rocketmq-go/model/config/consumer_config.go @@ -17,5 +17,4 @@ package config type RocketMqConsumerConfig struct { - } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/config/producer_config.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/config/producer_config.go b/rocketmq-go/model/config/producer_config.go index b990c3c..ce109fb 100644 --- a/rocketmq-go/model/config/producer_config.go +++ b/rocketmq-go/model/config/producer_config.go @@ -15,6 +15,6 @@ * limitations under the License. */ package config -type RocketMqProducerConfig struct { +type RocketMqProducerConfig struct { } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/config/rocketmq_config.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/config/rocketmq_config.go b/rocketmq-go/model/config/rocketmq_config.go index ac6e89f..56e89b9 100644 --- a/rocketmq-go/model/config/rocketmq_config.go +++ b/rocketmq-go/model/config/rocketmq_config.go @@ -15,7 +15,6 @@ * limitations under the License. */ package config -type RocketMqClientConfig struct { +type RocketMqClientConfig struct { } - http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/constant/message_constant.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/constant/message_constant.go b/rocketmq-go/model/constant/message_constant.go deleted file mode 100644 index 93f23d0..0000000 --- a/rocketmq-go/model/constant/message_constant.go +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package header http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/header/pull_message_request_header.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/header/pull_message_request_header.go b/rocketmq-go/model/header/pull_message_request_header.go index 8d15500..0133796 100644 --- a/rocketmq-go/model/header/pull_message_request_header.go +++ b/rocketmq-go/model/header/pull_message_request_header.go @@ -19,6 +19,6 @@ package header type PullMessageRequestHeader struct { } -func (self *PullMessageRequestHeader) FromMap(headerMap map[string]interface{}) { +func (header *PullMessageRequestHeader) FromMap(headerMap map[string]interface{}) { return -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/header/send_message_request_header.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/header/send_message_request_header.go b/rocketmq-go/model/header/send_message_request_header.go new file mode 100644 index 0000000..5c828a8 --- /dev/null +++ b/rocketmq-go/model/header/send_message_request_header.go @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package header + +type SendMessageRequestHeader struct { + //CommandCustomHeader + ProducerGroup string + Topic string + DefaultTopic string + DefaultTopicQueueNum int + QueueID int + SysFlag int + BornTimestamp int + Flag int + Properties string + ReconsumeTimes int + UnitMode bool + MaxReconsumeTimes int +} + +func (header *SendMessageRequestHeader) FromMap(headerMap map[string]interface{}) { + //TODO +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/header/send_message_response_header.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/header/send_message_response_header.go b/rocketmq-go/model/header/send_message_response_header.go index 8b40cdf..9ddacb2 100644 --- a/rocketmq-go/model/header/send_message_response_header.go +++ b/rocketmq-go/model/header/send_message_response_header.go @@ -24,6 +24,6 @@ type SendMessageResponseHeader struct { MsgRegion string } -func (self *SendMessageResponseHeader) FromMap(headerMap map[string]interface{}) { +func (header *SendMessageResponseHeader) FromMap(headerMap map[string]interface{}) { return } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/message/message.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/message/message.go b/rocketmq-go/model/message/message.go new file mode 100644 index 0000000..633446c --- /dev/null +++ b/rocketmq-go/model/message/message.go @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package message + +import ( + "bytes" + "compress/zlib" + "encoding/binary" + "encoding/json" + "fmt" + "github.com/golang/glog" + "io/ioutil" +) + +const ( + CompressedFlag = 1 << 0 + MultiTagsFlag = 1 << 1 + TransactionNotType = 0 << 2 + TransactionPreparedType = 1 << 2 + TransactionCommitType = 2 << 2 + TransactionRollbackType = 3 << 2 +) + +const ( + NameValueSeparator = 1 + iota + PropertySeparator +) + +const ( + CharacterMaxLength = 255 +) + +type Message struct { + Topic string + Flag int32 + properties map[string]string + Body []byte +} + +func NewDefultMessage(topic string, body []byte) *Message { + return NewMessage(topic, "", "", 0, body, true) +} + +type MessageExt struct { + Message + QueueId int32 + StoreSize int32 + QueueOffset int64 + SysFlag int32 + BornTimestamp int64 + // bornHost + StoreTimestamp int64 + // storeHost + MsgId string + CommitLogOffset int64 + BodyCRC int32 + ReconsumeTimes int32 + PreparedTransactionOffset int64 +} + +func (msg *Message) encodeMessage() []byte { + // TODO + return nil +} + +func decodeMessage(data []byte) []*MessageExt { + buf := bytes.NewBuffer(data) + var storeSize, magicCode, bodyCRC, queueId, flag, sysFlag, reconsumeTimes, bodyLength, bornPort, storePort int32 + var queueOffset, physicOffset, preparedTransactionOffset, bornTimeStamp, storeTimestamp int64 + var topicLen byte + var topic, body, properties, bornHost, storeHost []byte + var propertiesLength int16 + + var propertiesMap map[string]string + + msgs := make([]*MessageExt, 0, 32) + for buf.Len() > 0 { + msg := new(MessageExt) + binary.Read(buf, binary.BigEndian, &storeSize) + binary.Read(buf, binary.BigEndian, &magicCode) + binary.Read(buf, binary.BigEndian, &bodyCRC) + binary.Read(buf, binary.BigEndian, &queueId) + binary.Read(buf, binary.BigEndian, &flag) + binary.Read(buf, binary.BigEndian, &queueOffset) + binary.Read(buf, binary.BigEndian, &physicOffset) + binary.Read(buf, binary.BigEndian, &sysFlag) + binary.Read(buf, binary.BigEndian, &bornTimeStamp) + bornHost = make([]byte, 4) + binary.Read(buf, binary.BigEndian, &bornHost) + binary.Read(buf, binary.BigEndian, &bornPort) + binary.Read(buf, binary.BigEndian, &storeTimestamp) + storeHost = make([]byte, 4) + binary.Read(buf, binary.BigEndian, &storeHost) + binary.Read(buf, binary.BigEndian, &storePort) + binary.Read(buf, binary.BigEndian, &reconsumeTimes) + binary.Read(buf, binary.BigEndian, &preparedTransactionOffset) + binary.Read(buf, binary.BigEndian, &bodyLength) + if bodyLength > 0 { + body = make([]byte, bodyLength) + binary.Read(buf, binary.BigEndian, body) + + if (sysFlag & CompressedFlag) == CompressedFlag { + b := bytes.NewReader(body) + z, err := zlib.NewReader(b) + if err != nil { + fmt.Println(err) + return nil + } + + body, err = ioutil.ReadAll(z) + if err != nil { + fmt.Println(err) + return nil + } + z.Close() + } + + } + binary.Read(buf, binary.BigEndian, &topicLen) + topic = make([]byte, 0) + binary.Read(buf, binary.BigEndian, &topic) + binary.Read(buf, binary.BigEndian, &propertiesLength) + if propertiesLength > 0 { + properties = make([]byte, propertiesLength) + binary.Read(buf, binary.BigEndian, &properties) + propertiesMap = make(map[string]string) + json.Unmarshal(properties, &propertiesMap) + } + + if magicCode != -626843481 { + fmt.Printf("magic code is error %d", magicCode) + return nil + } + + msg.Topic = string(topic) + msg.QueueId = queueId + msg.SysFlag = sysFlag + msg.QueueOffset = queueOffset + msg.BodyCRC = bodyCRC + msg.StoreSize = storeSize + msg.BornTimestamp = bornTimeStamp + msg.ReconsumeTimes = reconsumeTimes + msg.Flag = flag + //msg.commitLogOffset=physicOffset + msg.StoreTimestamp = storeTimestamp + msg.PreparedTransactionOffset = preparedTransactionOffset + msg.Body = body + msg.properties = propertiesMap + + msgs = append(msgs, msg) + } + + return msgs +} + +func messageProperties2String(properties map[string]string) string { + StringBuilder := bytes.NewBuffer([]byte{}) + if properties != nil && len(properties) != 0 { + for k, v := range properties { + binary.Write(StringBuilder, binary.BigEndian, k) // 4 + binary.Write(StringBuilder, binary.BigEndian, NameValueSeparator) // 4 + binary.Write(StringBuilder, binary.BigEndian, v) // 4 + binary.Write(StringBuilder, binary.BigEndian, PropertySeparator) // 4 + } + } + return StringBuilder.String() +} + +//func (msg Message) checkMessage(producer *DefaultProducer) (err error) { +// if err = checkTopic(msg.Topic); err != nil { +// if len(msg.Body) == 0 { +// err = errors.New("ResponseCode:" + strconv.Itoa(MsgIllegal) + ", the message body is null") +// } else if len(msg.Body) > producer.maxMessageSize { +// err = errors.New("ResponseCode:" + strconv.Itoa(MsgIllegal) + ", the message body size over max value, MAX:" + strconv.Itoa(producer.maxMessageSize)) +// } +// } +// return +//} + +//func checkTopic(topic string) (err error) { +// if topic == "" { +// err = errors.New("the specified topic is blank") +// } +// if len(topic) > CharacterMaxLength { +// err = errors.New("the specified topic is longer than topic max length 255") +// } +// if topic == DefaultTopic { +// err = errors.New("the topic[" + topic + "] is conflict with default topic") +// } +// return +//} + +func NewMessage(topic, tags, keys string, flag int32, body []byte, waitStoreMsgOK bool) *Message { + message := &Message{ + Topic: topic, + Flag: flag, + Body: body, + } + + if tags != "" { + message.SetTags(tags) + } + + if keys != "" { + message.SetKeys(keys) + } + + message.SetWaitStoreMsgOK(waitStoreMsgOK) + return message +} + +func (msg *Message) SetTags(t string) { + msg.putProperty(MessageConst.PropertyTags, t) +} + +func (msg *Message) SetKeys(k string) { + msg.putProperty(MessageConst.PropertyKeys, k) +} + +func (msg *Message) SetWaitStoreMsgOK(b bool) { + +} + +func (msg *Message) Property() map[string]string { + return msg.properties +} + +func (msg *Message) putProperty(k, v string) { + if msg.properties == nil { + msg.properties = make(map[string]string) + } + if v, found := msg.properties[k]; !found { + msg.properties[k] = v + } else { + glog.Infof("Message put peoperties key: %s existed.", k) + } +} + +func (msg *Message) removeProperty(k, v string) string { + if v, ok := msg.properties[k]; ok { + delete(msg.properties, k) + return v + } + return nil +} + +func (msg *Message) String() string { + return fmt.Sprintf("Message [topic=%s, flag=%s, properties=%s, body=%s]", + msg.Topic, msg.Flag, msg.properties, msg.Body) +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/message/message_constant.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/message/message_constant.go b/rocketmq-go/model/message/message_constant.go new file mode 100644 index 0000000..3991c7e --- /dev/null +++ b/rocketmq-go/model/message/message_constant.go @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package message + +type messageConst struct { + PropertyKeys string + PropertyTags string + PropertyWaitStoreMsgOk string + PropertyDelayTimeLevel string + PropertyRetryTopic string + PropertyRealTopic string + PropertyRealQueueId string + PropertyTransactionPrepared string + PropertyProducerGroup string + PropertyMinOffset string + PropertyMaxOffset string + PropertyBuyerId string + PropertyOriginMessageId string + PropertyTransferFlag string + PropertyCorrectionFlag string + PropertyMq2Flag string + PropertyReconsumeTime string + PropertyMsgRegion string + PropertyUniqClientMessageIdKeyidx string + PropertyMaxReconsumeTimes string + PropertyConsumeStartTimeStamp string + + KeySeparator string + systemKeySet []string +} + +var MessageConst = &messageConst{ + PropertyKeys: "KEYS", + PropertyTags: "TAGS", + PropertyWaitStoreMsgOk: "WAIT", + PropertyDelayTimeLevel: "DELAY", + PropertyRetryTopic: "RETRY_TOPIC", + PropertyRealTopic: "REAL_TOPIC", + PropertyRealQueueId: "REAL_QID", + PropertyTransactionPrepared: "TRAN_MSG", + PropertyProducerGroup: "PGROUP", + PropertyMinOffset: "MIN_OFFSET", + PropertyMaxOffset: "MAX_OFFSET", + PropertyBuyerId: "BUYER_ID", + PropertyOriginMessageId: "ORIGIN_MESSAGE_ID", + PropertyTransferFlag: "TRANSFER_FLAG", + PropertyCorrectionFlag: "CORRECTION_FLAG", + PropertyMq2Flag: "MQ2_FLAG", + PropertyReconsumeTime: "RECONSUME_TIME", + PropertyMsgRegion: "MSG_REGION", + PropertyUniqClientMessageIdKeyidx: "UNIQ_KEY", + PropertyMaxReconsumeTimes: "MAX_RECONSUME_TIMES", + PropertyConsumeStartTimeStamp: "CONSUME_START_TIME", + + KeySeparator: "", +} + +func init() { + var systemKeySet = []string{} + systemKeySet = append(systemKeySet, MessageConst.PropertyKeys) + systemKeySet = append(systemKeySet, MessageConst.PropertyTags) + systemKeySet = append(systemKeySet, MessageConst.PropertyWaitStoreMsgOk) + systemKeySet = append(systemKeySet, MessageConst.PropertyDelayTimeLevel) + systemKeySet = append(systemKeySet, MessageConst.PropertyRetryTopic) + systemKeySet = append(systemKeySet, MessageConst.PropertyRealTopic) + systemKeySet = append(systemKeySet, MessageConst.PropertyRealQueueId) + systemKeySet = append(systemKeySet, MessageConst.PropertyTransactionPrepared) + systemKeySet = append(systemKeySet, MessageConst.PropertyProducerGroup) + systemKeySet = append(systemKeySet, MessageConst.PropertyMinOffset) + systemKeySet = append(systemKeySet, MessageConst.PropertyMaxOffset) + systemKeySet = append(systemKeySet, MessageConst.PropertyBuyerId) + systemKeySet = append(systemKeySet, MessageConst.PropertyOriginMessageId) + systemKeySet = append(systemKeySet, MessageConst.PropertyTransferFlag) + systemKeySet = append(systemKeySet, MessageConst.PropertyCorrectionFlag) + systemKeySet = append(systemKeySet, MessageConst.PropertyMq2Flag) + systemKeySet = append(systemKeySet, MessageConst.PropertyReconsumeTime) + systemKeySet = append(systemKeySet, MessageConst.PropertyMsgRegion) + systemKeySet = append(systemKeySet, MessageConst.PropertyUniqClientMessageIdKeyidx) + systemKeySet = append(systemKeySet, MessageConst.PropertyMaxReconsumeTimes) + systemKeySet = append(systemKeySet, MessageConst.PropertyConsumeStartTimeStamp) + + MessageConst.systemKeySet = systemKeySet +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/message/message_queue.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/message/message_queue.go b/rocketmq-go/model/message/message_queue.go new file mode 100644 index 0000000..20b47be --- /dev/null +++ b/rocketmq-go/model/message/message_queue.go @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package message + +type MessageQueue struct { + topic string + brokerName string + queueId int32 +} + +func NewMessageQueue(topic string, brokerName string, queueId int32) *MessageQueue { + return &MessageQueue{ + topic: topic, + brokerName: brokerName, + queueId: queueId, + } +} + +func (queue *MessageQueue) clone() *MessageQueue { + no := new(MessageQueue) + no.topic = queue.topic + no.queueId = queue.queueId + no.brokerName = queue.brokerName + return no +} + +func (queue MessageQueue) BrokerName() string { + return queue.brokerName +} + +func (queue *MessageQueue) QueueID() int32 { + return queue.queueId +} + +type MessageQueues []*MessageQueue + +func (queues MessageQueues) Less(i, j int) bool { + imq := queues[i] + jmq := queues[j] + + if imq.topic < jmq.topic { + return true + } + + if imq.topic < jmq.topic { + return false + } + + if imq.brokerName < jmq.brokerName { + return true + } + + if imq.brokerName < jmq.brokerName { + return false + } + + if imq.queueId < jmq.queueId { + return true + } + + return false +} + +func (queues MessageQueues) Swap(i, j int) { + queues[i], queues[j] = queues[j], queues[i] +} + +func (queues MessageQueues) Len() int { + return len(queues) +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/pull_result.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/pull_result.go b/rocketmq-go/model/pull_result.go new file mode 100644 index 0000000..b34a2d9 --- /dev/null +++ b/rocketmq-go/model/pull_result.go @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package model + +import ( + "fmt" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message" +) + +type PullStatus int + +const ( + Found PullStatus = iota + NoNewMsg + NoMatchedMsg + OffsetIllegal +) + +type PullResult struct { + pullStatus PullStatus + nextBeginOffset int64 + minOffset int64 + maxOffset int64 + msgFoundList []*message.MessageExt +} + +func NewPullResult(ps PullStatus, next, min, max int64, list []*message.MessageExt) *PullResult { + return &PullResult{ + ps, + next, + min, + max, + list, + } +} + +func (result *PullResult) PullStatus() PullStatus { + return result.pullStatus +} + +func (result *PullResult) NextBeginOffset() int64 { + return result.nextBeginOffset +} + +func (result *PullResult) MaxOffset() int64 { + return result.maxOffset +} + +func (result *PullResult) MinOffset() int64 { + return result.minOffset +} + +func (result *PullResult) MsgFoundList() []*message.MessageExt { + return result.msgFoundList +} + +func (result *PullResult) SetMsgFoundList(list []*message.MessageExt) { + result.msgFoundList = list +} + +func (result *PullResult) String() string { + return fmt.Sprintf("PullResult [pullStatus=%s, nextBeginOffset=%s, minOffset=%s, maxOffset=%s, msgFoundList=%s]", + result.pullStatus, result.nextBeginOffset, result.minOffset, result.maxOffset, len(result.msgFoundList)) +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/query_result.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/query_result.go b/rocketmq-go/model/query_result.go new file mode 100644 index 0000000..b9e9236 --- /dev/null +++ b/rocketmq-go/model/query_result.go @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package model + +import ( + "fmt" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message" +) + +type QueryResult struct { + indexLastUpdateTimestamp int64 + messageList []*message.MessageExt +} + +func NewQueryResult(timestamp int64, list []*message.MessageExt) *QueryResult { + return &QueryResult{ + indexLastUpdateTimestamp: timestamp, + messageList: list, + } +} + +func (qr *QueryResult) IndexLastUpdateTimestamp() int64 { + return qr.indexLastUpdateTimestamp +} + +func (qr *QueryResult) MessageList() []*message.MessageExt { //TODO: address? + return qr.messageList +} + +func (qr *QueryResult) String() string { + return fmt.Sprintf("QueryResult [indexLastUpdateTimestamp=%s, messageList=%s]", + qr.indexLastUpdateTimestamp, qr.messageList) +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/request_code.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/request_code.go b/rocketmq-go/model/request_code.go new file mode 100644 index 0000000..495c1a7 --- /dev/null +++ b/rocketmq-go/model/request_code.go @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http:// www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package model + +const ( + // send message + SendMsg = 10 + // subscribe message + PullMsg = 11 + // query message + QueryMessage = 12 + // queryOffset + QueryBrokerOffset = 13 + // query Consumer Offset + QueryConsumerOffset = 14 + // update Consumer Offset + UpdateConsumerOffset = 15 + // update or increase a topic + UpdateAndCreateTopic = 17 + // get all config of topic (Slave and Namesrv query the config to master) + GetAllTopicConfig = 21 + // get all config (Slave and Namesrv query the config to master) + GetTopicConfigList = 22 + // get all name list of topic + GetTopicNameList = 23 + // update config + UpdateBrokerConfig = 25 + // get config + GetBrokerConfig = 26 + // trigger delete files + TriggerDeleteFILES = 27 + // get runtime information + GetBrokerRuntimeInfo = 28 + // search offset by timestamp + SearchOffsetByTimeStamp = 29 + // query max offset of queue + GetMaxOffset = 30 + // query min offset of queue + GetMinOffset = 31 + // query earliest message store time + GetEarliestMsgStoreTime = 32 + // query message by id + ViewMsgById = 33 + // client send heartbeat to broker, and register self + HeartBeat = 34 + // unregister client + UnregisterClient = 35 + // consumer send message back to broker when can't process message + ConsumerSendMsgBack = 36 + // Commit or Rollback transaction + EndTransaction = 37 + // get ConsumerId list by GroupName + GetConsumerListByGroup = 38 + // ckeck transaction state from producer + CheckTransactionState = 39 + // broker notify consumer ids changed + NotifyConsumerIdsChanged = 40 + // Consumer lock queue to master + LockBatchMq = 41 + // Consumer unlock queue to master + UNLockBatchMq = 42 + // get all consumer offset + GetAllConsumerOffset = 43 + // get all delay offset + GetAllDelayOffset = 45 + // put kv config to Namesrv + PutKVConfig = 100 + // get kv config to Namesrv + GetKVConfig = 101 + // delete kv config to Namesrv + DeleteKVConfig = 102 + // register a broker to Namesrv. As data is persistent, + // the broker will overwrite if old config existed. + RegisterBroker = 103 + // register a broker + UnregisterBroker = 104 + // get broker name, queue numbers by topic. + GetRouteinfoByTopic = 105 + // get all registered broker to namesrv info + GetBrokerClusterInfo = 106 + UpdateAndCreateSubscriptionGroup = 200 + GetAllSubscriptionGroupConfig = 201 + GetTopicStatsInfo = 202 + GetConsumerConnList = 203 + GetProducerConnList = 204 + WipeWritePermOfBroker = 205 + + // get all topic list from namesrv + GetAllTopicListFromNamesrv = 206 + // delete subscription group from broker + DeleteSubscriptionGroup = 207 + // get consume stats from broker + GetConsumeStats = 208 + // Suspend Consumer + SuspendConsumer = 209 + // Resume Consumer + ResumeConsumer = 210 + // reset Consumer Offset + ResetConsumerOffsetInConsumer = 211 + // reset Consumer Offset + ResetConsumerOffsetInBroker = 212 + // query which consumer groups consume the msg + WhoConsumeMessage = 214 + + // namesrv delete topic config from broker + DeleteTopicInBroker = 215 + // namesrv delete topic config from namesrv + DeleteTopicInNamesrv = 216 + // namesrv get server ip info by project + GetKvConfigByValue = 217 + // Namesrv delete all server ip by project group + DeleteKvConfigByValue = 218 + // get all KV list by namespace + GetKvlistByNamespace = 219 + + // reset offset + ResetConsumerClientOffset = 220 + // get consumer status from client + GetConsumerStatusFromClient = 221 + // invoke broker to reset offset + InvokeBrokerToResetOffset = 222 + // invoke broker to get consumer status + InvokeBrokerToGetConsumerStatus = 223 + + // query which consumer consume msg + QueryTopicConsumeByWho = 300 + + // get topics by cluster + GetTopicsByCluster = 224 + + // register filter server to broker + RegisterFilterServer = 301 + // register class to filter server + RegisterMsgFilterClass = 302 + // get time span by topic and group + QueryConsumeTimeSpan = 303 + // get all system topics from namesrv + GetSysTopicListFromNS = 304 + // get all system topics from broker + GetSysTopicListFromBroker = 305 + + // clean expired consume queue + CleanExpiredConsumequeue = 306 + + // query consumer memory data by broker + GetConsumerRunningInfo = 307 + + // TODO: query correction offset(transfer component?) + QueryCorrectionOffset = 308 + + // Send msg to one consumer by broker, The msg will immediately consume, + // and return result to broker, broker return result to caller + ConsumeMsgDirectly = 309 + + // send msg with optimized network datagram + SendMsgV2 = 310 + + // get unit topic list + GetUnitTopicList = 311 + GetHasUnitSubTopicList = 312 + GetHasUnitSubUnunitTopicList = 313 + CloneGroupOffset = 314 + + // query all status that broker count + ViewBrokerStatsData = 315 +) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/response_code.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/response_code.go b/rocketmq-go/model/response_code.go new file mode 100644 index 0000000..957c1e9 --- /dev/null +++ b/rocketmq-go/model/response_code.go @@ -0,0 +1,67 @@ +package model + +const ( + // success + Success = 0 + // happened unknow error exception + SystemError = 1 + // system busy + SystemBusy = 2 + // unsupport request code + RequestCodeNotSupported = 3 + // transaction failed, because of add db failed + TransactionFailed = 4 + // Broker flush disk timeout + FlushDiskTimeout = 10 + // Broker slave unavailable, just for sync double write + SlaveNotAvailable = 11 + // Broker write slave timeout, just for sync double write + FlushSlaveTimeout = 12 + // Broker illegal message + MessageIllegal = 13 + // Broker, Namesrv not availableï¼maybe service is closing or incorrect permission + ServiceNotAvailable = 14 + // Broker, Namesrv unsupport version + VersionNOtSupported = 15 + // Broker, Namesrv no permission for operation with send/receive or other + NoPermission = 16 + // Broker, topic not exist + TopicNotExist = 17 + // Broker, topic already exist + TopicExistAlready = 18 + // Broker message not found when pull + PullNotFound = 19 + // Broker retry immediately, maybe msg was filtered or incorrect notification TODO confirm annotation + PullRetryImmediately = 20 + // Broker pull offset moved, because of too big or to small TODO confirm annotation + PullOffsetMoved = 21 + // Broker query not found + QueryNotFound = 22 + // Broker parse subscription failed + SubscriptionParseFailed = 23 + // Broker subscription relationship not existed + SubscriptionNotExist = 24 + // Broker subscription relationship not latest + SubscriptionNotLatest = 25 + // Broker subscription group not exist + SubscriptionGroupNotExist = 26 + // Producer transaction should commit + TransactionShouldCommit = 200 + // Producer transaction should rollback + TransactionShouldRollback = 201 + // Producer transaction status unknow + TransactionStatusUnknow = 202 + // Producer ProducerGroup transaction error + TransactionStatusGroupWrong = 203 + // unit messageï¼need set buyerId + NoBuyerID = 204 + + // unit messageï¼not current unit msg + NotInCurrentUnit = 205 + + // Consumer not online + ConsumerNotOnline = 206 + + // Consumer consume msg timeout + ConsumeMsgTimeout = 207 +) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/send_result.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/send_result.go b/rocketmq-go/model/send_result.go index fc18d6f..b1af4fb 100644 --- a/rocketmq-go/model/send_result.go +++ b/rocketmq-go/model/send_result.go @@ -16,5 +16,110 @@ */ package model +import ( + "fmt" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message" +) + +type SendStatus int + +const ( + SendOK SendStatus = iota + //FlushDiskTimeout + //FlushSlaveTimeout + SlaveNotAvaliable +) + type SendResult struct { + sendStatus SendStatus + msgID string + messageQueue *message.MessageQueue + queueOffset int64 + transactionID string + offsetMsgID string + regionID string + traceOn bool +} + +func NewSendResult(status SendStatus, msgID, offsetID string, queue *message.MessageQueue, queueOffset int64) *SendResult { + return &SendResult{ + sendStatus: status, + msgID: msgID, + offsetMsgID: offsetID, + messageQueue: queue, + queueOffset: queueOffset, + } +} + +func EncoderSendResultToJson(obj interface{}) string { + return nil // TODO +} + +func DecoderSendResultFromJson(json string) *SendResult { + return nil // TODO +} + +func (result *SendResult) TraceOn() bool { + return result.traceOn +} + +func (result *SendResult) SetTraceOn(b bool) { + result.traceOn = b +} + +func (result *SendResult) SetRegionID(s string) { + result.regionID = s +} + +func (result *SendResult) MsgID() string { + return result.msgID +} + +func (result *SendResult) SetMsgID(s string) { + result.msgID = s +} + +func (result *SendResult) SendStatus() SendStatus { + return result.sendStatus +} + +func (result *SendResult) SetSendStatus(status SendStatus) { + result.sendStatus = status +} + +func (result *SendResult) MessageQueue() *message.MessageQueue { + return result.messageQueue +} + +func (result *SendResult) SetMessageQueue(queue *message.MessageQueue) { + result.messageQueue = queue +} + +func (result *SendResult) QueueOffset() int64 { + return result.queueOffset +} + +func (result *SendResult) SetQueueOffset(offset int64) { + result.queueOffset = offset +} + +func (result *SendResult) TransactionID() string { + return result.transactionID +} + +func (result *SendResult) SetTransactionID(s string) { + result.transactionID = s +} + +func (result *SendResult) OffsetMsgID() string { + return result.offsetMsgID +} + +func (result *SendResult) SetOffsetMsgID(s string) { + result.offsetMsgID = s +} + +func (result *SendResult) String() string { + return fmt.Sprintf("SendResult [sendStatus=%s, msgId=%s, offsetMsgId=%s, messageQueue=%s, queueOffset=%s]", + result.sendStatus, result.msgID, result.offsetMsgID, result.messageQueue, result.queueOffset) } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/topic_publishInfo.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/topic_publishInfo.go b/rocketmq-go/model/topic_publishInfo.go new file mode 100644 index 0000000..b2f711b --- /dev/null +++ b/rocketmq-go/model/topic_publishInfo.go @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package model + +import ( + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message" +) + +type TopicPublishInfo struct { + orderTopic bool + havaTopicRouteInfo bool + messageQueueList []*message.MessageQueue + topicRouteData *TopicRouteData +} + +func (info *TopicPublishInfo) SetOrderTopic(b bool) { + info.orderTopic = b +} + +func (info *TopicPublishInfo) Ok() bool { + return false +} + +func (info *TopicPublishInfo) MessageQueueList() []*message.MessageQueue { + return info.messageQueueList +} + +func (info *TopicPublishInfo) HaveTopicRouteInfo() bool { + return info.havaTopicRouteInfo +} + +func (info *TopicPublishInfo) SetHaveTopicRouteInfo(b bool) { + info.havaTopicRouteInfo = b +} + +func (info *TopicPublishInfo) TopicRouteData() *TopicRouteData { + return info.topicRouteData +} + +func (info *TopicPublishInfo) SetTopicRouteData(routeDate *TopicRouteData) { + info.topicRouteData = routeDate +} + +func (info *TopicPublishInfo) SelectOneMessageQueue() *message.MessageQueue { + return nil //TODO +} + +func (info *TopicPublishInfo) selectOneMessageQueueWithBroker(brokerName string) *message.MessageQueue { + if brokerName == "" { + return info.SelectOneMessageQueue() + } + return nil //TODO +} + +func (info *TopicPublishInfo) QueueIdByBroker(brokerName string) int { + return nil //TODO +} + +func (info *TopicPublishInfo) String() string { + return nil +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/model/topic_route_data.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/topic_route_data.go b/rocketmq-go/model/topic_route_data.go new file mode 100644 index 0000000..f387529 --- /dev/null +++ b/rocketmq-go/model/topic_route_data.go @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package model + +import ( + "fmt" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message" +) + +type BrokerData struct { +} + +type TopicRouteData struct { + orderTopicConf string + queueDatas []*message.MessageQueue + brokerDatas []*BrokerData + filterServerTable map[string][]string +} + +func NewTopicRouteData() *TopicRouteData { + return &TopicRouteData{} +} + +func (route *TopicRouteData) CloneTopicRouteData() (clonedRouteData *TopicRouteData) { + clonedRouteData = &TopicRouteData{ + route.orderTopicConf, + route.queueDatas, + route.brokerDatas, + route.filterServerTable, + } + // TODO: to complete + return +} + +func (route *TopicRouteData) QueueDatas() []*message.MessageQueue { + return route.queueDatas +} + +func (route *TopicRouteData) SetQueueDatas(data []*message.MessageQueue) { + route.queueDatas = data +} + +func (route *TopicRouteData) BrokerDatas() []*BrokerData { + return route.brokerDatas +} + +func (route *TopicRouteData) SetBrokerDatas(data []*BrokerData) { + route.brokerDatas = data +} + +func (route *TopicRouteData) FilterServerTable() map[string][]string { + return route.filterServerTable +} + +func (route *TopicRouteData) SetFilterServerTable(data map[string][]string) { + route.filterServerTable = data +} + +func (route *TopicRouteData) OrderTopicConf() string { + return route.orderTopicConf +} + +func (route *TopicRouteData) SetOrderTopicConf(s string) { + route.orderTopicConf = s +} + +func (route *TopicRouteData) HashCode() (result int) { + prime := 31 + result = 1 + result *= prime + // TODO + + return +} + +func (route *TopicRouteData) Equals(route1 interface{}) bool { + if route == nil { + return true + } + if route1 == nil { + return false + } + //value, ok := route1.(TopicRouteData) + //if !ok { + // return false + //} + // TODO + //if route.brokerDatas == nil && value.brokerDatas != nil || len(route.brokerDatas) != len(value.brokerDatas) { + // return false + //} + // + //if route.orderTopicConf == "" && value.orderTopicConf != "" || route.orderTopicConf != value.orderTopicConf { + // return false + //} + // + //if route.queueDatas == nil && value.queueDatas != nil || route.queueDatas != value.queueDatas { + // return false + //} + // + //if route.filterServerTable == nil && value.filterServerTable != nil || + // route.filterServerTable != value.filterServerTable { + // return false + //} + return true +} + +func (route *TopicRouteData) String() string { + return fmt.Sprintf("TopicRouteData [orderTopicConf=%s, queueDatas=%s, brokerDatas=%s, filterServerTable=%s]", + route.orderTopicConf, route.queueDatas, route.brokerDatas, route.filterServerTable) +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/mq_client_manager.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/mq_client_manager.go b/rocketmq-go/mq_client_manager.go index 153d955..c07dcfd 100644 --- a/rocketmq-go/mq_client_manager.go +++ b/rocketmq-go/mq_client_manager.go @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rocketmq_go +package rocketmq -import "github.com/incubator-rocketmq-externals/rocketmq-go/service" +import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service" type MqClientManager struct { clientFactory *ClientFactory @@ -33,4 +33,4 @@ type ClientFactory struct { type PullMessageController struct { rocketMqClient service.RocketMqClient clientFactory *ClientFactory -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/mq_consumer.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/mq_consumer.go b/rocketmq-go/mq_consumer.go index f143b9a..7712ee1 100644 --- a/rocketmq-go/mq_consumer.go +++ b/rocketmq-go/mq_consumer.go @@ -14,22 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rocketmq_go +package rocketmq -import "github.com/incubator-rocketmq-externals/rocketmq-go/service" +import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service" type RocketMQConsumer interface { } type MqConsumerConfig struct { - } type DefaultMQPushConsumer struct { offsetStore service.OffsetStore //for consumer's offset mqClient service.RocketMqClient - rebalance *service.Rebalance //Rebalance's impl depend on offsetStore + rebalance *service.Rebalance //Rebalance's impl depend on offsetStore consumeMessageService service.ConsumeMessageService ConsumerConfig *MqConsumerConfig } - - http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/mq_producer.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/mq_producer.go b/rocketmq-go/mq_producer.go index 2a50344..3677939 100644 --- a/rocketmq-go/mq_producer.go +++ b/rocketmq-go/mq_producer.go @@ -14,19 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rocketmq_go +package rocketmq -import "github.com/incubator-rocketmq-externals/rocketmq-go/service" +import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service" type RocketMQProducer interface { } type MqProducerConfig struct { - } type DefaultMQProducer struct { producerGroup string ProducerConfig *MqProducerConfig producerService service.ProducerService -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/remoting/communication_mode.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/communication_mode.go b/rocketmq-go/remoting/communication_mode.go new file mode 100644 index 0000000..3380955 --- /dev/null +++ b/rocketmq-go/remoting/communication_mode.go @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package remoting + +type CommunicationMode int + +const ( + Sync CommunicationMode = iota + Async + OneWay +) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/remoting/custom_header.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/custom_header.go b/rocketmq-go/remoting/custom_header.go index ff1a6d4..40feade 100644 --- a/rocketmq-go/remoting/custom_header.go +++ b/rocketmq-go/remoting/custom_header.go @@ -15,6 +15,7 @@ * limitations under the License. */ package remoting + type CustomerHeader interface { FromMap(headerMap map[string]interface{}) } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/remoting/event_executor.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/event_executor.go b/rocketmq-go/remoting/event_executor.go new file mode 100644 index 0000000..38e1ee6 --- /dev/null +++ b/rocketmq-go/remoting/event_executor.go @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package remoting + +import ( + "fmt" + "github.com/golang/glog" + "net" + "sync" +) + +type Runnable interface { + Run() +} + +type NetEventType int + +const ( + Connect NetEventType = iota + Close + Idle + Exception // TODO error? +) + +type NetEvent struct { + eType NetEventType + remoteAddress string + conn net.Conn +} + +func NewEventType(eType NetEventType, remoteAddr string, conn net.Conn) *NetEvent { + return &NetEvent{eType, remoteAddr, conn} +} + +func (event *NetEvent) Type() NetEventType { + return event.eType +} + +func (event *NetEvent) RemoteAddress() string { + return event.remoteAddress +} + +func (event *NetEvent) Conn() net.Conn { + return event.conn +} + +func (event *NetEvent) String() string { + return fmt.Sprintf("NettyEvent [type=%s, remoteAddr=%s, channel=%s]", + event.eType, event.remoteAddress, event.conn) +} + +type NetEventExecutor struct { + hasNotified bool + running bool + stopped chan int + mu sync.RWMutex // TODO need init? + client *RemotingClient + + eventQueue chan *NetEvent + maxSize int +} + +func NewNetEventExecutor(client *RemotingClient) *NetEventExecutor { + return &NetEventExecutor{ + hasNotified: false, + running: false, + stopped: make(chan int), + client: client, + eventQueue: make(chan *NetEvent, 100), // TODO confirm size + maxSize: 10000, + } +} + +func (executor *NetEventExecutor) Start() { + go executor.run() +} + +func (executor *NetEventExecutor) Shutdown() { + executor.stopped <- 0 +} + +func (executor *NetEventExecutor) PutEvent(event *NetEvent) { + if len(executor.eventQueue) <= executor.maxSize { + executor.eventQueue <- event //append(executor.eventQueue, event) + } else { + fmt.Sprintf("event queue size[%s] enough, so drop this event %s", len(executor.eventQueue), event.String()) + } +} + +func (executor *NetEventExecutor) ServiceName() string { + // TODO + return nil +} + +func (executor *NetEventExecutor) run() { + glog.Infof("%s service started", executor.ServiceName()) + + executor.mu.Lock() + executor.running = true + executor.mu.Unlock() + + listener := executor.client.ConnEventListener() + for executor.running { // TODO optimize + select { + case event := <-executor.eventQueue: + if event != nil && listener != nil { + switch event.Type() { + case Connect: + listener.OnConnConnect(event.remoteAddress, event.Conn()) + case Close: + listener.OnConnClose(event.remoteAddress, event.Conn()) + case Idle: + listener.OnConnIdle(event.remoteAddress, event.Conn()) + case Exception: + listener.OnConnException(event.remoteAddress, event.Conn()) + default: + break + } + } + case <-executor.stopped: + executor.mu.Lock() + executor.running = false + executor.mu.Unlock() + break + } + } + + glog.Infof("%s service exit.", executor.ServiceName()) +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/remoting/invoke_callback.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/invoke_callback.go b/rocketmq-go/remoting/invoke_callback.go deleted file mode 100644 index 844c304..0000000 --- a/rocketmq-go/remoting/invoke_callback.go +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package remoting -type ResponseFuture struct { -} -type InvokeCallback func(responseFuture *ResponseFuture) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/remoting/remoting_client.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/remoting_client.go b/rocketmq-go/remoting/remoting_client.go index a4ce83a..38685cb 100644 --- a/rocketmq-go/remoting/remoting_client.go +++ b/rocketmq-go/remoting/remoting_client.go @@ -14,17 +14,305 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package remoting -type DefalutRemotingClient struct { +import ( + "errors" + "fmt" + "github.com/golang/glog" + "math/rand" + "net" + "sync" + "time" +) + +type ConnEventListener interface { + OnConnConnect(remoteAddress string, conn net.Conn) + OnConnClose(remoteAddress string, conn net.Conn) + OnConnIdle(remoteAddress string, conn net.Conn) + OnConnException(remoteAddress string, conn net.Conn) +} + +type NetRequestProcessor struct { +} + +type NetConfig struct { + clientWorkerNumber int + clientCallbackExecutorNumber int + clientOneWaySemaphoreValue int + clientAsyncSemaphoreValue int + connectTimeoutMillis time.Duration + channelNotActiveInterval time.Duration + + clientChannelMaxIdleTimeSeconds time.Duration + clientSocketSndBufSize int + clientSocketRcvBufSize int + clientPooledByteBufAllocatorEnable bool + clientCloseSocketIfTimeout bool +} + +type Pair struct { + o1 *NetRequestProcessor + o2 *ExecutorService +} + +type RemotingClient struct { + semaphoreOneWay sync.Mutex // TODO right? use chan? + semaphoreAsync sync.Mutex + processorTable map[int]*Pair + netEventExecutor *NetEventExecutor + defaultRequestProcessor *Pair + + config NetConfig + connTable map[string]net.Conn + connTableLock sync.RWMutex + timer *time.Timer + namesrvAddrList []string + namesrvAddrChoosed string + + callBackExecutor *ExecutorService + listener ConnEventListener + rpcHook RPCHook + responseTable map[int32]*ResponseFuture + responseTableLock sync.RWMutex +} + +type ConnHandlerContext struct { +} + +type ExecutorService struct { + callBackChannel chan func() + quit chan bool +} + +func (exec *ExecutorService) submit(callback func()) { + exec.callBackChannel <- callback +} + +func (exec *ExecutorService) run() { + go func() { + glog.Info("Callback Executor routing start.") + for { + select { + case invoke := <-exec.callBackChannel: + invoke() + case <-exec.quit: + break + } + } + glog.Info("Callback Executor routing quit.") + }() +} + +func NewRemotingClient(cfg NetConfig) *RemotingClient { + client := &RemotingClient{ + config: cfg, + connTable: make(map[string]net.Conn), + timer: time.NewTimer(10 * time.Second), + responseTable: make(map[int32]*ResponseFuture), + } + // java: super(xxxx) + return client +} + +func (rc *RemotingClient) PutNetEvent(event *NetEvent) { + rc.netEventExecutor.PutEvent(event) +} + +func (rc *RemotingClient) executeInvokeCallback(future *ResponseFuture) { + executor := rc.CallbackExecutor() + if executor != nil { + executor.submit(func() { + future.invokeCallback(future) + }) + return + } + future.executeInvokeCallback() +} + +// check timeout future +func (rc *RemotingClient) scanResponseTable() { + rfMap := make(map[int]*ResponseFuture) + for k, future := range rc.responseTable { // TODO safety? + if int64(future.beginTimestamp)+int64(future.timeoutMillis)+1e9 <= time.Now().Unix() { + future.Done() + delete(rc.responseTable, k) + rfMap[int(k)] = future + glog.Warningf("remove timeout request, ", future.String()) + } + } + + go func() { + for _, future := range rfMap { + rc.executeInvokeCallback(future) // TODO if still failed, how to deal with the message ? + } + }() +} + +func (rc *RemotingClient) CallbackExecutor() *ExecutorService { + return rc.callBackExecutor +} + +func (rc *RemotingClient) RPCHook() RPCHook { + return rc.rpcHook +} + +func (rc *RemotingClient) ConnEventListener() ConnEventListener { + return rc.listener +} + +func (rc *RemotingClient) invokeSync(addr string, request *RemotingCommand, + timeout time.Duration) (*RemotingCommand, error) { + conn := rc.getAndCreateConn(addr) + if conn != nil { + if rc.rpcHook != nil { + rc.rpcHook.DoBeforeRequest(addr, request) + } + opaque := request.opaque + //defer delete(rc.responseTable, opaque) TODO should in listener + + future := &ResponseFuture{ + opaque: opaque, + timeoutMillis: timeout, + } + rc.responseTable[opaque] = future + + conn.Write(request.encode()) // TODO register listener + + response := future.WaitResponse(timeout) + if response == nil { + if future.sendRequestOK { + return nil, errors.New(fmt.Sprintf("RemotingTimeout error: %s", future.err.Error())) + } else { + return nil, errors.New(fmt.Sprintf("RemotingSend error: %s", future.err.Error())) + } + } + + if rc.rpcHook != nil { + rc.rpcHook.DoBeforeResponse(addr, response) + } + return response, nil + } else { + rc.CloseConn(addr) // TODO + return nil, errors.New(fmt.Sprintf("Connection to %s ERROR!", addr)) + } } -func (self *DefalutRemotingClient) InvokeSync(addr string, request *RemotingCommand, timeoutMillis int64) (remotingCommand *RemotingCommand, err error) { - return +func (rc *RemotingClient) invokeAsync(addr string, request *RemotingCommand, + timeout time.Duration, callback InvokeCallback) error { + conn := rc.getAndCreateConn(addr) + if conn != nil { // TODO how to confirm conn active? + if rc.rpcHook != nil { + rc.rpcHook.DoBeforeRequest(addr, request) + } + + opaque := request.opaque + acquired := false // TODO semaphore.tryAcquire... + if acquired { + //final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); + + future := NewResponseFuture(opaque, timeout, callback) + rc.responseTable[opaque] = future + + future.WaitResponse(timeout) // TODO add listener + } + return nil + } else { + rc.CloseConn(addr) // TODO + return errors.New(fmt.Sprintf("Connection to %s ERROR!", addr)) + } } -func (self *DefalutRemotingClient)InvokeAsync(addr string, request *RemotingCommand, timeoutMillis int64, invokeCallback InvokeCallback) error { - return + +func (rc *RemotingClient) invokeOneWay(addr string, request *RemotingCommand, + timeout time.Duration) error { + conn := rc.getAndCreateConn(addr) + if conn != nil { + if rc.rpcHook != nil { + rc.rpcHook.DoBeforeRequest(addr, request) + } + + request.MarkOneWayRpc() + // TODO boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); + return nil + } else { + rc.CloseConn(addr) // TODO + return errors.New(fmt.Sprintf("Connection to %s ERROR!", addr)) + } } -func (self *DefalutRemotingClient)InvokeOneWay(addr string, request *RemotingCommand, timeoutMillis int64) error { - return + +func initValueIndex() int { + r := rand.Int() + if r < 0 { // math.Abs para is float64 + r = -r + } + return r % 999 % 999 +} + +func (rc *RemotingClient) Start() { + // TODO +} + +func (rc *RemotingClient) Shutdown() { + // TODO + rc.timer.Stop() +} + +func (rc *RemotingClient) registerRPCHook(hk RPCHook) { + rc.rpcHook = hk +} + +func (rc *RemotingClient) CloseConn(addr string) { + // TODO +} + +func (rc *RemotingClient) updateNameServerAddressList(addrs []string) { + old, update := rc.namesrvAddrList, false + + if addrs != nil && len(addrs) > 0 { + if old == nil || len(addrs) != len(old) { + update = true + } else { + for i := 0; i < len(addrs) && !update; i++ { + if contains(old, addrs[i]) { + update = true + } + } + } + } + + if update { + rc.namesrvAddrList = addrs // TODO safe? + } + +} + +func (rc *RemotingClient) getAndCreateConn(addr string) net.Conn { + return nil +} + +func (rc *RemotingClient) getAndCreateNamesrvConn() net.Conn { + return nil +} + +func (rc *RemotingClient) createConn(addr string) net.Conn { + + return nil +} + +func (rc *RemotingClient) RegisterProcessor(requestCode int, processor *NetRequestProcessor, executor ExecutorService) { + // TODO +} + +func (rc *RemotingClient) String() string { + return nil // TODO +} + +func contains(s []string, o string) bool { // TODO optimize + for _, v := range s { + if o == v { + return true + } + } + return false } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/remoting/remoting_command.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/remoting_command.go b/rocketmq-go/remoting/remoting_command.go index 25d9645..b57f1e9 100644 --- a/rocketmq-go/remoting/remoting_command.go +++ b/rocketmq-go/remoting/remoting_command.go @@ -14,9 +14,168 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package remoting +// TODO: refactor +import ( + "bytes" + "encoding/binary" + "encoding/json" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header" + "log" + "os" + "strconv" + "sync" + "sync/atomic" +) + +func init() { + // TODO +} + +const ( + SerializeTypeProperty = "rocketmq.serialize.type" + SerializeTypeEnv = "ROCKETMQ_SERIALIZE_TYPE" + RemotingVersionKey = "rocketmq.remoting.version" + rpcType = 0 // 0, request command + rpcOneWay = 1 // 0, RPC +) + +type RemotingCommandType int + +const ( + ResponseCommand RemotingCommandType = iota + RqeusetCommand +) + +var configVersion int = -1 +var requestId int32 +var decodeLock sync.Mutex + type RemotingCommand struct { + //header + code int `json:"code"` + language string `json:"language"` + version int `json:"version"` + opaque int32 `json:"opaque"` + flag int `json:"flag"` + remark string `json:"remark"` + extFields map[string]string `json:"extFields"` + header CustomerHeader // transient + //body + body []byte `json:"body,omitempty"` +} + +func NewRemotingCommand(code int, header CustomerHeader) *RemotingCommand { + cmd := &RemotingCommand{ + code: code, + header: header, + } + setCmdVersion(cmd) + return cmd +} + +func setCmdVersion(cmd *RemotingCommand) { + if configVersion >= 0 { + cmd.version = configVersion // safety + } else if v := os.Getenv(RemotingVersionKey); v != "" { + value, err := strconv.Atoi(v) + if err != nil { + // TODO log + } + cmd.version = value + configVersion = value + } +} + +func (cmd *RemotingCommand) encodeHeader() []byte { + length := 4 + headerData := cmd.buildHeader() + length += len(headerData) + + if cmd.body != nil { + length += len(cmd.body) + } + + buf := bytes.NewBuffer([]byte{}) + binary.Write(buf, binary.BigEndian, length) + binary.Write(buf, binary.BigEndian, len(cmd.body)) + buf.Write(headerData) + + return buf.Bytes() +} + +func (cmd *RemotingCommand) buildHeader() []byte { + buf, err := json.Marshal(cmd) + if err != nil { + return nil + } + return buf } +func (cmd *RemotingCommand) encode() []byte { + length := 4 + headerData := cmd.buildHeader() + length += len(headerData) + + if cmd.body != nil { + length += len(cmd.body) + } + + buf := bytes.NewBuffer([]byte{}) + binary.Write(buf, binary.LittleEndian, length) + binary.Write(buf, binary.LittleEndian, len(cmd.body)) + buf.Write(headerData) + + if cmd.body != nil { + buf.Write(cmd.body) + } + + return buf.Bytes() +} + +func decodeRemoteCommand(header, body []byte) *RemotingCommand { + decodeLock.Lock() + defer decodeLock.Unlock() + + cmd := &RemotingCommand{} + cmd.extFields = make(map[string]string) + err := json.Unmarshal(header, cmd) + if err != nil { + log.Print(err) + return nil + } + cmd.body = body + return cmd +} + +func CreateRemotingCommand(code int, requestHeader *header.SendMessageRequestHeader) *RemotingCommand { + cmd := &RemotingCommand{} + cmd.code = code + cmd.header = requestHeader + cmd.version = 1 + cmd.opaque = atomic.AddInt32(&requestId, 1) // TODO: safety? + return cmd +} + +func (cmd *RemotingCommand) SetBody(body []byte) { + cmd.body = body +} + +func (cmd *RemotingCommand) Type() RemotingCommandType { + bits := 1 << rpcType + if (cmd.flag & bits) == bits { + return ResponseCommand + } + return RqeusetCommand +} + +func (cmd *RemotingCommand) MarkOneWayRpc() { + cmd.flag |= (1 << rpcOneWay) +} + +func (cmd *RemotingCommand) String() string { + return nil // TODO +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/remoting/response_future.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/response_future.go b/rocketmq-go/remoting/response_future.go new file mode 100644 index 0000000..a8c2aec --- /dev/null +++ b/rocketmq-go/remoting/response_future.go @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package remoting + +import ( + "sync" + "time" +) + +type InvokeCallback func(responseFuture *ResponseFuture) + +type ResponseFuture struct { + opaque int32 + timeoutMillis time.Duration + invokeCallback InvokeCallback + beginTimestamp int64 + responseCommand *RemotingCommand + sendRequestOK bool + done chan bool + latch sync.WaitGroup + err error +} + +func NewResponseFuture(opaque int32, timeout time.Duration, callback InvokeCallback) *ResponseFuture { + future := &ResponseFuture{ + opaque: opaque, + timeoutMillis: timeout, + invokeCallback: callback, + latch: sync.WaitGroup{}, + } + future.latch.Add(1) + return future +} +func (future *ResponseFuture) SetResponseFuture(cmd *RemotingCommand) { + future.responseCommand = cmd +} + +func (future *ResponseFuture) Done() { + future.latch.Done() + future.done <- true +} + +func (future *ResponseFuture) executeInvokeCallback() { + future.invokeCallback(nil) // TODO +} + +func (future *ResponseFuture) WaitResponse(timeout time.Duration) *RemotingCommand { + go func() { // TODO optimize + time.Sleep(timeout) + future.latch.Add(-1) // TODO whats happened when counter less than 0 + }() + future.latch.Wait() + return future.responseCommand +} + +func (future *ResponseFuture) String() string { + return nil +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/remoting/rpchook.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/remoting/rpchook.go b/rocketmq-go/remoting/rpchook.go new file mode 100644 index 0000000..150ffe0 --- /dev/null +++ b/rocketmq-go/remoting/rpchook.go @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package remoting + +type RPCHook interface { + DoBeforeRequest(string, *RemotingCommand) + DoBeforeResponse(string, *RemotingCommand) +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/service/client_api.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/service/client_api.go b/rocketmq-go/service/client_api.go new file mode 100644 index 0000000..ff750ba --- /dev/null +++ b/rocketmq-go/service/client_api.go @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package service + +import ( + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting" +) + +var sendSmartMsg bool = true // TODO get from system env + +type TopAddress struct { +} + +type ClientRemotingProcessor interface { +} + +func init() { + // TODO +} + +type MQClientAPI struct { + remotingClient *remoting.RemotingClient + topAddress *TopAddress + crp *ClientRemotingProcessor + nameServerAddress string + config *config.ClientConfig +} + +func NewMQClientAPI(cfg *config.ClientConfig, processor *ClientRemotingProcessor, hook remoting.RPCHook) *MQClientAPI { + api := &MQClientAPI{ + remotingClient: &remoting.RemotingClient{}, //TODO + topAddress: &TopAddress{}, // TODO + crp: processor, + config: cfg, + } + + // TODO register + return api +} + +func (api *MQClientAPI) SendMessage(addr, brokerName string, + msg message.Message, requestHeader header.SendMessageRequestHeader, timeout int64) *model.SendResult { + var request *remoting.RemotingCommand + request = remoting.CreateRemotingCommand(model.SendMsg, &requestHeader) + request.SetBody(msg.Body) + return api.sendMessageSync(addr, brokerName, msg, timeout, request) +} + +func (api *MQClientAPI) sendMessageSync(addr, brokerName string, + msg message.Message, + timeout int64, + request *remoting.RemotingCommand) *model.SendResult { + response := api.invokeSync(addr, request, timeout) + if response == nil { + panic("invokeSync panci!") + } + return nil + // TODO return api.processSendResponse(brokerName, msg, response) +} + +func (api *MQClientAPI) invokeSync(addr string, cmd *remoting.RemotingCommand, timeout int64) *remoting.RemotingCommand { + return nil +} + +func (api *MQClientAPI) processSendResponse(name string, msg message.Message, cmd *remoting.RemotingCommand) *remoting.RemotingCommand { + return nil +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/service/client_error_code.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/service/client_error_code.go b/rocketmq-go/service/client_error_code.go new file mode 100644 index 0000000..d031638 --- /dev/null +++ b/rocketmq-go/service/client_error_code.go @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package service + +const ( + ConnecBrokerError = 10001 + AccessBrokerTimeoutError = 10002 + BrokerNotExistError = 10003 + NoNameServerError = 10004 + NotFoundTopicError = 10005 +) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/service/consume_messsage_service.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/service/consume_messsage_service.go b/rocketmq-go/service/consume_messsage_service.go index 77a5d14..d3b28fc 100644 --- a/rocketmq-go/service/consume_messsage_service.go +++ b/rocketmq-go/service/consume_messsage_service.go @@ -17,5 +17,4 @@ package service type ConsumeMessageService struct { - } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/service/mq_client.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/service/mq_client.go b/rocketmq-go/service/mq_client.go index f62766e..0cf7df2 100644 --- a/rocketmq-go/service/mq_client.go +++ b/rocketmq-go/service/mq_client.go @@ -17,5 +17,4 @@ package service type RocketMqClient interface { - -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/service/producer_service.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/service/producer_service.go b/rocketmq-go/service/producer_service.go index e3657a9..a684b27 100644 --- a/rocketmq-go/service/producer_service.go +++ b/rocketmq-go/service/producer_service.go @@ -16,11 +16,13 @@ */ package service -import "github.com/incubator-rocketmq-externals/rocketmq-go/model/config" +import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config" + type ProducerService interface { } + type DefaultProducerService struct { - producerGroup string - producerConfig *config.RocketMqProducerConfig - mqClient RocketMqClient -} \ No newline at end of file + producerGroup string + producerConfig *config.RocketMqProducerConfig + mqClient RocketMqClient +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/42fc2235/rocketmq-go/service/rebalance_service.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/service/rebalance_service.go b/rocketmq-go/service/rebalance_service.go index bb97c87..acdcdd6 100644 --- a/rocketmq-go/service/rebalance_service.go +++ b/rocketmq-go/service/rebalance_service.go @@ -16,7 +16,7 @@ */ package service -import "github.com/incubator-rocketmq-externals/rocketmq-go/model/config" +import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config" type Rebalance struct { mqClient RocketMqClient
