This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 59ef32b  [fix][txn] Fix the transaction acknowledgement and send logic 
for  chunk message (#1069)
59ef32b is described below

commit 59ef32b03f7595a905388f25b34caaf8ebecffb2
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Jul 26 18:57:56 2023 +0800

    [fix][txn] Fix the transaction acknowledgement and send logic for  chunk 
message (#1069)
    
    Master https://github.com/apache/pulsar-client-go/issues/1060
    ### Motivation
    1. For the chunk message, we only register the send operation once but end 
the send operation multiple times when receiving the send response. It will 
make the transaction can be committed before all the operations are completed.
    2. When we use transaction ack for chunk messages, the provided transaction 
is ignored, resulting in the chunk message actually being acknowledged using 
the non-transactional ack method.
    ### Modifications
    1. Only end the send operation when receive the last chunk message.
    2. Add the check for the transaction when the massage is a chunk message.
---
 pulsar/consumer_partition.go |  17 +++++++-
 pulsar/transaction_test.go   | 101 ++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 114 insertions(+), 4 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 28cb9c5..b50c0a0 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -426,7 +426,10 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, 
withResponse bool, txn
        }
 
        if cmid, ok := msgID.(*chunkMessageID); ok {
-               return pc.unAckChunksTracker.ack(cmid)
+               if txn == nil {
+                       return pc.unAckChunksTracker.ack(cmid)
+               }
+               return pc.unAckChunksTracker.ackWithTxn(cmid, txn)
        }
 
        trackingID := toTrackingMessageID(msgID)
@@ -2212,9 +2215,19 @@ func (u *unAckChunksTracker) remove(cmid 
*chunkMessageID) {
 }
 
 func (u *unAckChunksTracker) ack(cmid *chunkMessageID) error {
+       return u.ackWithTxn(cmid, nil)
+}
+
+func (u *unAckChunksTracker) ackWithTxn(cmid *chunkMessageID, txn Transaction) 
error {
        ids := u.get(cmid)
        for _, id := range ids {
-               if err := u.pc.AckID(id); err != nil {
+               var err error
+               if txn == nil {
+                       err = u.pc.AckID(id)
+               } else {
+                       err = u.pc.AckIDWithTxn(id, txn)
+               }
+               if err != nil {
                        return err
                }
        }
diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go
index 66a82cc..385b197 100644
--- a/pulsar/transaction_test.go
+++ b/pulsar/transaction_test.go
@@ -423,6 +423,14 @@ func TestTransactionAbort(t *testing.T) {
        // Abort the transaction.
        _ = txn.Abort(context.Background())
 
+       consumerShouldNotReceiveMessage(t, consumer)
+
+       // Clean up: Close the consumer and producer instances.
+       consumer.Close()
+       producer.Close()
+}
+
+func consumerShouldNotReceiveMessage(t *testing.T, consumer Consumer) {
        // Expectation: The consumer should not receive any messages.
        done := make(chan struct{})
        go func() {
@@ -438,8 +446,97 @@ func TestTransactionAbort(t *testing.T) {
                require.Fail(t, "The consumer should not receive any messages")
        case <-time.After(time.Second):
        }
+}
 
-       // Clean up: Close the consumer and producer instances.
+func TestAckChunkMessage(t *testing.T) {
+       topic := newTopicName()
+       sub := "my-sub"
+
+       // Prepare: Create PulsarClient and initialize the transaction 
coordinator client.
+       _, client := createTcClient(t)
+
+       // Create transaction and register the send operation.
+       txn, err := client.NewTransaction(time.Hour)
+       require.Nil(t, err)
+       txn.(*transaction).registerSendOrAckOp()
+
+       // Create a producer with chunking enabled to send a large message that 
will be split into chunks.
+       producer, err := client.CreateProducer(ProducerOptions{
+               Name:            "test",
+               Topic:           topic,
+               EnableChunking:  true,
+               DisableBatching: true,
+       })
+       require.NoError(t, err)
+       require.NotNil(t, producer)
+       defer producer.Close()
+
+       // Subscribe to the consumer.
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Type:             Exclusive,
+               SubscriptionName: sub,
+       })
+       require.NoError(t, err)
+       defer consumer.Close()
+
+       // Send a large message that will be split into chunks.
+       msgID, err := producer.Send(context.Background(), &ProducerMessage{
+               Transaction: txn,
+               Payload:     createTestMessagePayload(_brokerMaxMessageSize),
+       })
+       require.NoError(t, err)
+       _, ok := msgID.(*chunkMessageID)
+       require.True(t, ok)
+
+       err = txn.Commit(context.Background())
+       require.Nil(t, err)
+
+       // Receive the message using a new transaction and ack it.
+       txn2, err := client.NewTransaction(time.Hour)
+       require.Nil(t, err)
+       message, err := consumer.Receive(context.Background())
+       require.Nil(t, err)
+
+       err = consumer.AckWithTxn(message, txn2)
+       require.Nil(t, err)
+
+       txn2.Abort(context.Background())
+
+       // Close the consumer to simulate reconnection and receive the same 
message again.
        consumer.Close()
-       producer.Close()
+
+       // Subscribe to the consumer again.
+       consumer, err = client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Type:             Exclusive,
+               SubscriptionName: sub,
+       })
+       require.Nil(t, err)
+       message, err = consumer.Receive(context.Background())
+       require.Nil(t, err)
+       require.NotNil(t, message)
+
+       // Create a new transaction and ack the message again.
+       txn3, err := client.NewTransaction(time.Hour)
+       require.Nil(t, err)
+
+       err = consumer.AckWithTxn(message, txn3)
+       require.Nil(t, err)
+
+       // Commit the third transaction.
+       err = txn3.Commit(context.Background())
+       require.Nil(t, err)
+
+       // Close the consumer again.
+       consumer.Close()
+
+       // Subscribe to the consumer again and verify that no message is 
received.
+       consumer, err = client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Type:             Exclusive,
+               SubscriptionName: sub,
+       })
+       require.Nil(t, err)
+       consumerShouldNotReceiveMessage(t, consumer)
 }

Reply via email to