This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new b3ede3a [ISSUE #935]fix consumer option validate (#936)
b3ede3a is described below
commit b3ede3a7f3626eb262d9c7915ffcff9d52163896
Author: 0daypwn <[email protected]>
AuthorDate: Fri Oct 14 16:37:54 2022 +0800
[ISSUE #935]fix consumer option validate (#936)
Co-authored-by: wuxb02 <[email protected]>
---
consumer/pull_consumer.go | 32 ++++++++++++++++++++++++++------
consumer/push_consumer.go | 45 ++++++++++++++++++++++++++++++++-------------
internal/validators.go | 17 ++++++++++-------
3 files changed, 68 insertions(+), 26 deletions(-)
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index a7e0ec5..a53d0b8 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -142,12 +142,18 @@ func (pc *defaultPullConsumer) Unsubscribe(topic string)
error {
}
func (pc *defaultPullConsumer) Start() error {
- atomic.StoreInt32(&pc.state, int32(internal.StateRunning))
-
var err error
pc.once.Do(func() {
- consumerGroupWithNs := utils.WrapNamespace(pc.option.Namespace,
pc.consumerGroup)
- err =
pc.defaultConsumer.client.RegisterConsumer(consumerGroupWithNs, pc)
+ err = pc.validate()
+ if err != nil {
+ rlog.Error("the consumer group option validate fail",
map[string]interface{}{
+ rlog.LogKeyConsumerGroup: pc.consumerGroup,
+ rlog.LogKeyUnderlayError: err.Error(),
+ })
+ err = errors.Wrap(err, "the consumer group option
validate fail")
+ return
+ }
+ err =
pc.defaultConsumer.client.RegisterConsumer(pc.consumerGroup, pc)
if err != nil {
rlog.Error("defaultPullConsumer the consumer group has
been created, specify another one", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
@@ -159,7 +165,7 @@ func (pc *defaultPullConsumer) Start() error {
if err != nil {
return
}
-
+ atomic.StoreInt32(&pc.state, int32(internal.StateRunning))
go func() {
for {
select {
@@ -176,7 +182,9 @@ func (pc *defaultPullConsumer) Start() error {
}
}()
})
-
+ if err != nil {
+ return err
+ }
pc.client.UpdateTopicRouteInfo()
_, exist := pc.topicSubscribeInfoTable.Load(pc.topic)
if !exist {
@@ -844,3 +852,15 @@ func (pc *defaultPullConsumer) consumeMessageCurrently(pq
*processQueue, mq *pri
case pc.consumeRequestCache <- cr:
}
}
+
+func (pc *defaultPullConsumer) validate() error {
+ if err := internal.ValidateGroup(pc.consumerGroup); err != nil {
+ return err
+ }
+
+ if pc.consumerGroup == internal.DefaultConsumerGroup {
+ return fmt.Errorf("consumerGroup can't equal [%s], please
specify another one", internal.DefaultConsumerGroup)
+ }
+
+ return nil
+}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index d9d0a50..5f00bac 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -139,7 +139,15 @@ func (pc *pushConsumer) Start() error {
"unitMode": pc.unitMode,
})
atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed))
- pc.validate()
+ err = pc.validate()
+ if err != nil {
+ rlog.Error("the consumer group option validate fail",
map[string]interface{}{
+ rlog.LogKeyConsumerGroup: pc.consumerGroup,
+ rlog.LogKeyUnderlayError: err.Error(),
+ })
+ err = errors.Wrap(err, "the consumer group option
validate fail")
+ return
+ }
err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
if err != nil {
@@ -498,23 +506,25 @@ func (pc *pushConsumer) messageQueueChanged(topic string,
mqAll, mqDivided []*pr
pc.client.SendHeartbeatToAllBrokerWithLock()
}
-func (pc *pushConsumer) validate() {
- internal.ValidateGroup(pc.consumerGroup)
+func (pc *pushConsumer) validate() error {
+ if err := internal.ValidateGroup(pc.consumerGroup); err != nil {
+ return err
+ }
if pc.consumerGroup == internal.DefaultConsumerGroup {
// TODO FQA
- rlog.Error(fmt.Sprintf("consumerGroup can't equal [%s], please
specify another one.", internal.DefaultConsumerGroup), nil)
+ return fmt.Errorf("consumerGroup can't equal [%s], please
specify another one", internal.DefaultConsumerGroup)
}
if len(pc.subscribedTopic) == 0 {
- rlog.Error("number of subscribed topics is 0.", nil)
+ return errors.New("number of subscribed topics is 0.")
}
if pc.option.ConsumeConcurrentlyMaxSpan < 1 ||
pc.option.ConsumeConcurrentlyMaxSpan > 65535 {
if pc.option.ConsumeConcurrentlyMaxSpan == 0 {
pc.option.ConsumeConcurrentlyMaxSpan = 1000
} else {
- rlog.Error("option.ConsumeConcurrentlyMaxSpan out of
range [1, 65535]", nil)
+ return errors.New("option.ConsumeConcurrentlyMaxSpan
out of range [1, 65535]")
}
}
@@ -522,7 +532,7 @@ func (pc *pushConsumer) validate() {
if pc.option.PullThresholdForQueue == 0 {
pc.option.PullThresholdForQueue = 1024
} else {
- rlog.Error("option.PullThresholdForQueue out of range
[1, 65535]", nil)
+ return errors.New("option.PullThresholdForQueue out of
range [1, 65535]")
}
}
@@ -530,7 +540,7 @@ func (pc *pushConsumer) validate() {
if pc.option.PullThresholdForTopic == 0 {
pc.option.PullThresholdForTopic = 102400
} else {
- rlog.Error("option.PullThresholdForTopic out of range
[1, 6553500]", nil)
+ return errors.New("option.PullThresholdForTopic out of
range [1, 6553500]")
}
}
@@ -538,7 +548,7 @@ func (pc *pushConsumer) validate() {
if pc.option.PullThresholdSizeForQueue == 0 {
pc.option.PullThresholdSizeForQueue = 512
} else {
- rlog.Error("option.PullThresholdSizeForQueue out of
range [1, 1024]", nil)
+ return errors.New("option.PullThresholdSizeForQueue out
of range [1, 1024]")
}
}
@@ -546,19 +556,19 @@ func (pc *pushConsumer) validate() {
if pc.option.PullThresholdSizeForTopic == 0 {
pc.option.PullThresholdSizeForTopic = 51200
} else {
- rlog.Error("option.PullThresholdSizeForTopic out of
range [1, 102400]", nil)
+ return errors.New("option.PullThresholdSizeForTopic out
of range [1, 102400]")
}
}
if pc.option.PullInterval < 0 || pc.option.PullInterval >
65535*time.Millisecond {
- rlog.Error("option.PullInterval out of range [0, 65535]", nil)
+ return errors.New("option.PullInterval out of range [0, 65535]")
}
if pc.option.ConsumeMessageBatchMaxSize < 1 ||
pc.option.ConsumeMessageBatchMaxSize > 1024 {
if pc.option.ConsumeMessageBatchMaxSize == 0 {
pc.option.ConsumeMessageBatchMaxSize = 1
} else {
- rlog.Error("option.ConsumeMessageBatchMaxSize out of
range [1, 1024]", nil)
+ return errors.New("option.ConsumeMessageBatchMaxSize
out of range [1, 1024]")
}
}
@@ -566,9 +576,18 @@ func (pc *pushConsumer) validate() {
if pc.option.PullBatchSize == 0 {
pc.option.PullBatchSize = 32
} else {
- rlog.Error("option.PullBatchSize out of range [1,
1024]", nil)
+ return errors.New("option.PullBatchSize out of range
[1, 1024]")
+ }
+ }
+
+ if pc.option.ConsumeGoroutineNums < 1 || pc.option.ConsumeGoroutineNums
> 1000 {
+ if pc.option.ConsumeGoroutineNums == 0 {
+ pc.option.ConsumeGoroutineNums = 20
+ } else {
+ return errors.New("option.ConsumeGoroutineNums out of
range [1, 1000]")
}
}
+ return nil
}
func (pc *pushConsumer) pullMessage(request *PullRequest) {
diff --git a/internal/validators.go b/internal/validators.go
index ac51db2..61bd45e 100644
--- a/internal/validators.go
+++ b/internal/validators.go
@@ -18,9 +18,9 @@ limitations under the License.
package internal
import (
+ "errors"
+ "fmt"
"regexp"
-
- "github.com/apache/rocketmq-client-go/v2/rlog"
)
const (
@@ -29,15 +29,18 @@ const (
)
var (
- _Pattern, _ = regexp.Compile(_ValidPattern)
+ _Pattern = regexp.MustCompile(_ValidPattern)
)
-func ValidateGroup(group string) {
+func ValidateGroup(group string) error {
if group == "" {
- rlog.Fatal("consumerGroup is empty", nil)
+ return errors.New("consumerGroup is empty")
}
-
if len(group) > _CharacterMaxLength {
- rlog.Fatal("the specified group is longer than group max length
255.", nil)
+ return errors.New("the specified group is longer than group max
length 255")
+ }
+ if !_Pattern.MatchString(group) {
+ return fmt.Errorf("the specified group[%s] contains illegal
characters, allowing only %s", group, _ValidPattern)
}
+ return nil
}