nodece commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r1002836092
##########
pulsar/consumer_impl.go:
##########
@@ -540,17 +545,11 @@ func (c *consumer) Nack(msg Message) {
}
func (c *consumer) NackID(msgID MessageID) {
- mid, ok := c.messageID(msgID)
- if !ok {
- return
- }
-
- if mid.consumer != nil {
- mid.Nack()
+ if err := c.checkMsgIDPartition(msgID); err != nil {
Review Comment:
Same as above.
##########
pulsar/consumer_partition.go:
##########
@@ -698,8 +746,23 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
}
}
+ isChunkedMsg := false
+ if msgMeta.GetNumChunksFromMsg() > 1 {
+ isChunkedMsg = true
+ }
+
+ processedPayloadBuffer := internal.NewBufferWrapper(decryptedPayload)
Review Comment:
```suggestion
var processedPayloadBuffer internal.Buffer
```
##########
pulsar/consumer_impl.go:
##########
@@ -586,12 +585,11 @@ func (c *consumer) Seek(msgID MessageID) error {
return newError(SeekFailed, "for partition topic, seek command
should perform on the individual partitions")
}
- mid, ok := c.messageID(msgID)
- if !ok {
- return nil
+ if err := c.checkMsgIDPartition(msgID); err != nil {
Review Comment:
Same as above.
##########
pulsar/consumer_impl.go:
##########
@@ -456,20 +466,15 @@ func (c *consumer) Ack(msg Message) error {
// AckID the consumption of a single message, identified by its MessageID
func (c *consumer) AckID(msgID MessageID) error {
- mid, ok := c.messageID(msgID)
- if !ok {
- return errors.New("failed to convert trackingMessageID")
- }
-
- if mid.consumer != nil {
- return mid.Ack()
+ if err := c.checkMsgIDPartition(msgID); err != nil {
Review Comment:
This misses converting the `msgID` from the `MessageID` to the `trackingMessageID`
type; I'm not sure if we need this.
Why not use `messageID()`? What did I miss?
##########
pulsar/producer_partition.go:
##########
@@ -299,6 +278,26 @@ func (p *partitionProducer) grabCnx() error {
if err != nil {
return err
}
+
+ if p.options.DisableBatching {
Review Comment:
The following code looks clearer:
```
if !p.options.DisableBatching {
batcherBuilderType := p.options.BatcherBuilderType
provider, err := GetBatcherBuilderProvider(batcherBuilderType)
if err != nil {
return err
}
maxMessageSize := uint32(p._getConn().GetMaxMessageSize())
p.batchBuilder, err = provider(p.options.BatchingMaxMessages,
p.options.BatchingMaxSize,
maxMessageSize, p.producerName, p.producerID,
pb.CompressionType(p.options.CompressionType),
compression.Level(p.options.CompressionLevel),
p,
p.log,
p.encryptor)
if err != nil {
return err
}
}
```
##########
pulsar/internal/batch_builder.go:
##########
@@ -164,7 +167,11 @@ func (bc *batchContainer) hasSpace(payload []byte) bool {
return true
}
msgSize := uint32(len(payload))
- return bc.numMessages+1 <= bc.maxMessages &&
bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
+ expectedSize := bc.buffer.ReadableBytes() + msgSize
Review Comment:
Please revert this code (lines 170-174).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]