merlimat commented on a change in pull request #86: Simplify and refactor parts 
of  the single topic consumer.
URL: https://github.com/apache/pulsar-client-go/pull/86#discussion_r344857288
 
 

 ##########
 File path: pulsar/consumer_partition.go
 ##########
 @@ -0,0 +1,508 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "fmt"
+       "math"
+       "time"
+
+       "github.com/golang/protobuf/proto"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/apache/pulsar-client-go/pkg/pb"
+       "github.com/apache/pulsar-client-go/pulsar/internal"
+)
+
+type consumerState int
+
+const (
+       consumerInit consumerState = iota
+       consumerReady
+       consumerClosing
+       consumerClosed
+)
+
+type partitionConsumerOpts struct {
+       topic               string
+       subscription        string
+       subscriptionType    SubscriptionType
+       subscriptionInitPos SubscriptionInitialPosition
+       partitionIdx        int
+       receiverQueueSize   int
+}
+
+type partitionConsumer struct {
+       client *client
+
+       // this is needed for sending ConsumerMessage on the messageCh
+       parentConsumer Consumer
+       state          consumerState
+       options        *partitionConsumerOpts
+
+       conn internal.Connection
+
+       topic        string
+       name         string
+       consumerID   uint64
+       partitionIdx int
+
+       // shared channel
+       messageCh chan ConsumerMessage
+
+       // the number of message slots available
+       availablePermits int32
+
+       // the size of the queue channel for buffering messages
+       queueSize int32
+       queueCh   chan []*message
+
+       eventsCh    chan interface{}
+       connectedCh chan struct{}
+       closeCh     chan struct{}
+
+       log *log.Entry
+}
+
+func newPartitionConsumer(parent Consumer, client *client, options 
*partitionConsumerOpts,
+       messageCh chan ConsumerMessage) (*partitionConsumer, error) {
+       pc := &partitionConsumer{
+               state:          consumerInit,
+               parentConsumer: parent,
+               client:         client,
+               options:        options,
+               topic:          options.topic,
+               consumerID:     client.rpcClient.NewConsumerID(),
+               partitionIdx:   options.partitionIdx,
+               eventsCh:       make(chan interface{}, 3),
+               queueSize:      int32(options.receiverQueueSize),
+               queueCh:        make(chan []*message, 
options.receiverQueueSize),
+               connectedCh:    make(chan struct{}),
+               messageCh:      messageCh,
+               closeCh:        make(chan struct{}),
+               log:            log.WithField("topic", options.topic),
 
 Review comment:
   We could also set more fields on the logger, like the subscription name and 
consumer name

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to