nodece commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r1002836092


##########
pulsar/consumer_impl.go:
##########
@@ -540,17 +545,11 @@ func (c *consumer) Nack(msg Message) {
 }
 
 func (c *consumer) NackID(msgID MessageID) {
-       mid, ok := c.messageID(msgID)
-       if !ok {
-               return
-       }
-
-       if mid.consumer != nil {
-               mid.Nack()
+       if err := c.checkMsgIDPartition(msgID); err != nil {

Review Comment:
   Same as above.



##########
pulsar/consumer_partition.go:
##########
@@ -698,8 +746,23 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
                }
        }
 
+       isChunkedMsg := false
+       if msgMeta.GetNumChunksFromMsg() > 1 {
+               isChunkedMsg = true
+       }
+
+       processedPayloadBuffer := internal.NewBufferWrapper(decryptedPayload)

Review Comment:
   ```suggestion
        var processedPayloadBuffer internal.Buffer
   ```



##########
pulsar/consumer_impl.go:
##########
@@ -586,12 +585,11 @@ func (c *consumer) Seek(msgID MessageID) error {
                return newError(SeekFailed, "for partition topic, seek command 
should perform on the individual partitions")
        }
 
-       mid, ok := c.messageID(msgID)
-       if !ok {
-               return nil
+       if err := c.checkMsgIDPartition(msgID); err != nil {

Review Comment:
   Same as above.



##########
pulsar/consumer_impl.go:
##########
@@ -456,20 +466,15 @@ func (c *consumer) Ack(msg Message) error {
 
 // AckID the consumption of a single message, identified by its MessageID
 func (c *consumer) AckID(msgID MessageID) error {
-       mid, ok := c.messageID(msgID)
-       if !ok {
-               return errors.New("failed to convert trackingMessageID")
-       }
-
-       if mid.consumer != nil {
-               return mid.Ack()
+       if err := c.checkMsgIDPartition(msgID); err != nil {

Review Comment:
   This misses converting the `msgID` from the `MessageID` to the 
`trackingMessageID` type; I'm not sure if we need this.
   
   Why not use `messageID()`? What did I miss?
   
   



##########
pulsar/producer_partition.go:
##########
@@ -299,6 +278,26 @@ func (p *partitionProducer) grabCnx() error {
        if err != nil {
                return err
        }
+
+       if p.options.DisableBatching {

Review Comment:
   The following code looks clearer:
   ```
        if !p.options.DisableBatching {
                batcherBuilderType := p.options.BatcherBuilderType
                provider, err := GetBatcherBuilderProvider(batcherBuilderType)
                if err != nil {
                        return err
                }
                maxMessageSize := uint32(p._getConn().GetMaxMessageSize())
                p.batchBuilder, err = provider(p.options.BatchingMaxMessages, 
p.options.BatchingMaxSize,
                        maxMessageSize, p.producerName, p.producerID, 
pb.CompressionType(p.options.CompressionType),
                        compression.Level(p.options.CompressionLevel),
                        p,
                        p.log,
                        p.encryptor)
                if err != nil {
                        return err
                }
        }
   ```



##########
pulsar/internal/batch_builder.go:
##########
@@ -164,7 +167,11 @@ func (bc *batchContainer) hasSpace(payload []byte) bool {
                return true
        }
        msgSize := uint32(len(payload))
-       return bc.numMessages+1 <= bc.maxMessages && 
bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
+       expectedSize := bc.buffer.ReadableBytes() + msgSize

Review Comment:
   Please revert this code (170-174).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to