shibd commented on code in PR #1340:
URL: https://github.com/apache/pulsar-client-go/pull/1340#discussion_r1986205947


##########
pulsar/consumer_partition.go:
##########
@@ -2256,12 +2306,14 @@ func convertToMessageID(id *pb.MessageIdData) 
*trackingMessageID {
 
        msgID := &trackingMessageID{
                messageID: &messageID{
-                       ledgerID: int64(*id.LedgerId),
-                       entryID:  int64(*id.EntryId),
+                       ledgerID:  int64(id.GetLedgerId()),
+                       entryID:   int64(id.GetEntryId()),
+                       batchIdx:  id.GetBatchIndex(),
+                       batchSize: id.GetBatchSize(),
                },
        }
-       if id.BatchIndex != nil {
-               msgID.batchIdx = *id.BatchIndex
+       if msgID.batchIdx == -1 {
+               msgID.batchIdx = 0

Review Comment:
   In the Go Client, it's not like this. You can run the following code to test:
   
   Need change msg id String method first.
   ``` go
   func (id *messageID) String() string {
        return fmt.Sprintf("%d:%d:%d:%d", id.ledgerID, id.entryID, id.batchIdx, 
id.partitionIdx)
   }
   ```
   
   
   ``` go
   func TestMessageID_BatchIdx(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
        })
   
        assert.Nil(t, err)
        defer client.Close()
   
        topic := newTopicName()
        ctx := context.Background()
   
        // create producer
        producer, err := client.CreateProducer(ProducerOptions{
                Topic:           topic,
                DisableBatching: true,
        })
        assert.Nil(t, err)
        defer producer.Close()
   
        // create consumer
        consumer, err := client.Subscribe(ConsumerOptions{
                Topic:             topic,
                SubscriptionName:  "my-sub",
                Type:              Exclusive,
                ReceiverQueueSize: 10,
        })
        assert.Nil(t, err)
        defer consumer.Close()
   
        // send 10 messages with no batch
        var lastMsgID MessageID
        for i := 0; i < 10; i++ {
                lastMsgID, err = producer.Send(ctx, &ProducerMessage{
                        Payload: []byte(fmt.Sprintf("hello-%d", i)),
                })
                t.Logf("Sended message msgId: %#v \n", lastMsgID.String())
                assert.NoError(t, err)
                assert.NotNil(t, lastMsgID)
        }
   
        // Receive 10 message with no batch
        for i := 0; i < 10; i++ {
                msg, err := consumer.Receive(ctx)
                assert.Nil(t, err)
                t.Logf("Received message msgId: %#v \n", msg.ID().String())
                consumer.Ack(msg)
        }
   }
   
   output:
      consumer_test.go:572: Sended message msgId: "3141:0:0:0" 
       consumer_test.go:572: Sended message msgId: "3141:1:0:0" 
       consumer_test.go:572: Sended message msgId: "3141:2:0:0" 
       consumer_test.go:572: Sended message msgId: "3141:3:0:0" 
       consumer_test.go:572: Sended message msgId: "3141:4:0:0" 
       consumer_test.go:572: Sended message msgId: "3141:5:0:0" 
       consumer_test.go:572: Sended message msgId: "3141:6:0:0" 
       consumer_test.go:572: Sended message msgId: "3141:7:0:0" 
       consumer_test.go:572: Sended message msgId: "3141:8:0:0" 
       consumer_test.go:572: Sended message msgId: "3141:9:0:0" 
       consumer_test.go:581: Received message msgId: "3141:0:0:0" 
       consumer_test.go:581: Received message msgId: "3141:1:0:0" 
       consumer_test.go:581: Received message msgId: "3141:2:0:0" 
       consumer_test.go:581: Received message msgId: "3141:3:0:0" 
       consumer_test.go:581: Received message msgId: "3141:4:0:0" 
       consumer_test.go:581: Received message msgId: "3141:5:0:0" 
       consumer_test.go:581: Received message msgId: "3141:6:0:0" 
       consumer_test.go:581: Received message msgId: "3141:7:0:0" 
       consumer_test.go:581: Received message msgId: "3141:8:0:0" 
       consumer_test.go:581: Received message msgId: "3141:9:0:0" 
   ```
   
   In Java, it will be a different implementation class, so there's no 
batchIndex field when disabled batch.
   
   If we want to keep it consistent, we can change it later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to