cckellogg commented on a change in pull request #705:
URL: https://github.com/apache/pulsar-client-go/pull/705#discussion_r788953483



##########
File path: pulsar/consumer.go
##########
@@ -182,6 +182,15 @@ type ConsumerOptions struct {
        // > Notice: the NackBackoffPolicy will not work with 
`consumer.NackID(MessageID)`
        // > because we are not able to get the redeliveryCount from the 
message ID.
        NackBackoffPolicy NackBackoffPolicy
+
+       // startMessageID internally used by multitopic-reader
+       startMessageID MessageID

Review comment:
       The ConsumerOptions is a public interface so I don't think we should add 
package private fields here. Let's remove these.

##########
File path: pulsar/consumer_partition.go
##########
@@ -1265,6 +1265,10 @@ func (pc *partitionConsumer) _getConn() 
internal.Connection {
        return pc.conn.Load().(internal.Connection)
 }
 
+func (pc *partitionConsumer) messagesInQueue() int {
+       return len(pc.queueCh)

Review comment:
       If there are multi go routines consuming and producing to this channel 
this value might not be useful after being read.

##########
File path: pulsar/consumer.go
##########
@@ -248,4 +257,13 @@ type Consumer interface {
 
        // Name returns the name of consumer.
        Name() string
+
+       // lastDequeuedMsg used for setting last dequeued msg id by internal 
partition consumers

Review comment:
       I don't think we should be adding private methods to the public 
interface. I think these should be removed. Also, it seems these are only 
relevant to the reader implementation so I think these helper functions should 
live within the reader files.

##########
File path: pulsar/client_impl.go
##########
@@ -168,12 +168,26 @@ func (c *client) Subscribe(options ConsumerOptions) 
(Consumer, error) {
 }
 
 func (c *client) CreateReader(options ReaderOptions) (Reader, error) {
-       reader, err := newReader(c, options)
+       topics, err := c.TopicPartitions(options.Topic)
        if err != nil {
                return nil, err
        }
-       c.handlers.Add(reader)
-       return reader, nil
+       if len(topics) <= 1 {
+               reader, err := newReader(c, options)

Review comment:
       What's the different between a reader and a multi reader? It seems like 
they should be the same but just consume a different number of partitions?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to