Go-Client remoting and RocketMqClient common method implement, closes apache/incubator-rocketmq-externals#17
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/aaa0758e Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/aaa0758e Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/aaa0758e Branch: refs/heads/master Commit: aaa0758e6cdf2d1020a900c236ab84ce4071f2ff Parents: 7ceba1b Author: StyleTang <[email protected]> Authored: Sun May 7 11:58:44 2017 +0800 Committer: yukon <[email protected]> Committed: Sun May 7 11:58:44 2017 +0800 ---------------------------------------------------------------------- rocketmq-go/docs/checklist.md | 1 + rocketmq-go/docs/package.puml | 20 +- rocketmq-go/docs/roadmap.md | 48 +- rocketmq-go/example/consumer_example.go | 35 ++ rocketmq-go/example/rocketmq_client_example.go | 40 ++ rocketmq-go/model/config/client_config.go | 3 +- rocketmq-go/model/config/consumer_config.go | 79 +++ rocketmq-go/model/constant/config.go | 29 + rocketmq-go/model/constant/message_const.go | 45 ++ rocketmq-go/model/constant/message_sys_flag.go | 26 + rocketmq-go/model/constant/mix_all.go | 62 ++ rocketmq-go/model/constant/perm.go | 33 + rocketmq-go/model/constant/pull_sys_flag.go | 24 + .../model/consume_concurrently_result.go | 27 + .../model/consume_message_directly_result.go | 31 + rocketmq-go/model/consumer_running_info.go | 81 +++ ...me_message_directly_result_request_header.go | 32 + .../consumer_send_msg_back_request_header.go | 31 + .../model/header/get_consumer_list_by_group.go | 33 + .../get_consumer_running_info_request_header.go | 29 + .../header/get_max_offset_request_header.go | 26 + .../header/get_max_offset_response_header.go | 28 + .../header/get_route_info_request_header.go | 25 + .../query_consumer_offset_request_header.go | 27 + .../model/header/reset_offset_request_header.go | 37 ++ .../header/search_offset_request_header.go | 33 + .../update_consumer_offset_request_header.go | 
34 + rocketmq-go/model/heart_beat.go | 34 + rocketmq-go/model/message.go | 134 ++++ rocketmq-go/model/message/message.go | 2 +- rocketmq-go/model/message_ext.go | 71 +++ rocketmq-go/model/message_listener.go | 19 + rocketmq-go/model/message_queue.go | 77 +++ rocketmq-go/model/process_queue.go | 614 +++++++++++++++++++ rocketmq-go/model/process_queue_info.go | 45 ++ rocketmq-go/model/pull_request.go | 24 + rocketmq-go/model/reset_offset_body.go | 55 ++ rocketmq-go/model/response_code.go | 16 + rocketmq-go/model/send_result.go | 2 +- rocketmq-go/model/subscription_data.go | 26 + rocketmq-go/model/topic_publishInfo.go | 108 ++-- rocketmq-go/model/topic_publish_info.go | 96 +++ rocketmq-go/model/topic_route_data.go | 216 ++++--- rocketmq-go/mq_client_manager.go | 56 +- rocketmq-go/mq_consumer.go | 46 +- rocketmq-go/mq_producer.go | 6 +- rocketmq-go/remoting/custom_header.go | 1 + rocketmq-go/remoting/event_executor.go | 144 ----- rocketmq-go/remoting/json_serializable.go | 42 ++ rocketmq-go/remoting/remoting_client.go | 534 ++++++++-------- rocketmq-go/remoting/remoting_command.go | 178 +----- rocketmq-go/remoting/request_code.go | 111 ++++ rocketmq-go/remoting/request_processor.go | 26 + rocketmq-go/remoting/response_code.go | 53 ++ rocketmq-go/remoting/response_future.go | 61 +- rocketmq-go/remoting/rocketmq_serializable.go | 139 +++++ rocketmq-go/remoting/serializable.go | 80 +++ rocketmq-go/service/client_api.go | 39 +- rocketmq-go/service/consume_message_service.go | 153 +++++ rocketmq-go/service/consume_messsage_service.go | 20 - rocketmq-go/service/mq_client.go | 343 +++++++++++ rocketmq-go/util/concurrent_map.go | 278 +++++++++ rocketmq-go/util/ip.go | 73 +++ rocketmq-go/util/json_util.go | 157 +++++ rocketmq-go/util/message_client_id_generator.go | 110 ++++ rocketmq-go/util/string_util.go | 91 +++ rocketmq-go/util/structs/field.go | 141 +++++ rocketmq-go/util/structs/structs.go | 581 ++++++++++++++++++ rocketmq-go/util/structs/tags.go | 32 + 69 files changed, 
5135 insertions(+), 818 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/docs/checklist.md ---------------------------------------------------------------------- diff --git a/rocketmq-go/docs/checklist.md b/rocketmq-go/docs/checklist.md new file mode 100644 index 0000000..0412f86 --- /dev/null +++ b/rocketmq-go/docs/checklist.md @@ -0,0 +1 @@ +# Test Check List http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/docs/package.puml ---------------------------------------------------------------------- diff --git a/rocketmq-go/docs/package.puml b/rocketmq-go/docs/package.puml index 9c78711..f34dd38 100644 --- a/rocketmq-go/docs/package.puml +++ b/rocketmq-go/docs/package.puml @@ -37,6 +37,18 @@ serviceState clientIP instanceName } + class remoting.ClientRequestProcessor{ + receive request and invoke the method. + GET_CONSUMER_STATUS_FROM_CLIENT + GET_CONSUMER_RUNNING_INFO + ... 
+ } + class remoting.SerializerHandler{ + JsonSerializer + RocketMqSerializer + } + + namespace service{ @@ -61,8 +73,10 @@ namespace service{ } namespace remoting { - RemotingClient *-- RemotingCommand:contains - RemotingClient *-- ClientConfig:contains + RemotingClient *-- RemotingCommand + RemotingClient *-- ClientConfig + RemotingClient *-- ClientRequestProcessor + RemotingClient *-- SerializerHandler } @@ -75,5 +89,5 @@ namespace rocketmq_go{ note top of remoting.RemotingClient :（sync|aysc|oneWay） note top of remoting :net，serialize，connect，request response -note top of service.MqClient :mq method +note top of service.MqClient :mq common method @enduml \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/docs/roadmap.md ---------------------------------------------------------------------- diff --git a/rocketmq-go/docs/roadmap.md b/rocketmq-go/docs/roadmap.md index e6eabff..0db9033 100644 --- a/rocketmq-go/docs/roadmap.md +++ b/rocketmq-go/docs/roadmap.md @@ -1,19 +1,5 @@ # RoadMap-Milestone1 -## Producer -- [ ] ProducerType - - [ ] DefaultProducer -- [ ] API - - [ ] Send - - [ ] Sync -- [ ] Other - - [ ] DelayMessage - - [ ] Config - - [ ] MessageId Generate - - [ ] CompressMsg - - [ ] TimeOut - - [ ] LoadBalance - - [ ] DefaultTopic ## Consumer - [ ] ConsumerType - [ ] PushConsumer @@ -47,23 +33,23 @@ - [ ] UpdateTopicRouteInfoFromNameServer - [ ] PersistAllConsumerOffset - [ ] ClearExpiredMessage(form consumer consumeMessageService) - - -## Remoting -- [ ] MqClientRequest - - [ ] InvokeSync - - [ ] InvokeAsync - - [ ] InvokeOneWay - [ ] ClientRemotingProcessor + - [ ] CHECK_TRANSACTION_STATE - [ ] NOTIFY_CONSUMER_IDS_CHANGED - [ ] RESET_CONSUMER_CLIENT_OFFSET - [ ] GET_CONSUMER_STATUS_FROM_CLIENT - [ ] GET_CONSUMER_RUNNING_INFO - [ ] CONSUME_MESSAGE_DIRECTLY -- [ ] Serialize - - [ ] JSON - - [ ] ROCKETMQ -- [ ] NamesrvAddrChoosed(HA) + +## Remoting +- [x] MqClientRequest + - [x] 
InvokeSync + - [x] InvokeAsync + - [x] InvokeOneWay +- [x] Serialize + - [x] JSON + - [x] ROCKETMQ +- [x] NamesrvAddrChoosed(HA) # RoadMap-ALL @@ -143,13 +129,6 @@ - [ ] PersistAllConsumerOffset - [ ] ClearExpiredMessage(form consumer consumeMessageService) - [ ] UploadFilterClassSource(FromHeartBeat/But Golang Not Easy To do this(Java Source)) - - -## Remoting -- [ ] MqClientRequest - - [ ] InvokeSync - - [ ] InvokeAsync - - [ ] InvokeOneWay - [ ] ClientRemotingProcessor - [ ] CHECK_TRANSACTION_STATE - [ ] NOTIFY_CONSUMER_IDS_CHANGED @@ -157,6 +136,11 @@ - [ ] GET_CONSUMER_STATUS_FROM_CLIENT - [ ] GET_CONSUMER_RUNNING_INFO - [ ] CONSUME_MESSAGE_DIRECTLY +## Remoting +- [ ] MqClientRequest + - [ ] InvokeSync + - [ ] InvokeAsync + - [ ] InvokeOneWay - [ ] Serialize - [ ] JSON - [ ] ROCKETMQ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/example/consumer_example.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/example/consumer_example.go b/rocketmq-go/example/consumer_example.go index 5c0044f..af74c01 100644 --- a/rocketmq-go/example/consumer_example.go +++ b/rocketmq-go/example/consumer_example.go @@ -16,5 +16,40 @@ */ package main +import ( + "errors" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model" + "github.com/golang/glog" +) + func main() { + + // create a mqClientManager instance + var mqClientConfig = &rocketmq.MqClientConfig{} + var mqClientManager = rocketmq.NewMqClientManager(mqClientConfig) + + // create rocketMq consumer + var consumerConfig = &rocketmq.MqConsumerConfig{} + var consumer1 = rocketmq.NewDefaultMQPushConsumer("testGroup", consumerConfig) + consumer1.Subscribe("testTopic", "*") + consumer1.RegisterMessageListener(func(msgs []model.MessageExt) model.ConsumeConcurrentlyResult { + var index = -1 + for i, msg := range msgs { + // your code here,for example,print msg + 
glog.Info(msg) + var err = errors.New("error") + if err != nil { + break + } + index = i + } + return model.ConsumeConcurrentlyResult{ConsumeConcurrentlyStatus: model.CONSUME_SUCCESS, AckIndex: index} + }) + + //register consumer to mqClientManager + mqClientManager.RegisterConsumer(consumer1) + + //start it + mqClientManager.Start() } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/example/rocketmq_client_example.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/example/rocketmq_client_example.go b/rocketmq-go/example/rocketmq_client_example.go new file mode 100644 index 0000000..c6828c8 --- /dev/null +++ b/rocketmq-go/example/rocketmq_client_example.go @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package main + +import ( + "fmt" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service" +) + +func main() { + + var clienConfig = config.NewClientConfig() + clienConfig.SetNameServerAddress("120.55.113.35:9876") + + //use json serializer + var mqClient = service.MqClientInit(clienConfig, nil) + fmt.Println(mqClient.TryToFindTopicPublishInfo("GoLang")) + //&{false true [{GoLang broker-a 0} {GoLang broker-a 1} {GoLang broker-a 2} {GoLang broker-a 3}] 0xc420016800 0} <nil> + + //use rocketmq serializer + constant.USE_HEADER_SERIALIZETYPE = constant.ROCKETMQ_SERIALIZE + var mqClient2 = service.MqClientInit(clienConfig, nil) + fmt.Println(mqClient2.TryToFindTopicPublishInfo("GoLang")) +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/config/client_config.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/config/client_config.go b/rocketmq-go/model/config/client_config.go index d9ad88c..8a415d8 100644 --- a/rocketmq-go/model/config/client_config.go +++ b/rocketmq-go/model/config/client_config.go @@ -24,7 +24,8 @@ import ( // client common config type ClientConfig struct { - nameServerAddress string + nameServerAddress string // only this is in use + clientIP string instanceName string clientCallbackExecutorThreads int // TODO: clientCallbackExecutorThreads http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/config/consumer_config.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/config/consumer_config.go b/rocketmq-go/model/config/consumer_config.go index a37eaa0..25f7585 100644 --- a/rocketmq-go/model/config/consumer_config.go +++ b/rocketmq-go/model/config/consumer_config.go @@ -16,5 +16,84 
@@ */ package config +import "time" + type RocketMqConsumerConfig struct { + ConsumeFromWhere string + /** + * Minimum consumer thread number + */ + //consumeThreadMin int + // /** + // * Max consumer thread number + // */ + //consumeThreadMax int + + /** + * Threshold for dynamic adjustment of the number of thread pool + */ + //adjustThreadPoolNumsThreshold int // = 100000; + + /** + * Concurrently max span offset.it has no effect on sequential consumption + */ + ConsumeConcurrentlyMaxSpan int // = 2000; + /** + * Flow control threshold + */ + PullThresholdForQueue int //= 1000; + /** + * Message pull Interval + */ + PullInterval int64 //= 0; + /** + * Batch consumption size + */ + ConsumeMessageBatchMaxSize int //= 1; + /** + * Batch pull size + */ + PullBatchSize int //= 32; + + /** + * Whether update subscription relationship when every pull + */ + PostSubscriptionWhenPull bool //= false; //get subExpression + + /** + * Whether the unit of subscription group + */ + UnitMode bool // = false; + MaxReconsumeTimes int //= 16; + SuspendCurrentQueueTimeMillis int64 //= 1000; + ConsumeTimeout int64 //= 15 //minutes + + //=========can not change + /** + * Delay some time when exception occur + */ + PullTimeDelayMillsWhenException int64 //= 3000; + /** + * Flow control interval + */ + PullTimeDelayMillsWhenFlowControl int64 //= 50; + /** + * Delay some time when suspend pull service + */ + PullTimeDelayMillsWhenSuspend int64 //= 1000; + BrokerSuspendMaxTimeMillis int64 //1000 * 15; + ConsumerTimeoutMillisWhenSuspend int64 //= 1000 * 30; + + /** + * Backtracking consumption time with second precision.time format is + * 20131223171201 + * Implying Seventeen twelve and 01 seconds on December 23, 2013 year + * Default backtracking consumption time Half an hour ago + */ + ConsumeTimestamp time.Time //when use CONSUME_FROM_TIMESTAMP +} + +func NewRocketMqConsumerConfig() (consumerConfig *RocketMqConsumerConfig) { + consumerConfig = &RocketMqConsumerConfig{} + return } 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/constant/config.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/constant/config.go b/rocketmq-go/model/constant/config.go new file mode 100644 index 0000000..c48dfa5 --- /dev/null +++ b/rocketmq-go/model/constant/config.go @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package constant + +//-------SerializeType------- +var JSON_SERIALIZE byte = 0 +var ROCKETMQ_SERIALIZE byte = 1 + +//-------SerializeType------- + +var USE_HEADER_SERIALIZETYPE = JSON_SERIALIZE + +var REMOTING_COMMAND_FLAG = 0 +var REMOTING_COMMAND_LANGUAGE = "OTHER" +var REMOTING_COMMAND_VERSION int16 = 137 http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/constant/message_const.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/constant/message_const.go b/rocketmq-go/model/constant/message_const.go new file mode 100644 index 0000000..402d328 --- /dev/null +++ b/rocketmq-go/model/constant/message_const.go @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package constant + +const ( + PROPERTY_KEYS = "KEYS" + PROPERTY_TAGS = "TAGS" + PROPERTY_WAIT_STORE_MSG_OK = "WAIT" + PROPERTY_DELAY_TIME_LEVEL = "DELAY" + PROPERTY_RETRY_TOPIC = "RETRY_TOPIC" + PROPERTY_REAL_TOPIC = "REAL_TOPIC" + PROPERTY_REAL_QUEUE_ID = "REAL_QID" + PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG" + PROPERTY_PRODUCER_GROUP = "PGROUP" + PROPERTY_MIN_OFFSET = "MIN_OFFSET" + PROPERTY_MAX_OFFSET = "MAX_OFFSET" + PROPERTY_BUYER_ID = "BUYER_ID" + PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID" + PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG" + PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG" + PROPERTY_MQ2_FLAG = "MQ2_FLAG" + PROPERTY_RECONSUME_TIME = "RECONSUME_TIME" + PROPERTY_MSG_REGION = "MSG_REGION" + PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY" + PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES" + PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME" + + COMMUNICATIONMODE_SYNC = "SYNC" + COMMUNICATIONMODE_ASYNC = "ASYNC" + COMMUNICATIONMODE_ONEWAY = "ONEWAY" +) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/constant/message_sys_flag.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/constant/message_sys_flag.go b/rocketmq-go/model/constant/message_sys_flag.go new file mode 100644 index 0000000..a53c4fd --- /dev/null +++ b/rocketmq-go/model/constant/message_sys_flag.go @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package constant + +const ( + CompressedFlag int32 = (0x1 << 0) + MultiTagsFlag int32 = (0x1 << 1) + TransactionNotType int32 = (0x0 << 2) + TransactionPreparedType int32 = (0x1 << 2) + TransactionCommitType int32 = (0x2 << 2) + TransactionRollbackType int32 = (0x3 << 2) +) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/constant/mix_all.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/constant/mix_all.go b/rocketmq-go/model/constant/mix_all.go new file mode 100644 index 0000000..6abaabe --- /dev/null +++ b/rocketmq-go/model/constant/mix_all.go @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package constant + +const ( + ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME" + ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir" + NAMESRV_ADDR_ENV = "NAMESRV_ADDR" + NAMESRV_ADDR_PROPERTY = "rocketmq.namesrv.addr" + MESSAGE_COMPRESS_LEVEL = "rocketmq.message.compressLevel" + //WS_DOMAIN_NAME = System.getProperty("rocketmq.namesrv.domain", "jmenv.tbsite.net") + //WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr") + // http://jmenv.tbsite.net:8080/rocketmq/nsaddr + //WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP + DEFAULT_TOPIC = "TBW102" + BENCHMARK_TOPIC = "BenchmarkTest" + DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER" + DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER" + TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER" + FILTERSRV_CONSUMER_GROUP = "FILTERSRV_CONSUMER" + MONITOR_CONSUMER_GROUP = "__MONITOR_CONSUMER" + CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER" + SELF_TEST_PRODUCER_GROUP = "SELF_TEST_P_GROUP" + SELF_TEST_CONSUMER_GROUP = "SELF_TEST_C_GROUP" + SELF_TEST_TOPIC = "SELF_TEST_TOPIC" + OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT" + ONS_HTTP_PROXY_GROUP = "CID_ONS-HTTP-PROXY" + CID_ONSAPI_PERMISSION_GROUP = "CID_ONSAPI_PERMISSION" + CID_ONSAPI_OWNER_GROUP = "CID_ONSAPI_OWNER" + CID_ONSAPI_PULL_GROUP = "CID_ONSAPI_PULL" + CID_RMQ_SYS_PREFIX = "CID_RMQ_SYS_" + + //public static final List<String> LocalInetAddrs = getLocalInetAddress() + //Localhost = localhost() + //DEFAULT_CHARSET = "UTF-8" + MASTER_ID int64 = 0 + CURRENT_JVM_PID + + RETRY_GROUP_TOPIC_PREFIX = "%RETRY%" + + DLQ_GROUP_TOPIC_PREFIX = "%DLQ%" + SYSTEM_TOPIC_PREFIX = "rmq_sys_" + UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY" + MAX_MESSAGE_BODY_SIZE int = 4 * 1024 * 1024 //4m + MAX_MESSAGE_TOPIC_SIZE int = 255 //255char + + DEFAULT_TOPIC_QUEUE_NUMS int32 = 4 +) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/constant/perm.go ---------------------------------------------------------------------- 
diff --git a/rocketmq-go/model/constant/perm.go b/rocketmq-go/model/constant/perm.go new file mode 100644 index 0000000..962d989 --- /dev/null +++ b/rocketmq-go/model/constant/perm.go @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package constant + +const ( + PERM_PRIORITY = 0x1 << 3 + PERM_READ = 0x1 << 2 + PERM_WRITE = 0x1 << 1 + PERM_INHERIT = 0x1 << 0 +) + +func WriteAble(perm int32) (ret bool) { + ret = ((perm & PERM_WRITE) == PERM_WRITE) + return +} +func ReadAble(perm int32) (ret bool) { + ret = ((perm & PERM_READ) == PERM_READ) + return +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/constant/pull_sys_flag.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/constant/pull_sys_flag.go b/rocketmq-go/model/constant/pull_sys_flag.go new file mode 100644 index 0000000..0a2921c --- /dev/null +++ b/rocketmq-go/model/constant/pull_sys_flag.go @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package constant + +const ( + FLAG_COMMIT_OFFSET int32 = 0x1 << 0 + FLAG_SUSPEND int32 = 0x1 << 1 + FLAG_SUBSCRIPTION int32 = 0x1 << 2 + FLAG_CLASS_FILTER int32 = 0x1 << 3 +) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/consume_concurrently_result.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/consume_concurrently_result.go b/rocketmq-go/model/consume_concurrently_result.go new file mode 100644 index 0000000..6e4df7b --- /dev/null +++ b/rocketmq-go/model/consume_concurrently_result.go @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package model + +const ( + CONSUME_SUCCESS = "CONSUME_SUCCESS" + RECONSUME_LATER = "RECONSUME_LATER" +) + +type ConsumeConcurrentlyResult struct { + ConsumeConcurrentlyStatus string + AckIndex int +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/consume_message_directly_result.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/consume_message_directly_result.go b/rocketmq-go/model/consume_message_directly_result.go new file mode 100644 index 0000000..a9af32e --- /dev/null +++ b/rocketmq-go/model/consume_message_directly_result.go @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package model + +type ConsumeMessageDirectlyResult struct { + Order bool `json:"order"` + AutoCommit bool `json:"autoCommit"` + //CR_SUCCESS, + //CR_LATER, + //CR_ROLLBACK, + //CR_COMMIT, + //CR_THROW_EXCEPTION, + //CR_RETURN_NULL, + ConsumeResult string `json:"consumeResult"` + Remark string `json:"remark"` + SpentTimeMills int64 `json:"spentTimeMills"` +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/consumer_running_info.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/consumer_running_info.go b/rocketmq-go/model/consumer_running_info.go new file mode 100644 index 0000000..80c39ae --- /dev/null +++ b/rocketmq-go/model/consumer_running_info.go @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package model + +import "encoding/json" + +type ConsumerRunningInfo struct { + Properties map[string]string `json:"properties"` + MqTable map[MessageQueue]ProcessQueueInfo `json:"mqTable"` + // todo + //private TreeSet<SubscriptionData> subscriptionSet = new TreeSet<SubscriptionData>(); + // + //private TreeMap<MessageQueue, ProcessQueueInfo> mqTable = new TreeMap<MessageQueue, ProcessQueueInfo>(); + // + //private TreeMap<String/* Topic */, ConsumeStatus> statusTable = new TreeMap<String, ConsumeStatus>(); + // + //private String jstack; +} + +func (self *ConsumerRunningInfo) Encode() (jsonByte []byte, err error) { + mqTableJsonStr := "{" + first := true + var keyJson []byte + var valueJson []byte + + for key, value := range self.MqTable { + keyJson, err = json.Marshal(key) + if err != nil { + return + } + valueJson, err = json.Marshal(value) + if err != nil { + return + } + if first == false { + mqTableJsonStr = mqTableJsonStr + "," + } + mqTableJsonStr = mqTableJsonStr + string(keyJson) + ":" + string(valueJson) + first = false + } + mqTableJsonStr = mqTableJsonStr + "}" + var propertiesJson []byte + propertiesJson, err = json.Marshal(self.Properties) + if err != nil { + return + } + jsonByte = self.formatEncode("\"properties\"", string(propertiesJson), "\"mqTable\"", string(mqTableJsonStr)) + return +} +func (self *ConsumerRunningInfo) formatEncode(kVList ...string) []byte { + jsonStr := "{" + first := true + for i := 0; i+1 < len(kVList); i += 2 { + if first == false { + jsonStr = jsonStr + "," + } + keyJson := kVList[i] + valueJson := kVList[i+1] + + jsonStr = jsonStr + string(keyJson) + ":" + string(valueJson) + + first = false + } + jsonStr = jsonStr + "}" + return []byte(jsonStr) + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/consume_message_directly_result_request_header.go ---------------------------------------------------------------------- diff --git 
a/rocketmq-go/model/header/consume_message_directly_result_request_header.go b/rocketmq-go/model/header/consume_message_directly_result_request_header.go new file mode 100644 index 0000000..a593d51 --- /dev/null +++ b/rocketmq-go/model/header/consume_message_directly_result_request_header.go @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
// ConsumeMessageDirectlyResultRequestHeader is the request header whose
// fields are filled from a decoded header map by FromMap.
type ConsumeMessageDirectlyResultRequestHeader struct {
	ConsumerGroup string `json:"consumerGroup"`
	ClientId      string `json:"clientId"`
	MsgId         string `json:"msgId"`
	BrokerName    string `json:"brokerName"`
}

// FromMap copies "consumerGroup", "clientId", "msgId" and "brokerName" from
// headerMap into the header.
//
// A key that is missing, or whose value is not a string, leaves the
// corresponding field as the empty string instead of panicking on a failed
// type assertion. A nil headerMap is accepted.
func (self *ConsumeMessageDirectlyResultRequestHeader) FromMap(headerMap map[string]interface{}) {
	self.ConsumerGroup, _ = headerMap["consumerGroup"].(string)
	self.ClientId, _ = headerMap["clientId"].(string)
	self.MsgId, _ = headerMap["msgId"].(string)
	self.BrokerName, _ = headerMap["brokerName"].(string)
}
// ConsumerSendMsgBackRequestHeader groups the fields of a "send message back"
// request. The fields carry no JSON tags.
type ConsumerSendMsgBackRequestHeader struct {
	Offset            int64
	Group             string
	DelayLevel        int32
	OriginMsgId       string
	OriginTopic       string
	UnitMode          bool
	MaxReconsumeTimes int32
}

// FromMap is currently a no-op: nothing is read from headerMap and the
// receiver is left untouched.
func (self *ConsumerSendMsgBackRequestHeader) FromMap(headerMap map[string]interface{}) {
}
// GetConsumerListByGroupRequestHeader names the consumer group being queried;
// the group is serialized as "consumerGroup".
type GetConsumerListByGroupRequestHeader struct {
	ConsumerGroup string `json:"consumerGroup"`
}

// FromMap is currently a no-op: headerMap is not read.
func (self *GetConsumerListByGroupRequestHeader) FromMap(headerMap map[string]interface{}) {
}

// GetConsumerListByGroupResponseBody holds the list of consumer ids.
// The field carries no JSON tag.
type GetConsumerListByGroupResponseBody struct {
	ConsumerIdList []string
}

// FromMap is currently a no-op: headerMap is not read.
func (self *GetConsumerListByGroupResponseBody) FromMap(headerMap map[string]interface{}) {
}
// GetConsumerRunningInfoRequestHeader identifies the consumer whose running
// info is requested.
type GetConsumerRunningInfoRequestHeader struct {
	ConsumerGroup string `json:"consumerGroup"`
	ClientId      string `json:"clientId"`
	JstackEnable  bool   `json:"jstackEnable"`
}

// FromMap copies "consumerGroup" and "clientId" from headerMap into the
// header. JstackEnable is not read from the map and keeps its current value.
//
// A key that is missing, or whose value is not a string, leaves the
// corresponding field as the empty string instead of panicking on a failed
// type assertion. A nil headerMap is accepted.
func (self *GetConsumerRunningInfoRequestHeader) FromMap(headerMap map[string]interface{}) {
	self.ConsumerGroup, _ = headerMap["consumerGroup"].(string)
	self.ClientId, _ = headerMap["clientId"].(string)
}
// GetMaxOffsetRequestHeader names the topic and queue whose max offset is
// requested; the fields are serialized as "topic" and "queueId".
type GetMaxOffsetRequestHeader struct {
	Topic   string `json:"topic"`
	QueueId int32  `json:"queueId"`
}

// FromMap is currently a no-op: headerMap is not read.
func (self *GetMaxOffsetRequestHeader) FromMap(headerMap map[string]interface{}) {
}
+ */ +package header + +import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util" + +type QueryOffsetResponseHeader struct { + Offset int64 `json:"offset"` +} + +func (self *QueryOffsetResponseHeader) FromMap(headerMap map[string]interface{}) { + self.Offset = util.StrToInt64WithDefaultValue(headerMap["offset"].(string), -1) + return +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/get_route_info_request_header.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/header/get_route_info_request_header.go b/rocketmq-go/model/header/get_route_info_request_header.go new file mode 100644 index 0000000..7c33c25 --- /dev/null +++ b/rocketmq-go/model/header/get_route_info_request_header.go @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
// GetRouteInfoRequestHeader names the topic whose route info is requested;
// the topic is serialized as "topic".
type GetRouteInfoRequestHeader struct {
	Topic string `json:"topic"`
}

// FromMap is currently a no-op: headerMap is not read.
func (self *GetRouteInfoRequestHeader) FromMap(headerMap map[string]interface{}) {
}
// QueryConsumerOffsetRequestHeader names the consumer group, topic and queue
// whose consumer offset is queried.
type QueryConsumerOffsetRequestHeader struct {
	ConsumerGroup string `json:"consumerGroup"`
	Topic         string `json:"topic"`
	QueueId       int32  `json:"queueId"`
}

// FromMap is currently a no-op: headerMap is not read.
func (self *QueryConsumerOffsetRequestHeader) FromMap(headerMap map[string]interface{}) {
}
+ */ +package header + +import ( + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util" + "strconv" +) + +type ResetOffsetRequestHeader struct { + Topic string `json:"topic"` + Group string `json:"group"` + Timestamp int64 `json:"timestamp"` + IsForce bool `json:"isForce"` +} + +func (self *ResetOffsetRequestHeader) FromMap(headerMap map[string]interface{}) { + self.Group = headerMap["group"].(string) + self.Topic = headerMap["topic"].(string) + self.Timestamp = util.StrToInt64WithDefaultValue(headerMap["timestamp"].(string), -1) + self.IsForce, _ = strconv.ParseBool(headerMap["isForce"].(string)) + return +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/search_offset_request_header.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/header/search_offset_request_header.go b/rocketmq-go/model/header/search_offset_request_header.go new file mode 100644 index 0000000..5088eac --- /dev/null +++ b/rocketmq-go/model/header/search_offset_request_header.go @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package header + +import () +import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util" + +type SearchOffsetRequestHeader struct { + Topic string `json:"topic"` + QueueId int32 `json:"queueId"` + Timestamp int64 `json:"timestamp"` +} + +func (self *SearchOffsetRequestHeader) FromMap(headerMap map[string]interface{}) { + self.Topic = headerMap["topic"].(string) + self.Topic = headerMap["queueId"].(string) + self.Timestamp = util.StrToInt64WithDefaultValue(headerMap["timestamp"].(string), -1) + return +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/update_consumer_offset_request_header.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/header/update_consumer_offset_request_header.go b/rocketmq-go/model/header/update_consumer_offset_request_header.go new file mode 100644 index 0000000..42612db --- /dev/null +++ b/rocketmq-go/model/header/update_consumer_offset_request_header.go @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package header + +import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util" + +type UpdateConsumerOffsetRequestHeader struct { + ConsumerGroup string `json:"consumerGroup"` + Topic string `json:"topic"` + QueueId int32 `json:"queueId"` + CommitOffset int64 `json:"commitOffset"` +} + +func (self *UpdateConsumerOffsetRequestHeader) FromMap(headerMap map[string]interface{}) { + self.ConsumerGroup = headerMap["consumerGroup"].(string) + self.QueueId = util.StrToInt32WithDefaultValue(util.ReadString(headerMap["queueId"]), 0) + self.CommitOffset = util.StrToInt64WithDefaultValue(headerMap["commitOffset"].(string), -1) + self.Topic = util.ReadString(headerMap["topic"]) + return +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/heart_beat.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/heart_beat.go b/rocketmq-go/model/heart_beat.go new file mode 100644 index 0000000..fc5eded --- /dev/null +++ b/rocketmq-go/model/heart_beat.go @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package model + +type ConsumerData struct { + GroupName string + ConsumeType string + MessageModel string + ConsumeFromWhere string + SubscriptionDataSet []*SubscriptionData + UnitMode bool +} +type ProducerData struct { + GroupName string +} +type HeartbeatData struct { + ClientId string + ConsumerDataSet []*ConsumerData + ProducerDataSet []*ProducerData +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/message.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/message.go b/rocketmq-go/model/message.go new file mode 100644 index 0000000..0cb3d97 --- /dev/null +++ b/rocketmq-go/model/message.go @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package model + +import ( + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util" + "strconv" + "strings" +) + +type Message struct { + Topic string + Flag int + Properties map[string]string + Body []byte +} + +func (self *Message) SetTag(tag string) { + if self.Properties == nil { + self.Properties = make(map[string]string) + } + self.Properties[constant.PROPERTY_TAGS] = tag +} +func (self *Message) GetTag() (tag string) { + if self.Properties != nil { + tag = self.Properties[constant.PROPERTY_TAGS] + } + return +} + +func (self *Message) SetKeys(keys []string) { + if self.Properties == nil { + self.Properties = make(map[string]string) + } + self.Properties[constant.PROPERTY_KEYS] = strings.Join(keys, KEY_SEPARATOR) +} + +func (self *Message) SetDelayTimeLevel(delayTimeLevel int) { + if self.Properties == nil { + self.Properties = make(map[string]string) + } + self.Properties[constant.PROPERTY_DELAY_TIME_LEVEL] = util.IntToString(delayTimeLevel) +} +func (self *Message) SetWaitStoreMsgOK(waitStoreMsgOK bool) { + if self.Properties == nil { + self.Properties = make(map[string]string) + } + self.Properties[constant.PROPERTY_WAIT_STORE_MSG_OK] = strconv.FormatBool(waitStoreMsgOK) +} +func (self *Message) GeneratorMsgUniqueKey() { + if self.Properties == nil { + self.Properties = make(map[string]string) + } + if len(self.Properties[constant.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX]) > 0 { + return + } + self.Properties[constant.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX] = util.GeneratorMessageClientId() +} + +func (self *MessageExt) GetMsgUniqueKey() string { + if self.Properties != nil { + originMessageId := self.Properties[constant.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX] + if len(originMessageId) > 0 { + return originMessageId + } + } + return self.MsgId +} + +func (self *Message) SetOriginMessageId(messageId string) { + if self.Properties == nil { + self.Properties = 
make(map[string]string) + } + self.Properties[constant.PROPERTY_ORIGIN_MESSAGE_ID] = messageId +} + +func (self *Message) SetRetryTopic(retryTopic string) { + if self.Properties == nil { + self.Properties = make(map[string]string) + } + self.Properties[constant.PROPERTY_RETRY_TOPIC] = retryTopic +} +func (self *Message) SetReconsumeTime(reConsumeTime int) { + if self.Properties == nil { + self.Properties = make(map[string]string) + } + self.Properties[constant.PROPERTY_RECONSUME_TIME] = util.IntToString(reConsumeTime) +} + +func (self *Message) GetReconsumeTimes() (reConsumeTime int) { + reConsumeTime = 0 + if self.Properties != nil { + reConsumeTimeStr := self.Properties[constant.PROPERTY_RECONSUME_TIME] + if len(reConsumeTimeStr) > 0 { + reConsumeTime = util.StrToIntWithDefaultValue(reConsumeTimeStr, 0) + } + } + return +} + +func (self *Message) SetMaxReconsumeTimes(maxConsumeTime int) { + if self.Properties == nil { + self.Properties = make(map[string]string) + } + self.Properties[constant.PROPERTY_MAX_RECONSUME_TIMES] = util.IntToString(maxConsumeTime) +} + +func (self *Message) GetMaxReconsumeTimes() (maxConsumeTime int) { + maxConsumeTime = 0 + if self.Properties != nil { + reConsumeTimeStr := self.Properties[constant.PROPERTY_MAX_RECONSUME_TIMES] + if len(reConsumeTimeStr) > 0 { + maxConsumeTime = util.StrToIntWithDefaultValue(reConsumeTimeStr, 0) + } + } + return +} + +var KEY_SEPARATOR string = " " http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/message/message.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/message/message.go b/rocketmq-go/model/message/message.go index 633446c..1dcd365 100644 --- a/rocketmq-go/model/message/message.go +++ b/rocketmq-go/model/message/message.go @@ -255,7 +255,7 @@ func (msg *Message) removeProperty(k, v string) string { delete(msg.properties, k) return v } - return nil + return "" } func (msg *Message) String() 
string { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/message_ext.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/message_ext.go b/rocketmq-go/model/message_ext.go new file mode 100644 index 0000000..9a3aacb --- /dev/null +++ b/rocketmq-go/model/message_ext.go @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package model + +import ( + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util" + "math" + "strconv" + "time" +) + +type MessageExt struct { + *Message + QueueId int32 + StoreSize int32 + QueueOffset int64 + SysFlag int32 + BornTimestamp int64 + BornHost string + StoreTimestamp int64 + StoreHost string + MsgId string + CommitLogOffset int64 + BodyCRC int32 + ReconsumeTimes int32 + PreparedTransactionOffset int64 + + propertyConsumeStartTimestamp string // race condition +} + +func (self *MessageExt) GetOriginMessageId() string { + if self.Properties != nil { + originMessageId := self.Properties[constant.PROPERTY_ORIGIN_MESSAGE_ID] + if len(originMessageId) > 0 { + return originMessageId + } + } + return self.MsgId +} + +func (self *MessageExt) GetConsumeStartTime() int64 { + if len(self.propertyConsumeStartTimestamp) > 0 { + return util.StrToInt64WithDefaultValue(self.propertyConsumeStartTimestamp, -1) + } + return math.MaxInt64 +} + +func (self *MessageExt) SetConsumeStartTime() { + if self.Properties == nil { + self.Properties = make(map[string]string) + } + nowTime := strconv.FormatInt(time.Now().UnixNano()/1000000, 10) + self.Properties[constant.PROPERTY_KEYS] = nowTime + self.propertyConsumeStartTimestamp = nowTime + return +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/message_listener.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/message_listener.go b/rocketmq-go/model/message_listener.go new file mode 100644 index 0000000..7ad2054 --- /dev/null +++ b/rocketmq-go/model/message_listener.go @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package model + +type MessageListener func(msgs []MessageExt) ConsumeConcurrentlyResult http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/message_queue.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/message_queue.go b/rocketmq-go/model/message_queue.go new file mode 100644 index 0000000..27d70a6 --- /dev/null +++ b/rocketmq-go/model/message_queue.go @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
// MessageQueue identifies a queue by topic, broker name and queue id.
type MessageQueue struct {
	Topic      string `json:"topic"`
	BrokerName string `json:"brokerName"`
	QueueId    int32  `json:"queueId"`
}

// clone returns a newly allocated copy of the queue.
func (self *MessageQueue) clone() *MessageQueue {
	no := *self
	return &no
}

// MessageQueues is a slice of queue pointers that provides Len, Less and
// Swap so it can be sorted.
type MessageQueues []*MessageQueue

// Less orders queues by Topic, then BrokerName, then QueueId.
//
// A later key is consulted only when all earlier keys are equal. Previously
// a greater Topic or BrokerName fell through to the next key, so two
// elements could each compare less than the other.
func (self MessageQueues) Less(i, j int) bool {
	imq, jmq := self[i], self[j]

	if imq.Topic != jmq.Topic {
		return imq.Topic < jmq.Topic
	}
	if imq.BrokerName != jmq.BrokerName {
		return imq.BrokerName < jmq.BrokerName
	}
	return imq.QueueId < jmq.QueueId
}

// Swap exchanges the elements at positions i and j.
func (self MessageQueues) Swap(i, j int) {
	self[i], self[j] = self[j], self[i]
}

// Len returns the number of queues in the slice.
func (self MessageQueues) Len() int {
	return len(self)
}

// Equals reports whether messageQueue has the same QueueId, Topic and
// BrokerName as the receiver. A nil messageQueue is never equal.
func (self MessageQueue) Equals(messageQueue *MessageQueue) bool {
	if messageQueue == nil {
		return false
	}
	return self.QueueId == messageQueue.QueueId &&
		self.Topic == messageQueue.Topic &&
		self.BrokerName == messageQueue.BrokerName
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package model + +import ( + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util" + "github.com/emirpasic/gods/maps/treemap" + "github.com/golang/glog" + "sync" + "time" +) + +type ProcessQueue struct { + msgTreeMap *treemap.Map // int | MessageExt + msgCount int + lockTreeMap sync.RWMutex + locked bool + lastPullTimestamp time.Time + lastConsumeTimestamp time.Time + lastLockTimestamp time.Time + lockConsume sync.RWMutex + consuming bool + queueOffsetMax int64 + dropped bool + msgAccCnt int64 //accumulation message count + tryUnlockTimes int64 + msgTreeMapToBeConsume *treemap.Map +} + +func NewProcessQueue() (processQueue *ProcessQueue) { + processQueue = new(ProcessQueue) + processQueue.dropped = false + processQueue.msgTreeMap = treemap.NewWithIntComparator() + processQueue.msgTreeMapToBeConsume = treemap.NewWithIntComparator() + + return +} +func (self *ProcessQueue) GetMsgCount() int { + defer self.lockTreeMap.Unlock() + self.lockTreeMap.Lock() + return self.msgCount +} + +func (self *ProcessQueue) Clear() { + defer self.lockTreeMap.Unlock() + self.lockTreeMap.Lock() + self.SetDrop(true) + self.msgTreeMap.Clear() + self.msgCount = 0 + self.queueOffsetMax = 0 + +} + +func (self *ProcessQueue) ChangeToProcessQueueInfo() (processQueueInfo ProcessQueueInfo) { + defer self.lockTreeMap.Unlock() + self.lockTreeMap.Lock() + processQueueInfo = ProcessQueueInfo{} + minOffset := -1 + maxOffset := -1 + minKey, _ := self.msgTreeMap.Min() + if minKey != nil { + 
minOffset = minKey.(int) + } + maxKey, _ := self.msgTreeMap.Max() + if maxKey != nil { + maxOffset = maxKey.(int) + } + processQueueInfo.CachedMsgCount = int32(self.msgCount) + processQueueInfo.CachedMsgMinOffset = int64(maxOffset) + processQueueInfo.CachedMsgMaxOffset = int64(minOffset) + //processQueueInfo.CommitOffset = -123 // todo + processQueueInfo.Droped = self.dropped + processQueueInfo.LastConsumeTimestamp = self.lastConsumeTimestamp.UnixNano() + processQueueInfo.LastPullTimestamp = self.lastPullTimestamp.UnixNano() + //processQueueInfo. + + return +} + +func (self *ProcessQueue) DeleteExpireMsg(queueOffset int) { + defer self.lockTreeMap.Unlock() + self.lockTreeMap.Lock() + key, _ := self.msgTreeMap.Min() + if key == nil { + return + } + offset := key.(int) + glog.Infof("look min key and offset %d %s", offset, queueOffset) + if queueOffset == offset { + self.msgTreeMap.Remove(queueOffset) + self.msgCount = self.msgTreeMap.Size() + } +} + +func (self *ProcessQueue) GetMinMessageInTree() (offset int, messagePoint *MessageExt) { + defer self.lockTreeMap.Unlock() + self.lockTreeMap.Lock() + key, value := self.msgTreeMap.Min() + if key == nil || value == nil { + return + } + offset = key.(int) + + message := value.(MessageExt) + messagePoint = &message + return +} + +func (self *ProcessQueue) SetDrop(drop bool) { + self.dropped = drop +} +func (self *ProcessQueue) IsDropped() bool { + return self.dropped +} +func (self *ProcessQueue) GetMaxSpan() int { + defer self.lockTreeMap.Unlock() + self.lockTreeMap.Lock() + if self.msgTreeMap.Empty() { + return 0 + } + minKey, _ := self.msgTreeMap.Min() + minOffset := minKey.(int) + maxKey, _ := self.msgTreeMap.Max() + maxOffset := maxKey.(int) + return maxOffset - minOffset +} + +func (self *ProcessQueue) RemoveMessage(msgs []MessageExt) (offset int64) { + now := time.Now() + offset = -1 + defer self.lockTreeMap.Unlock() + self.lockTreeMap.Lock() + self.lastConsumeTimestamp = now + if self.msgCount > 0 { + maxKey, _ := 
self.msgTreeMap.Max() + offset = int64(maxKey.(int)) + 1 + for _, msg := range msgs { + self.msgTreeMap.Remove(int(msg.QueueOffset)) + } + self.msgCount = self.msgTreeMap.Size() + if self.msgCount > 0 { + minKey, _ := self.msgTreeMap.Min() + offset = int64(minKey.(int)) + } + } + return +} + +func (self *ProcessQueue) PutMessage(msgs []MessageExt) (dispatchToConsume bool) { + dispatchToConsume = false + msgsLen := len(msgs) + if msgsLen == 0 { + return + } + defer self.lockTreeMap.Unlock() + self.lockTreeMap.Lock() + + for _, msg := range msgs { + self.msgTreeMap.Put(int(msg.QueueOffset), msg) + + } + self.msgCount = self.msgTreeMap.Size() + maxOffset, _ := self.msgTreeMap.Max() + self.queueOffsetMax = int64(maxOffset.(int)) + if self.msgCount > 0 && !self.consuming { + dispatchToConsume = true + self.consuming = true + } + lastMsg := msgs[msgsLen-1] + remoteMaxOffset := util.StrToInt64WithDefaultValue(lastMsg.Properties[constant.PROPERTY_MAX_OFFSET], -1) + if remoteMaxOffset > 0 { + accTotal := remoteMaxOffset - lastMsg.QueueOffset + if accTotal > 0 { + self.msgAccCnt = accTotal + } + } + return +} + +//func (self *ProcessQueue) TakeMessages(batchSize int) (messageToConsumeList []MessageExt) { +// defer self.lockTreeMap.Unlock() +// self.lockTreeMap.Lock() +// self.lastConsumeTimestamp = time.Now() +// it := self.msgTreeMap.Iterator() +// nowIndex := 0 +// for it.Next() { +// offset, message := it.Key(), it.Value() +// if (nowIndex >= batchSize) { +// break +// } +// self.msgTreeMap.Remove(offset) +// self.msgTreeMapToBeConsume.Put(offset, message) +// //messageToConsumeList = append(messageToConsumeList, message) +// } +// if (len(messageToConsumeList) == 0) { +// self.consuming = false +// } +// return +//} + +/** +# +public final static long RebalanceLockMaxLiveTime =Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000")); +public final static long RebalanceLockInterval = 
Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000")); +#å¹¶åæ¶è´¹è¿æç + case CONSUME_PASSIVELY: + pq.setDropped(true); + if (this.removeUnnecessaryMessageQueue(mq, pq)) { + it.remove(); + changed = true; + log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", + consumerGroup, mq); + } + break; +private final static long PullMaxIdleTime = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000")); +private final Logger log = ClientLogger.getLog(); +private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); + +private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>(); +private final AtomicLong msgCount = new AtomicLong(); +private final Lock lockConsume = new ReentrantLock(); +private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>(); +private final AtomicLong tryUnlockTimes = new AtomicLong(0); +private volatile long queueOffsetMax = 0L; +private volatile boolean dropped = false; +private volatile long lastPullTimestamp = System.currentTimeMillis(); +private volatile long lastConsumeTimestamp = System.currentTimeMillis(); +private volatile boolean locked = false; +private volatile long lastLockTimestamp = System.currentTimeMillis(); +private volatile boolean consuming = false; +private volatile long msgAccCnt = 0; + + public boolean isLockExpired() { + boolean result = (System.currentTimeMillis() - this.lastLockTimestamp) > RebalanceLockMaxLiveTime; + return result; + } + + + public boolean isPullExpired() { + boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PullMaxIdleTime; + return result; + } + +param pushConsumer +cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) { +if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) { +return; +} + +int loop = msgTreeMap.size() < 16 ? 
msgTreeMap.size() : 16; +for (int i = 0; i < loop; i++) { +MessageExt msg = null; +try { +this.lockTreeMap.readLock().lockInterruptibly(); +try { +if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) { +msg = msgTreeMap.firstEntry().getValue(); +} else { + +break; +} +} finally { +this.lockTreeMap.readLock().unlock(); +} +} catch (InterruptedException e) { +log.error("getExpiredMsg exception", e); +} + +try { + +pushConsumer.sendMessageBack(msg, 3); +log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); +try { +this.lockTreeMap.writeLock().lockInterruptibly(); +try { +if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) { +try { +msgTreeMap.remove(msgTreeMap.firstKey()); +} catch (Exception e) { +log.error("send expired msg exception", e); +} +} +} finally { +this.lockTreeMap.writeLock().unlock(); +} +} catch (InterruptedException e) { +log.error("getExpiredMsg exception", e); +} +} catch (Exception e) { +log.error("send expired msg exception", e); +} +} +} + + +public boolean putMessage(final List<MessageExt> msgs) { +boolean dispatchToConsume = false; +try { +this.lockTreeMap.writeLock().lockInterruptibly(); +try { +int validMsgCnt = 0; +for (MessageExt msg : msgs) { +MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg); +if (null == old) { +validMsgCnt++; +this.queueOffsetMax = msg.getQueueOffset(); +} +} +msgCount.addAndGet(validMsgCnt); + +if (!msgTreeMap.isEmpty() && !this.consuming) { +dispatchToConsume = true; +this.consuming = true; +} + +if (!msgs.isEmpty()) { +MessageExt messageExt = msgs.get(msgs.size() - 1); +String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET); +if (property != null) { +long accTotal = Long.parseLong(property) 
- messageExt.getQueueOffset(); +if (accTotal > 0) { +this.msgAccCnt = accTotal; +} +} +} +} finally { +this.lockTreeMap.writeLock().unlock(); +} +} catch (InterruptedException e) { +log.error("putMessage exception", e); +} + +return dispatchToConsume; +} + + +public long getMaxSpan() { +try { +this.lockTreeMap.readLock().lockInterruptibly(); +try { +if (!this.msgTreeMap.isEmpty()) { +return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey(); +} +} finally { +this.lockTreeMap.readLock().unlock(); +} +} catch (InterruptedException e) { +log.error("getMaxSpan exception", e); +} + +return 0; +} + + +public long removeMessage(final List<MessageExt> msgs) { //treeMapæ¯ç»´æ¤äºæ²¡ææ¶è´¹ç 为äºå¤çè¿æä½¿ç¨ +long result = -1; +final long now = System.currentTimeMillis(); +try { +this.lockTreeMap.writeLock().lockInterruptibly(); +this.lastConsumeTimestamp = now; +try { +if (!msgTreeMap.isEmpty()) { +result = this.queueOffsetMax + 1; +int removedCnt = 0; +for (MessageExt msg : msgs) { +MessageExt prev = msgTreeMap.remove(msg.getQueueOffset()); +if (prev != null) { +removedCnt--; +} +} +msgCount.addAndGet(removedCnt); + +if (!msgTreeMap.isEmpty()) { +result = msgTreeMap.firstKey(); +} +} +} finally { +this.lockTreeMap.writeLock().unlock(); +} +} catch (Throwable t) { +log.error("removeMessage exception", t); +} + +return result; +} + + +public TreeMap<Long, MessageExt> getMsgTreeMap() { +return msgTreeMap; +} + + +public AtomicLong getMsgCount() { +return msgCount; +} + + +public boolean isDropped() { +return dropped; +} + + +public void setDropped(boolean dropped) { +this.dropped = dropped; +} + +public boolean isLocked() { +return locked; +} + +public void setLocked(boolean locked) { +this.locked = locked; +} + +public void rollback() { +try { +this.lockTreeMap.writeLock().lockInterruptibly(); +try { +this.msgTreeMap.putAll(this.msgTreeMapTemp); +this.msgTreeMapTemp.clear(); +} finally { +this.lockTreeMap.writeLock().unlock(); +} +} catch (InterruptedException e) { 
+log.error("rollback exception", e); +} +} + + +public long commit() { +try { +this.lockTreeMap.writeLock().lockInterruptibly(); +try { +Long offset = this.msgTreeMapTemp.lastKey(); +msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1)); +this.msgTreeMapTemp.clear(); +if (offset != null) { +return offset + 1; +} +} finally { +this.lockTreeMap.writeLock().unlock(); +} +} catch (InterruptedException e) { +log.error("commit exception", e); +} + +return -1; +} + + +public void makeMessageToCosumeAgain(List<MessageExt> msgs) { +try { +this.lockTreeMap.writeLock().lockInterruptibly(); +try { +for (MessageExt msg : msgs) { +this.msgTreeMapTemp.remove(msg.getQueueOffset()); +this.msgTreeMap.put(msg.getQueueOffset(), msg); +} +} finally { +this.lockTreeMap.writeLock().unlock(); +} +} catch (InterruptedException e) { +log.error("makeMessageToCosumeAgain exception", e); +} +} + + +public List<MessageExt> takeMessags(final int batchSize) { +List<MessageExt> result = new ArrayList<MessageExt>(batchSize); +final long now = System.currentTimeMillis(); +try { +this.lockTreeMap.writeLock().lockInterruptibly(); +this.lastConsumeTimestamp = now; +try { +if (!this.msgTreeMap.isEmpty()) { +for (int i = 0; i < batchSize; i++) { +Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry(); +if (entry != null) { +result.add(entry.getValue()); +msgTreeMapTemp.put(entry.getKey(), entry.getValue()); +} else { +break; +} +} +} + +if (result.isEmpty()) { +consuming = false; +} +} finally { +this.lockTreeMap.writeLock().unlock(); +} +} catch (InterruptedException e) { +log.error("take Messages exception", e); +} + +return result; +} + + +public boolean hasTempMessage() { +try { +this.lockTreeMap.readLock().lockInterruptibly(); +try { +return !this.msgTreeMap.isEmpty(); +} finally { +this.lockTreeMap.readLock().unlock(); +} +} catch (InterruptedException e) { +} + +return true; +} + + +public void clear() { +try { +this.lockTreeMap.writeLock().lockInterruptibly(); +try { 
+this.msgTreeMap.clear(); +this.msgTreeMapTemp.clear(); +this.msgCount.set(0); +this.queueOffsetMax = 0L; +} finally { +this.lockTreeMap.writeLock().unlock(); +} +} catch (InterruptedException e) { +log.error("rollback exception", e); +} +} + + + + +public void setLastLockTimestamp(long lastLockTimestamp) { +this.lastLockTimestamp = lastLockTimestamp; +} + + +public Lock getLockConsume() { +return lockConsume; +} + + + + +public void setLastPullTimestamp(long lastPullTimestamp) { +this.lastPullTimestamp = lastPullTimestamp; +} + + +public long getMsgAccCnt() { +return msgAccCnt; +} + + + +public long getTryUnlockTimes() { +return this.tryUnlockTimes.get(); +} + + +public void incTryUnlockTimes() { +this.tryUnlockTimes.incrementAndGet(); +} + + +public void fillProcessQueueInfo(final ProcessQueueInfo info) { +try { +this.lockTreeMap.readLock().lockInterruptibly(); + +if (!this.msgTreeMap.isEmpty()) { +info.setCachedMsgMinOffset(this.msgTreeMap.firstKey()); +info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey()); +info.setCachedMsgCount(this.msgTreeMap.size()); +} + +if (!this.msgTreeMapTemp.isEmpty()) { +info.setTransactionMsgMinOffset(this.msgTreeMapTemp.firstKey()); +info.setTransactionMsgMaxOffset(this.msgTreeMapTemp.lastKey()); +info.setTransactionMsgCount(this.msgTreeMapTemp.size()); +} + +info.setLocked(this.locked); +info.setTryUnlockTimes(this.tryUnlockTimes.get()); +info.setLastLockTimestamp(this.lastLockTimestamp); + +info.setDroped(this.dropped); +info.setLastPullTimestamp(this.lastPullTimestamp); +info.setLastConsumeTimestamp(this.lastConsumeTimestamp); +} catch (Exception e) { +} finally { +this.lockTreeMap.readLock().unlock(); +} +} + + + +*/ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/process_queue_info.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/model/process_queue_info.go b/rocketmq-go/model/process_queue_info.go new file mode 100644 index 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package model

// ProcessQueueInfo is a JSON-serialisable snapshot of a ProcessQueue.
// ProcessQueue.ChangeToProcessQueueInfo fills the cached-message fields,
// Droped, LastPullTimestamp and LastConsumeTimestamp; the other fields are
// not set by any code in this file.
type ProcessQueueInfo struct {
	CommitOffset int64 `json:"commitOffset"`

	// Offsets and count of the messages held in the process queue's cache.
	CachedMsgMinOffset int64 `json:"cachedMsgMinOffset"`
	CachedMsgMaxOffset int64 `json:"cachedMsgMaxOffset"`
	CachedMsgCount     int32 `json:"cachedMsgCount"`

	TransactionMsgMinOffset int64 `json:"transactionMsgMinOffset"`
	TransactionMsgMaxOffset int64 `json:"transactionMsgMaxOffset"`
	TransactionMsgCount     int32 `json:"transactionMsgCount"`

	Locked            bool  `json:"locked"`
	TryUnlockTimes    int64 `json:"tryUnlockTimes"`
	LastLockTimestamp int64 `json:"lastLockTimestamp"`

	// The spelling "droped" is part of the JSON key and must stay as is.
	Droped               bool  `json:"droped"`
	LastPullTimestamp    int64 `json:"lastPullTimestamp"`
	LastConsumeTimestamp int64 `json:"lastConsumeTimestamp"`
}

//func (self ProcessQueueInfo) BuildFromProcessQueue(processQueue ProcessQueue) (processQueueInfo ProcessQueueInfo) {
//	processQueueInfo = ProcessQueueInfo{}
//	//processQueueInfo.CommitOffset =
//	processQueueInfo.CachedMsgCount = processQueue.GetMsgCount()
//	processQueueInfo.CachedMsgCount
//	return
//}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package model

// PullRequest groups a consumer group name, a message queue, the process
// queue paired with it and an offset.
// NOTE(review): presumably one unit of pull work, with NextOffset being the
// offset the next pull starts from; the code that uses it is not in this
// file — confirm.
type PullRequest struct {
	ConsumerGroup string
	MessageQueue  *MessageQueue
	ProcessQueue  *ProcessQueue
	NextOffset    int64
}

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package model + +import ( + "encoding/json" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util" + "github.com/golang/glog" +) + +type ResetOffsetBody struct { + OffsetTable map[MessageQueue]int64 `json:"offsetTable"` +} + +func (self *ResetOffsetBody) Decode(data []byte) (err error) { + self.OffsetTable = map[MessageQueue]int64{} + var kvMap map[string]string + kvMap, err = util.GetKvStringMap(string(data)) + if err != nil { + return + } + glog.Info(kvMap) + kvMap, err = util.GetKvStringMap(kvMap["\"offsetTable\""]) + if err != nil { + return + } + for k, v := range kvMap { + messageQueue := &MessageQueue{} + var offset int64 + err = json.Unmarshal([]byte(k), messageQueue) + if err != nil { + return + } + offset, err = util.StrToInt64(v) + if err != nil { + return + } + self.OffsetTable[*messageQueue] = offset + } + return +}
