wenfengwang commented on a change in pull request #745:
URL: https://github.com/apache/rocketmq-client-go/pull/745#discussion_r774703591



##########
File path: consumer/manual_pull_consumer.go
##########
@@ -0,0 +1,342 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+       "context"
+       "fmt"
+       "strconv"
+       "sync"
+       "time"
+
+       errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+       "github.com/apache/rocketmq-client-go/v2/internal"
+       "github.com/apache/rocketmq-client-go/v2/internal/remote"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+       "github.com/apache/rocketmq-client-go/v2/rlog"
+       "github.com/pkg/errors"
+)
+
+// ManualPullConsumer is a low-level consumer, which operates based on 
MessageQueue.
+// Users should maintain information such as offset by themselves
+type ManualPullConsumer interface {
+       // PullFromQueue return messages according to specified queue with 
offset
+       PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset 
int64, numbers int) (*primitive.PullResult, error)
+
+       // GetMessageQueues return queues of the topic
+       GetMessageQueues(ctx context.Context, topic string) 
([]*primitive.MessageQueue, error)
+
+       // CommittedOffset return the offset of mq in groupName, if mq not 
exist, -1 will be return
+       CommittedOffset(ctx context.Context, groupName string, mq 
*primitive.MessageQueue) (int64, error)
+
+       // Seek let consume position to the offset, this api can be used to 
reset offset and commit offset
+       Seek(ctx context.Context, groupName string, mq *primitive.MessageQueue, 
offset int64) error
+
+       // Lookup return offset according to timestamp(ms), the maximum offset 
that born time less than timestamp will be return.
+       // If timestamp less than any message's born time, the earliest offset 
will be returned
+       // If timestamp great than any message's born time, the latest offset 
will be returned
+       Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp 
int64) (int64, error)
+
+       // Shutdown the ManualPullConsumer, clean up internal resources
+       Shutdown() error
+}
+
+type defaultManualPullConsumer struct {
+       namesrv                internal.Namesrvs
+       option                 consumerOptions
+       client                 internal.RMQClient
+       interceptor            primitive.Interceptor
+       pullFromWhichNodeTable sync.Map
+       shutdownOnce           sync.Once
+}
+
+// NewManualPullConsumer creates and initializes a new ManualPullConsumer.
+func NewManualPullConsumer(options ...Option) (*defaultManualPullConsumer, 
error) {
+       defaultOpts := defaultPullConsumerOptions()
+       for _, apply := range options {
+               apply(&defaultOpts)
+       }
+
+       srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
+       if err != nil {
+               return nil, errors.Wrap(err, "new Namesrv failed.")
+       }
+       if !defaultOpts.Credentials.IsEmpty() {
+               srvs.SetCredentials(defaultOpts.Credentials)
+       }
+       defaultOpts.Namesrv = srvs
+
+       actualRMQClient := 
internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+       actualNameSrv := internal.GetOrSetNamesrv(actualRMQClient.ClientID(), 
defaultOpts.Namesrv)
+
+       dc := &defaultManualPullConsumer{
+               client:  actualRMQClient,
+               option:  defaultOpts,
+               namesrv: actualNameSrv,
+       }
+
+       dc.interceptor = primitive.ChainInterceptors(dc.option.Interceptors...)
+       dc.option.ClientOptions.Namesrv = actualNameSrv
+       return dc, nil
+}
+
+func (dc *defaultManualPullConsumer) PullFromQueue(ctx context.Context, 
groupName string, mq *primitive.MessageQueue, offset int64, numbers int) 
(*primitive.PullResult, error) {
+       if err := dc.checkPull(ctx, mq, offset, numbers); err != nil {
+               return nil, err
+       }
+       subData := buildSubscriptionData(mq.Topic, MessageSelector{
+               Expression: _SubAll,
+       })
+
+       sysFlag := buildSysFlag(false, true, true, false)
+
+       pullRequest := &internal.PullMessageRequestHeader{
+               ConsumerGroup:        groupName,
+               Topic:                mq.Topic,
+               QueueId:              int32(mq.QueueId),
+               QueueOffset:          offset,
+               MaxMsgNums:           int32(numbers),
+               SysFlag:              sysFlag,
+               CommitOffset:         0,
+               SuspendTimeoutMillis: _BrokerSuspendMaxTime,
+               SubExpression:        subData.SubString,
+               ExpressionType:       string(subData.ExpType),
+       }
+
+       if subData.ExpType == string(TAG) {
+               pullRequest.SubVersion = 0
+       } else {
+               pullRequest.SubVersion = subData.SubVersion
+       }
+
+       pullResp, err := dc.pullInner(ctx, mq, pullRequest)
+       if err != nil {
+               return pullResp, err
+       }
+       dc.processPullResult(mq, pullResp, subData)
+       if dc.interceptor != nil {
+               msgCtx := &primitive.ConsumeMessageContext{
+                       Properties:    make(map[string]string),
+                       ConsumerGroup: groupName,
+                       MQ:            mq,
+                       Msgs:          pullResp.GetMessageExts(),
+               }
+               err = dc.interceptor(ctx, msgCtx, struct{}{}, 
primitive.NoopInterceptor)
+       }
+       return pullResp, err
+}
+
+func (dc *defaultManualPullConsumer) GetMessageQueues(ctx context.Context, 
topic string) ([]*primitive.MessageQueue, error) {
+       return dc.namesrv.FetchSubscribeMessageQueues(topic)
+}
+
+func (dc *defaultManualPullConsumer) CommittedOffset(ctx context.Context, 
groupName string, mq *primitive.MessageQueue) (int64, error) {
+       fn := func(broker string) (*remote.RemotingCommand, error) {
+               request := &internal.QueryConsumerOffsetRequestHeader{
+                       ConsumerGroup: groupName,
+                       Topic:         mq.Topic,
+                       QueueId:       mq.QueueId,
+               }
+               cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, 
request, nil)

Review comment:
       Should use `ReqQueryConsumerOffset`

##########
File path: examples/consumer/manual/main.go
##########
@@ -0,0 +1,84 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "log"
+       "os"
+       "sync"
+
+       "github.com/apache/rocketmq-client-go/v2/consumer"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+func main() {
+       groupName := "testGroup"
+       c, err := consumer.NewManualPullConsumer(
+               consumer.WithGroupName(groupName),
+               
consumer.WithNameServer(primitive.NamesrvAddr{"127.0.0.1:9876"}),
+       )
+       if err != nil {
+               log.Fatalf("init producer error: %v", err)
+       }
+
+       topic := "test"
+       // get all message queue
+       mqs, err := c.GetMessageQueues(context.Background(), topic)
+       if err != nil {
+               log.Fatalf("Get message queue error: %v", err)
+       }
+       var wg sync.WaitGroup
+
+       fn := func(mq *primitive.MessageQueue) {
+               defer wg.Done()
+               // get latest offset
+               offset, err := c.CommittedOffset(context.Background(), 
groupName, mq)
+               for {
+                       if err != nil {
+                               log.Fatalf("search latest offset error: %v", 
err)
+                       }
+                       // pull message
+                       ret, err := c.PullFromQueue(context.Background(), 
groupName, mq, offset, 1)
+                       if err != nil {
+                               log.Fatalf("pullFromQueue error: %v", err)
+                       }
+                       if ret.Status == primitive.PullFound {
+                               msgs := ret.GetMessageExts()
+                               for _, msg := range msgs {
+                                       log.Printf("subscribe Msg: %v \n", msg)
+                                       // commit offset
+                                       if err = c.Seek(context.Background(), 
groupName, mq, msg.QueueOffset+1); err != nil {

Review comment:
       Using `Seek` to commit offset seems unclear because the `Seek` method is 
commonly used express reset a read/write position semantics. Could we rename 
this method to `CommitOffset` and rename `CommittedOffset ` to 
`QueryCommitedOffset` for reduce ambiguous? This is not fully considered 
thinking, what's your think about it?

##########
File path: examples/consumer/manual/main.go
##########
@@ -0,0 +1,84 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "log"
+       "os"
+       "sync"
+
+       "github.com/apache/rocketmq-client-go/v2/consumer"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+func main() {
+       groupName := "testGroup"
+       c, err := consumer.NewManualPullConsumer(
+               consumer.WithGroupName(groupName),
+               
consumer.WithNameServer(primitive.NamesrvAddr{"127.0.0.1:9876"}),
+       )
+       if err != nil {
+               log.Fatalf("init producer error: %v", err)
+       }
+
+       topic := "test"
+       // get all message queue
+       mqs, err := c.GetMessageQueues(context.Background(), topic)
+       if err != nil {
+               log.Fatalf("Get message queue error: %v", err)
+       }
+       var wg sync.WaitGroup
+
+       fn := func(mq *primitive.MessageQueue) {
+               defer wg.Done()
+               // get latest offset
+               offset, err := c.CommittedOffset(context.Background(), 
groupName, mq)
+               for {
+                       if err != nil {
+                               log.Fatalf("search latest offset error: %v", 
err)
+                       }
+                       // pull message
+                       ret, err := c.PullFromQueue(context.Background(), 
groupName, mq, offset, 1)
+                       if err != nil {
+                               log.Fatalf("pullFromQueue error: %v", err)
+                       }
+                       if ret.Status == primitive.PullFound {
+                               msgs := ret.GetMessageExts()
+                               for _, msg := range msgs {
+                                       log.Printf("subscribe Msg: %v \n", msg)
+                                       // commit offset
+                                       if err = c.Seek(context.Background(), 
groupName, mq, msg.QueueOffset+1); err != nil {
+                                               log.Fatalf("commit offset 
error: %v", err)
+                                       }
+                                       offset++
+                               }
+                       } else {
+                               break
+                       }
+               }
+       }
+
+       for _, mq := range mqs {
+               wg.Add(1)
+               go fn(mq)
+       }
+       wg.Wait()
+       c.Shutdown()
+       os.Exit(0)

Review comment:
       redundant code line, because if exit code is 0 if normal exiting

##########
File path: examples/consumer/manual/main.go
##########
@@ -0,0 +1,84 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "log"
+       "os"
+       "sync"
+
+       "github.com/apache/rocketmq-client-go/v2/consumer"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+func main() {
+       groupName := "testGroup"
+       c, err := consumer.NewManualPullConsumer(
+               consumer.WithGroupName(groupName),
+               
consumer.WithNameServer(primitive.NamesrvAddr{"127.0.0.1:9876"}),
+       )
+       if err != nil {
+               log.Fatalf("init producer error: %v", err)
+       }
+
+       topic := "test"
+       // get all message queue
+       mqs, err := c.GetMessageQueues(context.Background(), topic)
+       if err != nil {
+               log.Fatalf("Get message queue error: %v", err)

Review comment:
       please use `rlog` or `fmt.Printf` instead of it




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to