ShannonDing closed pull request #20: "Stable test". URL: https://github.com/apache/rocketmq-client-go/pull/20
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/benchmark/consumer.go b/benchmark/consumer.go index 9e1cd77..1a893c9 100644 --- a/benchmark/consumer.go +++ b/benchmark/consumer.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package main import ( diff --git a/benchmark/main.go b/benchmark/main.go index d268d72..080a948 100644 --- a/benchmark/main.go +++ b/benchmark/main.go @@ -1,9 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package main import ( "fmt" "os" - "strings" ) type command interface { @@ -12,16 +28,9 @@ type command interface { } var ( - cmds = map[string]command{} - longText = "" - longTextLen = 0 + cmds = map[string]command{} ) -func init() { - longText = strings.Repeat("0123456789", 100) - longTextLen = len(longText) -} - func registerCommand(name string, cmd command) { if cmd == nil { panic("empty command") diff --git a/benchmark/message.go b/benchmark/message.go new file mode 100644 index 0000000..d5690fe --- /dev/null +++ b/benchmark/message.go @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package main + +import "strings" + +var ( + longText = "" + longTextLen = 0 +) + +func init() { + longText = strings.Repeat("0123456789", 100) + longTextLen = len(longText) +} + +func buildMsg(size int) string { + return longText[:size] +} diff --git a/benchmark/producer.go b/benchmark/producer.go index 7b21356..e183269 100644 --- a/benchmark/producer.go +++ b/benchmark/producer.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package main import ( @@ -126,10 +143,15 @@ AGAIN: } now := time.Now() - r := p.SendMessageSync(&rocketmq.Message{ - Topic: bp.topic, Body: longText[:bp.bodySize], + r, err := p.SendMessageSync(&rocketmq.Message{ + Topic: bp.topic, Body: buildMsg(bp.bodySize), }) + if err != nil { + fmt.Printf("send message sync error:%s", err) + goto AGAIN + } + if r.Status == rocketmq.SendOK { atomic.AddInt64(&stati.receiveResponseSuccessCount, 1) atomic.AddInt64(&stati.sendRequestSuccessCount, 1) @@ -249,7 +271,3 @@ func (bp *producer) run(args []string) { func (bp *producer) usage() { bp.flags.Usage() } - -func (bp *producer) buildMsg() string { - return longText[:bp.bodySize] -} diff --git a/benchmark/stable.go b/benchmark/stable.go new file mode 100644 index 0000000..6c7a12c --- /dev/null +++ b/benchmark/stable.go @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package main + +import ( + "errors" + "flag" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + rocketmq "github.com/apache/rocketmq-client-go/core" +) + +type stableTest struct { + nameSrv string + topic string + groupID string + opIntervalSec int + testMin int + + op func() + + flags *flag.FlagSet +} + +func (st *stableTest) buildFlags(name string) { + flags := flag.NewFlagSet(name, flag.ExitOnError) + flags.StringVar(&st.topic, "t", "stable-test", "topic name") + flags.StringVar(&st.nameSrv, "n", "", "nameserver address") + flags.StringVar(&st.groupID, "g", "stable-test", "group id") + flags.IntVar(&st.testMin, "m", 10, "test minutes") + flags.IntVar(&st.opIntervalSec, "s", 1, "operation interval[produce/consume]") + + st.flags = flags +} + +func (st *stableTest) checkFlag() error { + if st.topic == "" { + return errors.New("empty topic") + } + + if st.nameSrv == "" { + return errors.New("empty namesrv") + } + + if st.groupID == "" { + return errors.New("empty group id") + } + + if st.testMin <= 0 { + return errors.New("test miniutes must be positive integer") + } + + if st.opIntervalSec <= 0 { + return errors.New("operation interval must be positive integer") + } + + return nil +} + +func (st *stableTest) run() { + opTicker := time.NewTicker(time.Duration(st.opIntervalSec) * time.Second) + closeChan := time.Tick(time.Duration(st.testMin) * time.Minute) + + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + for { + select { + case <-signalChan: + opTicker.Stop() + fmt.Println("test over") + return + case <-closeChan: + opTicker.Stop() + fmt.Println("test over") + return + case <-opTicker.C: + st.op() + } + } +} + +type stableTestProducer struct { + *stableTest + bodySize int + + p rocketmq.Producer +} + +func (stp *stableTestProducer) buildFlags(name string) { + stp.stableTest.buildFlags(name) + stp.flags.IntVar(&stp.bodySize, "b", 32, "body size") +} + +func (stp *stableTestProducer) checkFlag() error 
{ + err := stp.stableTest.checkFlag() + if err != nil { + return err + } + if stp.bodySize <= 0 { + return errors.New("message body size must be positive integer") + } + + return nil +} + +func (stp *stableTestProducer) usage() { + stp.flags.Usage() +} + +func (stp *stableTestProducer) run(args []string) { + err := stp.flags.Parse(args) + if err != nil { + fmt.Printf("parse args:%v, error:%s\n", args, err) + stp.usage() + return + } + + err = stp.checkFlag() + if err != nil { + fmt.Println(err) + stp.usage() + return + } + + p, err := rocketmq.NewProducer(&rocketmq.ProducerConfig{ + ClientConfig: rocketmq.ClientConfig{GroupID: stp.groupID, NameServer: stp.nameSrv}, + }) + if err != nil { + fmt.Printf("new producer error:%s\n", err) + return + } + + err = p.Start() + if err != nil { + fmt.Printf("start producer error:%s\n", err) + return + } + defer p.Shutdown() + + stp.p = p + stp.stableTest.run() +} + +func (stp *stableTestProducer) sendMessage() { + r, err := stp.p.SendMessageSync(&rocketmq.Message{Topic: stp.topic, Body: buildMsg(stp.bodySize)}) + if err == nil { + fmt.Printf("send result:%+v\n", r) + return + } + fmt.Printf("send message error:%s", err) +} + +type stableTestConsumer struct { + *stableTest + expression string + + c rocketmq.PullConsumer + offsets map[int]int64 +} + +func (stc *stableTestConsumer) buildFlags(name string) { + stc.stableTest.buildFlags(name) + stc.flags.StringVar(&stc.expression, "e", "*", "expression") +} + +func (stc *stableTestConsumer) checkFlag() error { + err := stc.stableTest.checkFlag() + if err != nil { + return err + } + + if stc.expression == "" { + return errors.New("empty expression") + } + return nil +} + +func (stc *stableTestConsumer) usage() { + stc.flags.Usage() +} + +func (stc *stableTestConsumer) run(args []string) { + err := stc.flags.Parse(args) + if err != nil { + fmt.Printf("parse args:%v, error:%s\n", args, err) + stc.usage() + return + } + + err = stc.checkFlag() + if err != nil { + stc.usage() + 
+ fmt.Printf("%s\n", err) + return + } + + c, err := rocketmq.NewPullConsumer(&rocketmq.PullConsumerConfig{ + ClientConfig: rocketmq.ClientConfig{GroupID: stc.groupID, NameServer: stc.nameSrv}, + }) + if err != nil { + fmt.Printf("new pull consumer error:%s\n", err) + return + } + + err = c.Start() + if err != nil { + fmt.Printf("start consumer error:%s\n", err) + return + } + defer c.Shutdown() + + stc.c = c + stc.stableTest.run() +} + +func (stc *stableTestConsumer) pullMessage() { + mqs := stc.c.FetchSubscriptionMessageQueues(stc.topic) + + for _, mq := range mqs { + offset := stc.offsets[mq.ID] + pr := stc.c.Pull(mq, stc.expression, offset, 32) + fmt.Printf("pull from %s, offset:%d, count:%+v\n", mq.String(), offset, len(pr.Messages)) + + switch pr.Status { + case rocketmq.PullNoNewMsg: + stc.offsets[mq.ID] = 0 // pull from the begin + case rocketmq.PullFound: + fallthrough + case rocketmq.PullNoMatchedMsg: + fallthrough + case rocketmq.PullOffsetIllegal: + stc.offsets[mq.ID] = pr.NextBeginOffset + case rocketmq.PullBrokerTimeout: + fmt.Println("broker timeout occur") + } + } +} + +func init() { + // producer + name := "stableTestProducer" + p := &stableTestProducer{stableTest: &stableTest{}} + p.buildFlags(name) + p.op = p.sendMessage + registerCommand(name, p) + + // consumer + name = "stableTestConsumer" + c := &stableTestConsumer{stableTest: &stableTest{}, offsets: map[int]int64{}} + c.buildFlags(name) + c.op = c.pullMessage + registerCommand(name, c) +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
