RobertIndie commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r980988431


##########
pulsar/consumer_impl.go:
##########
@@ -91,6 +90,15 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
                }
        }
 
+       if options.MaxPendingChunkedMessage == 0 {
+               options.MaxPendingChunkedMessage = 100
+       }
+
+       // the minimum timer interval is 100ms

Review Comment:
   It seems the Java client doesn't enforce a minimum value for this parameter, so enforcing one here introduces inconsistent behavior. Even if we do need a minimum, we should return an error instead of silently changing the value the user set; otherwise it will confuse the user.
   
   I think we need to implement this in the Java client first, and that can be done in another PR.
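   If we do keep a minimum, here is a rough sketch of validating instead of overriding (the error wording is only an assumption, and it assumes `errors` is imported in this file):
   ```go
   // Sketch only: reject values below the 100ms floor mentioned in this diff
   // instead of silently changing what the user configured.
   if options.ExpireTimeOfIncompleteChunk != 0 &&
       options.ExpireTimeOfIncompleteChunk < 100*time.Millisecond {
       return nil, errors.New("ExpireTimeOfIncompleteChunk cannot be less than 100ms")
   }
   ```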



##########
pulsar/producer_partition.go:
##########
@@ -1050,3 +1270,41 @@ func (p *partitionProducer) _getConn() internal.Connection {
        //            invariant is broken
        return p.conn.Load().(internal.Connection)
 }
+
+func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
+       if p.options.DisableBlockIfQueueFull {
+               if !p.publishSemaphore.TryAcquire() {
+                       if sr.callback != nil {
+                               sr.callback(nil, sr.msg, errSendQueueIsFull)
+                       }
+                       return false
+               }
+       } else if !p.publishSemaphore.Acquire(sr.ctx) {
+               sr.callback(nil, sr.msg, errContextExpired)
+               sr.blockCh <- struct{}{}
+               return false
+       } else if sr.totalChunks == 0 || sr.totalChunks == 1 || (sr.totalChunks > 1 && sr.chunkID == sr.totalChunks-1) {

Review Comment:
   Could you move this logic out of `canAddToQueue`? I think it's better to put it in `internalSend`.
   Also, I don't think `totalChunks` can ever be `0`.
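   A rough sketch of what I have in mind, assuming the per-chunk bookkeeping moves to `internalSend` so that `canAddToQueue` only deals with the publish permit:
   ```go
   // Sketch: canAddToQueue only acquires a publish permit; the caller decides
   // what to do for the last chunk of a chunked message.
   func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
       if p.options.DisableBlockIfQueueFull {
           if !p.publishSemaphore.TryAcquire() {
               if sr.callback != nil {
                   sr.callback(nil, sr.msg, errSendQueueIsFull)
               }
               return false
           }
           return true
       }
       if !p.publishSemaphore.Acquire(sr.ctx) {
           sr.callback(nil, sr.msg, errContextExpired)
           sr.blockCh <- struct{}{}
           return false
       }
       return true
   }
   ```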



##########
pulsar/consumer_partition.go:
##########
@@ -1475,3 +1609,210 @@ func convertToMessageID(id *pb.MessageIdData) trackingMessageID {
 
        return msgID
 }
+
+type chunkedMsgCtx struct {
+       totalChunks      int32
+       chunkedMsgBuffer internal.Buffer
+       lastChunkedMsgID int32
+       chunkedMsgIDs    []messageID
+       receivedTime     int64
+
+       mu sync.Mutex
+}
+
+func newChunkedMsgCtx(numChunksFromMsg int32, totalChunkMsgSize int) *chunkedMsgCtx {
+       return &chunkedMsgCtx{
+               totalChunks:      numChunksFromMsg,
+               chunkedMsgBuffer: internal.NewBuffer(totalChunkMsgSize),
+               lastChunkedMsgID: -1,
+               chunkedMsgIDs:    make([]messageID, numChunksFromMsg),
+               receivedTime:     time.Now().Unix(),
+       }
+}
+
+func (c *chunkedMsgCtx) refresh(chunkID int32, msgID messageID, partPayload internal.Buffer) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       c.chunkedMsgIDs[chunkID] = msgID
+       c.chunkedMsgBuffer.Write(partPayload.ReadableSlice())
+       c.lastChunkedMsgID = chunkID
+}
+
+func (c *chunkedMsgCtx) firstChunkID() messageID {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if len(c.chunkedMsgIDs) == 0 {
+               return messageID{}
+       }
+       return c.chunkedMsgIDs[0]
+}
+
+func (c *chunkedMsgCtx) lastChunkID() messageID {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if len(c.chunkedMsgIDs) == 0 {
+               return messageID{}
+       }
+       return c.chunkedMsgIDs[len(c.chunkedMsgIDs)-1]
+}
+
+func (c *chunkedMsgCtx) discard(pc *partitionConsumer) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+
+       for _, mid := range c.chunkedMsgIDs {
+               pc.log.Info("Removing chunk message-id", mid.String())
+               tmid, _ := toTrackingMessageID(mid)
+               pc.AckID(tmid)
+       }
+}
+
+type chunkedMsgCtxMap struct {
+       chunkedMsgCtxs map[string]*chunkedMsgCtx
+       pendingQueue   *list.List
+       maxPending     int
+       pc             *partitionConsumer
+       mu             sync.Mutex
+       closed         bool
+}
+
+func newChunkedMsgCtxMap(maxPending int, pc *partitionConsumer) *chunkedMsgCtxMap {
+       return &chunkedMsgCtxMap{
+               chunkedMsgCtxs: make(map[string]*chunkedMsgCtx, maxPending),
+               pendingQueue:   list.New(),
+               maxPending:     maxPending,
+               pc:             pc,
+               mu:             sync.Mutex{},
+       }
+}
+
+func (c *chunkedMsgCtxMap) addIfAbsent(uuid string, totalChunks int32, totalChunkMsgSize int) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if c.closed {
+               return
+       }
+       if _, ok := c.chunkedMsgCtxs[uuid]; !ok {
+               c.chunkedMsgCtxs[uuid] = newChunkedMsgCtx(totalChunks, totalChunkMsgSize)
+               c.pendingQueue.PushBack(uuid)
+               go c.removeChunkIfExpire(uuid, true, c.pc.options.expireTimeOfIncompleteChunk)
+       }
+       if c.maxPending > 0 && c.pendingQueue.Len() > c.maxPending {
+               go c.removeChunkMessage(uuid, c.pc.options.autoAckIncompleteChunk)
+       }
+}
+
+func (c *chunkedMsgCtxMap) get(uuid string) *chunkedMsgCtx {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if c.closed {
+               return nil
+       }
+       return c.chunkedMsgCtxs[uuid]
+}
+
+func (c *chunkedMsgCtxMap) remove(uuid string) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if c.closed {
+               return
+       }
+       delete(c.chunkedMsgCtxs, uuid)
+       e := c.pendingQueue.Front()
+       for ; e != nil; e = e.Next() {
+               if e.Value.(string) == uuid {
+                       c.pendingQueue.Remove(e)
+                       break
+               }
+       }
+}
+
+func (c *chunkedMsgCtxMap) removeChunkMessage(uuid string, autoAck bool) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if c.closed {
+               return
+       }
+       ctx, ok := c.chunkedMsgCtxs[uuid]
+       if !ok {
+               return
+       }
+       if autoAck {
+               ctx.discard(c.pc)
+       }
+       delete(c.chunkedMsgCtxs, uuid)
+       e := c.pendingQueue.Front()
+       for ; e != nil; e = e.Next() {
+               if e.Value.(string) == uuid {
+                       c.pendingQueue.Remove(e)
+                       break
+               }
+       }
+       c.pc.log.Infof("Chunked message [%s] has been removed from 
chunkedMsgCtxMap", uuid)
+}
+
+func (c *chunkedMsgCtxMap) removeChunkIfExpire(uuid string, autoAck bool, expire time.Duration) {
+       timer := time.NewTimer(expire)
+       <-timer.C
+       c.removeChunkMessage(uuid, autoAck)
+}
+
+func (c *chunkedMsgCtxMap) Close() {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       c.closed = true
+}
+
+type unAckChunksTracker struct {
+       chunkIDs map[chunkMessageID][]messageID
+       pc       *partitionConsumer
+       mu       sync.Mutex
+}
+
+func newUnAckChunksTracker(pc *partitionConsumer) *unAckChunksTracker {
+       return &unAckChunksTracker{
+               chunkIDs: make(map[chunkMessageID][]messageID),
+               pc:       pc,
+       }
+}
+
+func (u *unAckChunksTracker) add(cmid chunkMessageID, ids []messageID) {
+       u.mu.Lock()
+       defer u.mu.Unlock()
+
+       u.chunkIDs[cmid] = ids
+}
+
+func (u *unAckChunksTracker) get(cmid chunkMessageID) []messageID {
+       u.mu.Lock()
+       defer u.mu.Unlock()
+
+       return u.chunkIDs[cmid]
+}
+
+func (u *unAckChunksTracker) remove(cmid chunkMessageID) {
+       u.mu.Lock()
+       defer u.mu.Unlock()
+
+       delete(u.chunkIDs, cmid)
+}
+
+func (u *unAckChunksTracker) ack(cmid chunkMessageID) error {
+       ids := u.get(cmid)
+       for _, id := range ids {
+               if err := u.pc.AckID(id); err != nil {
+                       return err
+               }
+       }
+       u.remove(cmid)
+       return nil
+}
+
+func (u *unAckChunksTracker) nack(cmid chunkMessageID) {
+       ids := u.get(cmid)
+       for _, id := range ids {
+               u.pc.nackTracker.Add(id)
+               u.pc.metrics.NacksCounter.Inc()

Review Comment:
   I think it's better to call `u.pc.NackID` here. Otherwise this logic has to be maintained in two places.
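   For example (a sketch; it assumes `NackID` on the partition consumer accepts these message IDs):
   ```go
   func (u *unAckChunksTracker) nack(cmid chunkMessageID) {
       ids := u.get(cmid)
       for _, id := range ids {
           // Delegate to the partition consumer so the nack tracker and the
           // metrics counter are maintained in a single place.
           u.pc.NackID(id)
       }
   }
   ```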



##########
pulsar/consumer_partition.go:
##########
@@ -802,6 +886,55 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
        return nil
 }
 
+func (pc *partitionConsumer) processMessageChunk(compressedPayload internal.Buffer,
+       msgMeta *pb.MessageMetadata,
+       pbMsgID *pb.MessageIdData) internal.Buffer {
+       uuid := msgMeta.GetUuid()
+       numChunks := msgMeta.GetNumChunksFromMsg()
+       totalChunksSize := int(msgMeta.GetTotalChunkMsgSize())
+       chunkID := msgMeta.GetChunkId()
+       msgID := messageID{
+               ledgerID:     int64(pbMsgID.GetLedgerId()),
+               entryID:      int64(pbMsgID.GetEntryId()),
+               batchIdx:     -1,
+               partitionIdx: pc.partitionIdx,
+       }
+
+       if msgMeta.GetChunkId() == 0 {
+               pc.chunkedMsgCtxMap.addIfAbsent(uuid,
+                       numChunks,
+                       totalChunksSize,
+               )
+       }
+
+       ctx := pc.chunkedMsgCtxMap.get(uuid)
+
+       if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != ctx.lastChunkedMsgID+1 {
+               lastChunkedMsgID := -1
+               totalChunks := -1
+               if ctx != nil {
+                       lastChunkedMsgID = int(ctx.lastChunkedMsgID)
+                       totalChunks = int(ctx.totalChunks)
+                       ctx.chunkedMsgBuffer.Clear()
+               }
+               pc.log.Warnf(fmt.Sprintf(
+                       "Received unexpected chunk messageId %s, last-chunk-id 
%d, chunkId = %d, total-chunks %d",
+                       msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
+               pc.chunkedMsgCtxMap.remove(uuid)
+               atomic.AddInt32(&pc.availablePermits, 1)
+               return nil
+       }
+
+       ctx.refresh(chunkID, msgID, compressedPayload)

Review Comment:
   It's better to rename it to `ctx.append`.
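   Something like this (rename only, the body stays the same):
   ```go
   // append records one received chunk: its message ID, its payload, and the
   // position of the last chunk seen so far.
   func (c *chunkedMsgCtx) append(chunkID int32, msgID messageID, partPayload internal.Buffer) {
       c.mu.Lock()
       defer c.mu.Unlock()
       c.chunkedMsgIDs[chunkID] = msgID
       c.chunkedMsgBuffer.Write(partPayload.ReadableSlice())
       c.lastChunkedMsgID = chunkID
   }
   ```
   The call site then reads `ctx.append(chunkID, msgID, compressedPayload)`.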



##########
pulsar/message_chunking_test.go:
##########
@@ -0,0 +1,593 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "math/rand"
+       "strings"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+)
+
+var _brokerMaxMessageSize = 1024 * 1024
+
+func TestInvalidChunkingConfig(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           newTopicName(),
+               DisableBatching: false,
+               EnableChunking:  true,
+       })
+
+       assert.Error(t, err, "producer creation should have fail")
+       assert.Nil(t, producer)
+}
+
+func TestLargeMessage(t *testing.T) {
+       rand.Seed(time.Now().Unix())
+
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+
+       // create producer without ChunkMaxMessageSize
+       producer1, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+               EnableChunking:  true,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer1)
+       defer producer1.Close()
+
+       // create producer with ChunkMaxMessageSize
+       producer2, err := client.CreateProducer(ProducerOptions{
+               Topic:               topic,
+               DisableBatching:     true,
+               EnableChunking:      true,
+               ChunkMaxMessageSize: 5,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer2)
+       defer producer2.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Type:             Exclusive,
+               SubscriptionName: "chunk-subscriber",
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, consumer)
+       defer consumer.Close()
+
+       expectMsgs := make([][]byte, 0, 10)
+
+       // test send chunk with serverMaxMessageSize limit
+       for i := 0; i < 5; i++ {
+               msg := createTestMessagePayload(_brokerMaxMessageSize + 1)
+               expectMsgs = append(expectMsgs, msg)
+               ID, err := producer1.Send(context.Background(), 
&ProducerMessage{
+                       Payload: msg,
+               })
+               assert.NoError(t, err)
+               assert.NotNil(t, ID)
+       }
+
+       // test receive chunk with serverMaxMessageSize limit
+       for i := 0; i < 5; i++ {
+               ctx, cancel := context.WithTimeout(context.Background(), 
time.Second*5)
+               msg, err := consumer.Receive(ctx)
+               cancel()
+               assert.NoError(t, err)
+
+               expectMsg := expectMsgs[i]
+
+               assert.Equal(t, expectMsg, msg.Payload())
+               // ack message
+               err = consumer.Ack(msg)
+               assert.NoError(t, err)
+       }
+
+       // test send chunk with ChunkMaxMessageSize limit
+       for i := 0; i < 5; i++ {
+               msg := createTestMessagePayload(50)
+               expectMsgs = append(expectMsgs, msg)
+               ID, err := producer2.Send(context.Background(), 
&ProducerMessage{
+                       Payload: msg,
+               })
+               assert.NoError(t, err)
+               assert.NotNil(t, ID)
+       }
+
+       // test receive chunk with ChunkMaxMessageSize limit
+       for i := 5; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               assert.NoError(t, err)
+
+               expectMsg := expectMsgs[i]
+
+               assert.Equal(t, expectMsg, msg.Payload())
+               // ack message
+               err = consumer.Ack(msg)
+               assert.NoError(t, err)
+       }
+}
+
+func TestPublishChunkWithFailure(t *testing.T) {
+       rand.Seed(time.Now().Unix())
+
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+
+       // create producer without ChunkMaxMessageSize
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+       defer producer.Close()
+
+       ID, err := producer.Send(context.Background(), &ProducerMessage{
+               Payload: createTestMessagePayload(_brokerMaxMessageSize + 1),
+       })
+       assert.Error(t, err)
+       assert.Nil(t, ID)
+}
+
+func TestMaxPendingChunkMessages(t *testing.T) {
+       rand.Seed(time.Now().Unix())
+
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+
+       totalProducers := 5
+       producers := make([]Producer, 0, 20)
+       defer func() {
+               for _, p := range producers {
+                       p.Close()
+               }
+       }()
+
+       clients := make([]Client, 0, 20)
+       defer func() {
+               for _, c := range clients {
+                       c.Close()
+               }
+       }()
+
+       for j := 0; j < totalProducers; j++ {
+               pc, err := NewClient(ClientOptions{
+                       URL: lookupURL,
+               })
+               assert.Nil(t, err)
+               clients = append(clients, pc)
+               producer, err := pc.CreateProducer(ProducerOptions{
+                       Topic:               topic,
+                       DisableBatching:     true,
+                       EnableChunking:      true,
+                       ChunkMaxMessageSize: 10,
+               })
+               assert.NoError(t, err)
+               assert.NotNil(t, producer)
+               producers = append(producers, producer)
+       }
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                    topic,
+               Type:                     Exclusive,
+               SubscriptionName:         "chunk-subscriber",
+               MaxPendingChunkedMessage: 1,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, consumer)
+       defer consumer.Close()
+
+       totalMsgs := 40
+       wg := sync.WaitGroup{}
+       wg.Add(totalMsgs * totalProducers)
+       for i := 0; i < totalMsgs; i++ {
+               for j := 0; j < totalProducers; j++ {
+                       p := producers[j]
+                       go func() {
+                               ID, err := p.Send(context.Background(), 
&ProducerMessage{
+                                       Payload: createTestMessagePayload(50),
+                               })
+                               assert.NoError(t, err)
+                               assert.NotNil(t, ID)
+                               wg.Done()
+                       }()
+               }
+       }
+       wg.Wait()
+
+       received := 0
+       for i := 0; i < totalMsgs*totalProducers; i++ {
+               ctx, cancel := context.WithTimeout(context.Background(), 
time.Second*10)
+               msg, err := consumer.Receive(ctx)
+               cancel()
+               if msg == nil || (err != nil && errors.Is(err, 
context.DeadlineExceeded)) {
+                       break
+               }
+
+               received++
+
+               err = consumer.Ack(msg)
+               assert.NoError(t, err)
+       }
+
+       assert.NotEqual(t, totalMsgs*totalProducers, received)
+}
+
+func TestExpireIncompleteChunks(t *testing.T) {
+       rand.Seed(time.Now().Unix())
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+
+       c, err := client.Subscribe(ConsumerOptions{
+               Topic:                       topic,
+               Type:                        Exclusive,
+               SubscriptionName:            "chunk-subscriber",
+               ExpireTimeOfIncompleteChunk: time.Millisecond * 300,
+       })
+       assert.NoError(t, err)
+       defer c.Close()
+
+       uuid := "test-uuid"
+       chunkCtxMap := c.(*consumer).consumers[0].chunkedMsgCtxMap
+       chunkCtxMap.addIfAbsent(uuid, 2, 100)
+       ctx := chunkCtxMap.get(uuid)
+       assert.NotNil(t, ctx)
+
+       time.Sleep(400 * time.Millisecond)
+
+       ctx = chunkCtxMap.get(uuid)
+       assert.Nil(t, ctx)
+}
+
+func TestChunksEnqueueFailed(t *testing.T) {
+       rand.Seed(time.Now().Unix())
+
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:                   topic,
+               EnableChunking:          true,
+               DisableBatching:         true,
+               MaxPendingMessages:      10,
+               ChunkMaxMessageSize:     50,
+               DisableBlockIfQueueFull: true,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+       defer producer.Close()
+
+       ID, err := producer.Send(context.Background(), &ProducerMessage{
+               Payload: createTestMessagePayload(1000),
+       })
+       assert.Error(t, err)
+       assert.Nil(t, ID)
+}
+
+func TestSeekChunkMessages(t *testing.T) {
+       rand.Seed(time.Now().Unix())
+
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       totalMessages := 5
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:               topic,
+               EnableChunking:      true,
+               DisableBatching:     true,
+               ChunkMaxMessageSize: 50,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Type:             Exclusive,
+               SubscriptionName: "default-seek",
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, consumer)
+       defer consumer.Close()
+
+       msgIDs := make([]MessageID, 0)
+       for i := 0; i < totalMessages; i++ {
+               ID, err := producer.Send(context.Background(), &ProducerMessage{
+                       Payload: createTestMessagePayload(100),
+               })
+               assert.NoError(t, err)
+               msgIDs = append(msgIDs, ID)
+       }
+
+       for i := 0; i < totalMessages; i++ {
+               ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+               msg, err := consumer.Receive(ctx)
+               cancel()
+               assert.NoError(t, err)
+               assert.NotNil(t, msg)
+               assert.Equal(t, msgIDs[i].Serialize(), msg.ID().Serialize())
+       }
+
+       err = consumer.Seek(msgIDs[1])
+       assert.NoError(t, err)
+
+       for i := 1; i < totalMessages; i++ {
+               ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+               msg, err := consumer.Receive(ctx)
+               cancel()
+               assert.NoError(t, err)
+               assert.NotNil(t, msg)
+               assert.Equal(t, msgIDs[i].Serialize(), msg.ID().Serialize())
+       }

Review Comment:
   It seems there is currently no `startMessageIdInclusive` option in the consumer options, although it exists in the reader options. In this test we should create two consumers (one with `startMessageIdInclusive` true and one with false): when `startMessageIdInclusive` is true, we should receive msgIDs[1] after the seek operation; otherwise we should receive msgIDs[2].
   
   The default value of `startMessageIdInclusive` is false, so the consumer's default behavior is to receive msgIDs[2] after the seek. We need to add this option to the consumer, but if that is too complex for this PR, it can be added in a separate one.
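   A rough sketch of the inclusive case, assuming a `StartMessageIDInclusive` field is added to `ConsumerOptions` (the name mirrors the reader option and is only an assumption):
   ```go
   // Hypothetical consumer with inclusive seek semantics.
   inclusiveConsumer, err := client.Subscribe(ConsumerOptions{
       Topic:                   topic,
       Type:                    Exclusive,
       SubscriptionName:        "inclusive-seek",
       StartMessageIDInclusive: true, // assumed new option, mirroring ReaderOptions
   })
   assert.NoError(t, err)
   defer inclusiveConsumer.Close()

   assert.NoError(t, inclusiveConsumer.Seek(msgIDs[1]))
   msg, err := inclusiveConsumer.Receive(context.Background())
   assert.NoError(t, err)
   // With StartMessageIDInclusive=true the first message after Seek should be
   // msgIDs[1]; with the default (false) it should be msgIDs[2].
   assert.Equal(t, msgIDs[1].Serialize(), msg.ID().Serialize())
   ```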



##########
pulsar/consumer_partition.go:
##########
@@ -1475,3 +1609,210 @@ func convertToMessageID(id *pb.MessageIdData) 
trackingMessageID {
 
        return msgID
 }
+
+type chunkedMsgCtx struct {
+       totalChunks      int32
+       chunkedMsgBuffer internal.Buffer
+       lastChunkedMsgID int32
+       chunkedMsgIDs    []messageID
+       receivedTime     int64
+
+       mu sync.Mutex
+}
+
+func newChunkedMsgCtx(numChunksFromMsg int32, totalChunkMsgSize int) 
*chunkedMsgCtx {
+       return &chunkedMsgCtx{
+               totalChunks:      numChunksFromMsg,
+               chunkedMsgBuffer: internal.NewBuffer(totalChunkMsgSize),
+               lastChunkedMsgID: -1,
+               chunkedMsgIDs:    make([]messageID, numChunksFromMsg),
+               receivedTime:     time.Now().Unix(),
+       }
+}
+
+func (c *chunkedMsgCtx) refresh(chunkID int32, msgID messageID, partPayload 
internal.Buffer) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       c.chunkedMsgIDs[chunkID] = msgID
+       c.chunkedMsgBuffer.Write(partPayload.ReadableSlice())
+       c.lastChunkedMsgID = chunkID
+}
+
+func (c *chunkedMsgCtx) firstChunkID() messageID {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if len(c.chunkedMsgIDs) == 0 {
+               return messageID{}
+       }
+       return c.chunkedMsgIDs[0]
+}
+
+func (c *chunkedMsgCtx) lastChunkID() messageID {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if len(c.chunkedMsgIDs) == 0 {
+               return messageID{}
+       }
+       return c.chunkedMsgIDs[len(c.chunkedMsgIDs)-1]
+}
+
+func (c *chunkedMsgCtx) discard(pc *partitionConsumer) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+
+       for _, mid := range c.chunkedMsgIDs {
+               pc.log.Info("Removing chunk message-id", mid.String())
+               tmid, _ := toTrackingMessageID(mid)
+               pc.AckID(tmid)
+       }
+}
+
+type chunkedMsgCtxMap struct {
+       chunkedMsgCtxs map[string]*chunkedMsgCtx
+       pendingQueue   *list.List
+       maxPending     int
+       pc             *partitionConsumer
+       mu             sync.Mutex
+       closed         bool
+}
+
+func newChunkedMsgCtxMap(maxPending int, pc *partitionConsumer) 
*chunkedMsgCtxMap {
+       return &chunkedMsgCtxMap{
+               chunkedMsgCtxs: make(map[string]*chunkedMsgCtx, maxPending),
+               pendingQueue:   list.New(),
+               maxPending:     maxPending,
+               pc:             pc,
+               mu:             sync.Mutex{},
+       }
+}
+
+func (c *chunkedMsgCtxMap) addIfAbsent(uuid string, totalChunks int32, 
totalChunkMsgSize int) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if c.closed {
+               return
+       }
+       if _, ok := c.chunkedMsgCtxs[uuid]; !ok {
+               c.chunkedMsgCtxs[uuid] = newChunkedMsgCtx(totalChunks, 
totalChunkMsgSize)
+               c.pendingQueue.PushBack(uuid)
+               go c.removeChunkIfExpire(uuid, true, 
c.pc.options.expireTimeOfIncompleteChunk)
+       }
+       if c.maxPending > 0 && c.pendingQueue.Len() > c.maxPending {
+               go c.removeChunkMessage(uuid, 
c.pc.options.autoAckIncompleteChunk)
+       }
+}
+
+func (c *chunkedMsgCtxMap) get(uuid string) *chunkedMsgCtx {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if c.closed {
+               return nil
+       }
+       return c.chunkedMsgCtxs[uuid]
+}
+
+func (c *chunkedMsgCtxMap) remove(uuid string) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if c.closed {
+               return
+       }
+       delete(c.chunkedMsgCtxs, uuid)
+       e := c.pendingQueue.Front()
+       for ; e != nil; e = e.Next() {
+               if e.Value.(string) == uuid {
+                       c.pendingQueue.Remove(e)
+                       break
+               }
+       }
+}
+
+func (c *chunkedMsgCtxMap) removeChunkMessage(uuid string, autoAck bool) {

Review Comment:
   I think we can rename this method to `discardChunkedMessage`; otherwise having two `remove` methods will confuse readers.



##########
pulsar/impl_message.go:
##########
@@ -372,3 +386,28 @@ func (t *ackTracker) completed() bool {
        defer t.Unlock()
        return len(t.batchIDs.Bits()) == 0
 }
+
+type chunkMessageID struct {
+       messageID
+
+       firstChunkID messageID
+       receivedTime time.Time
+
+       consumer acker
+}
+
+func newChunkMessageID(firstChunkID messageID, lastChunkID messageID) chunkMessageID {
+       return chunkMessageID{
+               messageID:    lastChunkID,
+               firstChunkID: firstChunkID,
+               receivedTime: time.Now(),
+       }
+}
+
+func (id chunkMessageID) String() string {
+       return fmt.Sprintf("%s;%s", id.firstChunkID.String(), id.messageID.String())
+}
+
+func (id chunkMessageID) Serialize() []byte {
+       return id.firstChunkID.Serialize()

Review Comment:
   We should use `MessageIdData` to serialize the chunk message ID.
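   A hedged sketch of what I mean, assuming the generated `pb.MessageIdData` exposes the protocol's `first_chunk_message_id` field and that the proto helpers (`proto.Uint64`, `proto.Int32`, `proto.Marshal`) are imported; the exact field names should be checked against the generated code:
   ```go
   func (id chunkMessageID) Serialize() []byte {
       // Carry both the last-chunk position and the first-chunk position so the
       // ID can be deserialized back into a chunkMessageID.
       msgID := &pb.MessageIdData{
           LedgerId:   proto.Uint64(uint64(id.ledgerID)),
           EntryId:    proto.Uint64(uint64(id.entryID)),
           BatchIndex: proto.Int32(int32(id.batchIdx)),
           Partition:  proto.Int32(int32(id.partitionIdx)),
           FirstChunkMessageId: &pb.MessageIdData{ // assumed field, per the wire protocol
               LedgerId: proto.Uint64(uint64(id.firstChunkID.ledgerID)),
               EntryId:  proto.Uint64(uint64(id.firstChunkID.entryID)),
           },
       }
       data, _ := proto.Marshal(msgID)
       return data
   }
   ```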



##########
pulsar/message_chunking_test.go:
##########
@@ -0,0 +1,593 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "math/rand"
+       "strings"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+)
+
+var _brokerMaxMessageSize = 1024 * 1024
+
+func TestInvalidChunkingConfig(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           newTopicName(),
+               DisableBatching: false,
+               EnableChunking:  true,
+       })
+
+       assert.Error(t, err, "producer creation should have fail")
+       assert.Nil(t, producer)
+}
+
+func TestLargeMessage(t *testing.T) {
+       rand.Seed(time.Now().Unix())
+
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+
+       // create producer without ChunkMaxMessageSize
+       producer1, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+               EnableChunking:  true,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer1)
+       defer producer1.Close()
+
+       // create producer with ChunkMaxMessageSize
+       producer2, err := client.CreateProducer(ProducerOptions{
+               Topic:               topic,
+               DisableBatching:     true,
+               EnableChunking:      true,
+               ChunkMaxMessageSize: 5,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer2)
+       defer producer2.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Type:             Exclusive,
+               SubscriptionName: "chunk-subscriber",
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, consumer)
+       defer consumer.Close()
+
+       expectMsgs := make([][]byte, 0, 10)
+
+       // test send chunk with serverMaxMessageSize limit
+       for i := 0; i < 5; i++ {
+               msg := createTestMessagePayload(_brokerMaxMessageSize + 1)
+               expectMsgs = append(expectMsgs, msg)
+               ID, err := producer1.Send(context.Background(), 
&ProducerMessage{
+                       Payload: msg,
+               })
+               assert.NoError(t, err)
+               assert.NotNil(t, ID)
+       }
+
+       // test receive chunk with serverMaxMessageSize limit
+       for i := 0; i < 5; i++ {
+               ctx, cancel := context.WithTimeout(context.Background(), 
time.Second*5)
+               msg, err := consumer.Receive(ctx)
+               cancel()
+               assert.NoError(t, err)
+
+               expectMsg := expectMsgs[i]
+
+               assert.Equal(t, expectMsg, msg.Payload())
+               // ack message
+               err = consumer.Ack(msg)
+               assert.NoError(t, err)
+       }
+
+       // test send chunk with ChunkMaxMessageSize limit
+       for i := 0; i < 5; i++ {
+               msg := createTestMessagePayload(50)
+               expectMsgs = append(expectMsgs, msg)
+               ID, err := producer2.Send(context.Background(), 
&ProducerMessage{
+                       Payload: msg,
+               })
+               assert.NoError(t, err)
+               assert.NotNil(t, ID)
+       }
+
+       // test receive chunk with ChunkMaxMessageSize limit
+       for i := 5; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               assert.NoError(t, err)
+
+               expectMsg := expectMsgs[i]
+
+               assert.Equal(t, expectMsg, msg.Payload())
+               // ack message
+               err = consumer.Ack(msg)
+               assert.NoError(t, err)
+       }
+}
+
+func TestPublishChunkWithFailure(t *testing.T) {

Review Comment:
   I think we can remove this test because it is already covered by other tests. Regarding `testPublishWithFailure` in the Java client, it's hard to simulate `stopBroker` here, so I don't think we need to add that test.



##########
pulsar/message_chunking_test.go:
##########
@@ -0,0 +1,593 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "math/rand"
+       "strings"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+)
+
+var _brokerMaxMessageSize = 1024 * 1024
+
+func TestInvalidChunkingConfig(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           newTopicName(),
+               DisableBatching: false,
+               EnableChunking:  true,
+       })
+
+       assert.Error(t, err, "producer creation should have fail")
+       assert.Nil(t, producer)
+}
+
+func TestLargeMessage(t *testing.T) {
+       rand.Seed(time.Now().Unix())
+
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+
+       // create producer without ChunkMaxMessageSize
+       producer1, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+               EnableChunking:  true,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer1)
+       defer producer1.Close()
+
+       // create producer with ChunkMaxMessageSize
+       producer2, err := client.CreateProducer(ProducerOptions{
+               Topic:               topic,
+               DisableBatching:     true,
+               EnableChunking:      true,
+               ChunkMaxMessageSize: 5,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer2)
+       defer producer2.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Type:             Exclusive,
+               SubscriptionName: "chunk-subscriber",
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, consumer)
+       defer consumer.Close()
+
+       expectMsgs := make([][]byte, 0, 10)
+
+       // test send chunk with serverMaxMessageSize limit
+       for i := 0; i < 5; i++ {
+               msg := createTestMessagePayload(_brokerMaxMessageSize + 1)
+               expectMsgs = append(expectMsgs, msg)
+               ID, err := producer1.Send(context.Background(), 
&ProducerMessage{
+                       Payload: msg,
+               })
+               assert.NoError(t, err)
+               assert.NotNil(t, ID)
+       }
+
+       // test receive chunk with serverMaxMessageSize limit
+       for i := 0; i < 5; i++ {
+               ctx, cancel := context.WithTimeout(context.Background(), 
time.Second*5)
+               msg, err := consumer.Receive(ctx)
+               cancel()
+               assert.NoError(t, err)
+
+               expectMsg := expectMsgs[i]
+
+               assert.Equal(t, expectMsg, msg.Payload())
+               // ack message
+               err = consumer.Ack(msg)
+               assert.NoError(t, err)
+       }
+
+       // test send chunk with ChunkMaxMessageSize limit
+       for i := 0; i < 5; i++ {
+               msg := createTestMessagePayload(50)
+               expectMsgs = append(expectMsgs, msg)
+               ID, err := producer2.Send(context.Background(), 
&ProducerMessage{
+                       Payload: msg,
+               })
+               assert.NoError(t, err)
+               assert.NotNil(t, ID)
+       }
+
+       // test receive chunk with ChunkMaxMessageSize limit
+       for i := 5; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               assert.NoError(t, err)
+
+               expectMsg := expectMsgs[i]
+
+               assert.Equal(t, expectMsg, msg.Payload())
+               // ack message
+               err = consumer.Ack(msg)
+               assert.NoError(t, err)
+       }
+}
+
+func TestPublishChunkWithFailure(t *testing.T) {

Review Comment:
   ```suggestion
   func TestPublishLargeMessagesFailedWithoutChunking(t *testing.T) {
   ```



##########
pulsar/message_chunking_test.go:
##########
@@ -0,0 +1,593 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "math/rand"
+       "strings"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+)
+
+var _brokerMaxMessageSize = 1024 * 1024
+
+func TestInvalidChunkingConfig(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           newTopicName(),
+               DisableBatching: false,
+               EnableChunking:  true,
+       })
+
+       assert.Error(t, err, "producer creation should have fail")
+       assert.Nil(t, producer)
+}
+
+func TestLargeMessage(t *testing.T) {
+       rand.Seed(time.Now().Unix())
+
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+
+       // create producer without ChunkMaxMessageSize
+       producer1, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+               EnableChunking:  true,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer1)
+       defer producer1.Close()
+
+       // create producer with ChunkMaxMessageSize
+       producer2, err := client.CreateProducer(ProducerOptions{
+               Topic:               topic,
+               DisableBatching:     true,
+               EnableChunking:      true,
+               ChunkMaxMessageSize: 5,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer2)
+       defer producer2.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Type:             Exclusive,
+               SubscriptionName: "chunk-subscriber",
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, consumer)
+       defer consumer.Close()
+
+       expectMsgs := make([][]byte, 0, 10)
+
+       // test send chunk with serverMaxMessageSize limit
+       for i := 0; i < 5; i++ {
+               msg := createTestMessagePayload(_brokerMaxMessageSize + 1)
+               expectMsgs = append(expectMsgs, msg)
+               ID, err := producer1.Send(context.Background(), 
&ProducerMessage{
+                       Payload: msg,
+               })
+               assert.NoError(t, err)
+               assert.NotNil(t, ID)
+       }
+
+       // test receive chunk with serverMaxMessageSize limit
+       for i := 0; i < 5; i++ {
+               ctx, cancel := context.WithTimeout(context.Background(), 
time.Second*5)
+               msg, err := consumer.Receive(ctx)
+               cancel()
+               assert.NoError(t, err)
+
+               expectMsg := expectMsgs[i]
+
+               assert.Equal(t, expectMsg, msg.Payload())
+               // ack message
+               err = consumer.Ack(msg)
+               assert.NoError(t, err)
+       }
+
+       // test send chunk with ChunkMaxMessageSize limit
+       for i := 0; i < 5; i++ {
+               msg := createTestMessagePayload(50)
+               expectMsgs = append(expectMsgs, msg)
+               ID, err := producer2.Send(context.Background(), 
&ProducerMessage{
+                       Payload: msg,
+               })
+               assert.NoError(t, err)
+               assert.NotNil(t, ID)
+       }
+
+       // test receive chunk with ChunkMaxMessageSize limit
+       for i := 5; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               assert.NoError(t, err)
+
+               expectMsg := expectMsgs[i]
+
+               assert.Equal(t, expectMsg, msg.Payload())
+               // ack message
+               err = consumer.Ack(msg)
+               assert.NoError(t, err)
+       }
+}
+
+func TestPublishChunkWithFailure(t *testing.T) {
+       rand.Seed(time.Now().Unix())
+
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+
+       // create producer without ChunkMaxMessageSize
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+       defer producer.Close()
+
+       ID, err := producer.Send(context.Background(), &ProducerMessage{
+               Payload: createTestMessagePayload(_brokerMaxMessageSize + 1),
+       })
+       assert.Error(t, err)
+       assert.Nil(t, ID)
+}
+
+func TestMaxPendingChunkMessages(t *testing.T) {
+       rand.Seed(time.Now().Unix())
+
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+
+       totalProducers := 5
+       producers := make([]Producer, 0, 20)
+       defer func() {
+               for _, p := range producers {
+                       p.Close()
+               }
+       }()
+
+       clients := make([]Client, 0, 20)
+       defer func() {
+               for _, c := range clients {
+                       c.Close()
+               }
+       }()
+
+       for j := 0; j < totalProducers; j++ {
+               pc, err := NewClient(ClientOptions{
+                       URL: lookupURL,
+               })
+               assert.Nil(t, err)
+               clients = append(clients, pc)
+               producer, err := pc.CreateProducer(ProducerOptions{
+                       Topic:               topic,
+                       DisableBatching:     true,
+                       EnableChunking:      true,
+                       ChunkMaxMessageSize: 10,
+               })
+               assert.NoError(t, err)
+               assert.NotNil(t, producer)
+               producers = append(producers, producer)
+       }
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                    topic,
+               Type:                     Exclusive,
+               SubscriptionName:         "chunk-subscriber",
+               MaxPendingChunkedMessage: 1,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, consumer)
+       defer consumer.Close()
+
+       totalMsgs := 40
+       wg := sync.WaitGroup{}
+       wg.Add(totalMsgs * totalProducers)
+       for i := 0; i < totalMsgs; i++ {
+               for j := 0; j < totalProducers; j++ {
+                       p := producers[j]
+                       go func() {
+                               ID, err := p.Send(context.Background(), 
&ProducerMessage{
+                                       Payload: createTestMessagePayload(50),
+                               })
+                               assert.NoError(t, err)
+                               assert.NotNil(t, ID)
+                               wg.Done()
+                       }()
+               }
+       }
+       wg.Wait()
+
+       received := 0
+       for i := 0; i < totalMsgs*totalProducers; i++ {
+               ctx, cancel := context.WithTimeout(context.Background(), 
time.Second*10)
+               msg, err := consumer.Receive(ctx)
+               cancel()
+               if msg == nil || (err != nil && errors.Is(err, 
context.DeadlineExceeded)) {
+                       break
+               }
+
+               received++
+
+               err = consumer.Ack(msg)
+               assert.NoError(t, err)
+       }
+
+       assert.NotEqual(t, totalMsgs*totalProducers, received)
+}
+
+func TestExpireIncompleteChunks(t *testing.T) {
+       rand.Seed(time.Now().Unix())
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+
+       c, err := client.Subscribe(ConsumerOptions{
+               Topic:                       topic,
+               Type:                        Exclusive,
+               SubscriptionName:            "chunk-subscriber",
+               ExpireTimeOfIncompleteChunk: time.Millisecond * 300,
+       })
+       assert.NoError(t, err)
+       defer c.Close()
+
+       uuid := "test-uuid"
+       chunkCtxMap := c.(*consumer).consumers[0].chunkedMsgCtxMap
+       chunkCtxMap.addIfAbsent(uuid, 2, 100)
+       ctx := chunkCtxMap.get(uuid)
+       assert.NotNil(t, ctx)
+
+       time.Sleep(400 * time.Millisecond)
+
+       ctx = chunkCtxMap.get(uuid)
+       assert.Nil(t, ctx)
+}
+
+func TestChunksEnqueueFailed(t *testing.T) {
+       rand.Seed(time.Now().Unix())
+
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:                   topic,
+               EnableChunking:          true,
+               DisableBatching:         true,
+               MaxPendingMessages:      10,
+               ChunkMaxMessageSize:     50,
+               DisableBlockIfQueueFull: true,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+       defer producer.Close()
+
+       ID, err := producer.Send(context.Background(), &ProducerMessage{
+               Payload: createTestMessagePayload(1000),
+       })
+       assert.Error(t, err)
+       assert.Nil(t, ID)
+}
+
+func TestSeekChunkMessages(t *testing.T) {
+       rand.Seed(time.Now().Unix())
+
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       totalMessages := 5
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:               topic,
+               EnableChunking:      true,
+               DisableBatching:     true,
+               ChunkMaxMessageSize: 50,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Type:             Exclusive,
+               SubscriptionName: "default-seek",
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, consumer)
+       defer consumer.Close()
+
+       msgIDs := make([]MessageID, 0)
+       for i := 0; i < totalMessages; i++ {
+               ID, err := producer.Send(context.Background(), &ProducerMessage{
+                       Payload: createTestMessagePayload(100),
+               })
+               assert.NoError(t, err)
+               msgIDs = append(msgIDs, ID)
+       }
+
+       for i := 0; i < totalMessages; i++ {
+               ctx, cancel := context.WithTimeout(context.Background(), 
time.Second*5)
+               msg, err := consumer.Receive(ctx)
+               cancel()
+               assert.NoError(t, err)
+               assert.NotNil(t, msg)
+               assert.Equal(t, msgIDs[i].Serialize(), msg.ID().Serialize())
+       }
+
+       err = consumer.Seek(msgIDs[1])
+       assert.NoError(t, err)
+
+       for i := 1; i < totalMessages; i++ {
+               ctx, cancel := context.WithTimeout(context.Background(), 
time.Second*5)
+               msg, err := consumer.Receive(ctx)
+               cancel()
+               assert.NoError(t, err)
+               assert.NotNil(t, msg)
+               assert.Equal(t, msgIDs[i].Serialize(), msg.ID().Serialize())
+       }
+
+       // todo: add reader seek test when support reader read chunk message
+}
+
+func TestChunkAckAndNAck(t *testing.T) {
+       rand.Seed(time.Now().Unix())
+
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:               topic,
+               EnableChunking:      true,
+               DisableBatching:     true,
+               ChunkMaxMessageSize: 50,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:               topic,
+               Type:                Exclusive,
+               SubscriptionName:    "default-seek",
+               NackRedeliveryDelay: time.Second,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, consumer)
+       defer consumer.Close()
+
+       content := createTestMessagePayload(100)
+
+       _, err = producer.Send(context.Background(), &ProducerMessage{
+               Payload: content,
+       })
+       assert.NoError(t, err)
+
+       ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+       msg, err := consumer.Receive(ctx)
+       cancel()
+       assert.NoError(t, err)
+       assert.NotNil(t, msg)
+       assert.Equal(t, msg.Payload(), content)
+
+       consumer.Nack(msg)
+       time.Sleep(time.Second * 2)
+
+       ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
+       msg, err = consumer.Receive(ctx)
+       cancel()
+       assert.NoError(t, err)
+       assert.NotNil(t, msg)
+       assert.Equal(t, msg.Payload(), content)
+}
+
+func TestChunkSize(t *testing.T) {
+       rand.Seed(time.Now().Unix())
+
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       // the default message metadata size for string schema
+       // The proto messageMetaData size as following. (all with tag) 
(maxMessageSize = 1024 * 1024)
+       // | producerName | sequenceID | publishTime | uncompressedSize |
+       // | ------------ | ---------- | ----------- | ---------------- |
+       // | 6            | 2          | 7           | 4                |
+       payloadChunkSize := _brokerMaxMessageSize - 19
+
+       topic := newTopicName()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Name:            "test",
+               Topic:           topic,
+               EnableChunking:  true,
+               DisableBatching: true,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+       defer producer.Close()
+
+       for size := payloadChunkSize; size <= _brokerMaxMessageSize; size++ {
+               msgID, err := producer.Send(context.Background(), 
&ProducerMessage{
+                       Payload: createTestMessagePayload(size),
+               })
+               assert.NoError(t, err)
+               if size <= payloadChunkSize {
+                       _, ok := msgID.(messageID)
+                       assert.Equal(t, true, ok)
+               } else {
+                       _, ok := msgID.(chunkMessageID)
+                       assert.Equal(t, true, ok)
+               }
+       }
+}
+
+func TestChunkMultiTopicConsumerReceive(t *testing.T) {
+       topic1 := newTopicName()
+       topic2 := newTopicName()
+
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       if err != nil {
+               t.Fatal(err)
+       }
+       topics := []string{topic1, topic2}
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topics:           topics,
+               SubscriptionName: "multi-topic-sub",
+       })
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer consumer.Close()
+
+       maxSize := 50
+
+       // produce messages
+       for i, topic := range topics {
+               p, err := client.CreateProducer(ProducerOptions{
+                       Topic:               topic,
+                       DisableBatching:     true,
+                       EnableChunking:      true,
+                       ChunkMaxMessageSize: uint(maxSize),
+               })
+               if err != nil {
+                       t.Fatal(err)
+               }
+               err = genMessages(p, 10, func(idx int) string {
+                       return fmt.Sprintf("topic-%d-hello-%d-%s", i+1, idx, 
string(createTestMessagePayload(100)))
+               })
+               p.Close()
+               if err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       receivedTopic1 := 0
+       receivedTopic2 := 0
+       // nolint
+       for receivedTopic1+receivedTopic2 < 20 {
+               select {
+               case cm, ok := <-consumer.Chan():
+                       if ok {
+                               msg := string(cm.Payload())
+                               if strings.HasPrefix(msg, "topic-1") {
+                                       receivedTopic1++
+                               } else if strings.HasPrefix(msg, "topic-2") {
+                                       receivedTopic2++
+                               }
+                               consumer.Ack(cm.Message)
+                       } else {
+                               t.Fail()
+                       }
+               }

Review Comment:
   It's better to add a timeout to this select.
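   For example, a sketch using the identifiers already in this test so it fails fast instead of hanging:
   ```go
   select {
   case cm, ok := <-consumer.Chan():
       if !ok {
           t.Fatal("consumer channel closed")
       }
       msg := string(cm.Payload())
       if strings.HasPrefix(msg, "topic-1") {
           receivedTopic1++
       } else if strings.HasPrefix(msg, "topic-2") {
           receivedTopic2++
       }
       consumer.Ack(cm.Message)
   case <-time.After(30 * time.Second):
       t.Fatal("timed out waiting for chunked messages")
   }
   ```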



##########
pulsar/consumer_partition.go:
##########
@@ -1475,3 +1609,210 @@ func convertToMessageID(id *pb.MessageIdData) 
trackingMessageID {
 
        return msgID
 }
+
+type chunkedMsgCtx struct {
+       totalChunks      int32
+       chunkedMsgBuffer internal.Buffer
+       lastChunkedMsgID int32
+       chunkedMsgIDs    []messageID
+       receivedTime     int64
+
+       mu sync.Mutex
+}
+
+func newChunkedMsgCtx(numChunksFromMsg int32, totalChunkMsgSize int) 
*chunkedMsgCtx {
+       return &chunkedMsgCtx{
+               totalChunks:      numChunksFromMsg,
+               chunkedMsgBuffer: internal.NewBuffer(totalChunkMsgSize),
+               lastChunkedMsgID: -1,
+               chunkedMsgIDs:    make([]messageID, numChunksFromMsg),
+               receivedTime:     time.Now().Unix(),
+       }
+}
+
+func (c *chunkedMsgCtx) refresh(chunkID int32, msgID messageID, partPayload 
internal.Buffer) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       c.chunkedMsgIDs[chunkID] = msgID
+       c.chunkedMsgBuffer.Write(partPayload.ReadableSlice())
+       c.lastChunkedMsgID = chunkID
+}
+
+func (c *chunkedMsgCtx) firstChunkID() messageID {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if len(c.chunkedMsgIDs) == 0 {
+               return messageID{}
+       }
+       return c.chunkedMsgIDs[0]
+}
+
+func (c *chunkedMsgCtx) lastChunkID() messageID {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if len(c.chunkedMsgIDs) == 0 {
+               return messageID{}
+       }
+       return c.chunkedMsgIDs[len(c.chunkedMsgIDs)-1]
+}
+
+func (c *chunkedMsgCtx) discard(pc *partitionConsumer) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+
+       for _, mid := range c.chunkedMsgIDs {
+               pc.log.Info("Removing chunk message-id", mid.String())
+               tmid, _ := toTrackingMessageID(mid)
+               pc.AckID(tmid)
+       }
+}
+
+type chunkedMsgCtxMap struct {
+       chunkedMsgCtxs map[string]*chunkedMsgCtx
+       pendingQueue   *list.List
+       maxPending     int
+       pc             *partitionConsumer
+       mu             sync.Mutex
+       closed         bool
+}
+
+func newChunkedMsgCtxMap(maxPending int, pc *partitionConsumer) 
*chunkedMsgCtxMap {
+       return &chunkedMsgCtxMap{
+               chunkedMsgCtxs: make(map[string]*chunkedMsgCtx, maxPending),
+               pendingQueue:   list.New(),
+               maxPending:     maxPending,
+               pc:             pc,
+               mu:             sync.Mutex{},
+       }
+}
+
+func (c *chunkedMsgCtxMap) addIfAbsent(uuid string, totalChunks int32, 
totalChunkMsgSize int) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if c.closed {
+               return
+       }
+       if _, ok := c.chunkedMsgCtxs[uuid]; !ok {
+               c.chunkedMsgCtxs[uuid] = newChunkedMsgCtx(totalChunks, 
totalChunkMsgSize)
+               c.pendingQueue.PushBack(uuid)
+               go c.removeChunkIfExpire(uuid, true, 
c.pc.options.expireTimeOfIncompleteChunk)
+       }
+       if c.maxPending > 0 && c.pendingQueue.Len() > c.maxPending {
+               go c.removeChunkMessage(uuid, c.pc.options.autoAckIncompleteChunk)

Review Comment:
   We need to release the first (oldest) pending chunked message, but this seems to remove the one that was just added.
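   A sketch of evicting the oldest entry instead, by taking the front of `pendingQueue`:
   ```go
   if c.maxPending > 0 && c.pendingQueue.Len() > c.maxPending {
       // Evict the oldest pending chunked message, not the one just added.
       if e := c.pendingQueue.Front(); e != nil {
           oldest := e.Value.(string)
           go c.removeChunkMessage(oldest, c.pc.options.autoAckIncompleteChunk)
       }
   }
   ```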



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

