This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-0.11.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit f3cd74a5e8e0b3de282bdee4ea1dd3b2f7e710be
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Jul 26 18:57:56 2023 +0800

    [fix][txn] Fix the transaction acknowledgement and send logic for  chunk 
message (#1069)
    
    Master https://github.com/apache/pulsar-client-go/issues/1060
    ### Motivation
    1. For the chunk message, we only register the send operation once but end 
the send operation multiple times when receiving the send response. It will 
make the transaction can be committed before all the operations are completed.
    2. When we use transaction ack for chunk messages, the provided transaction 
is ignored, resulting in the chunk message actually being acknowledged using 
the non-transactional ack method.
    ### Modifications
    1. Only end the send operation when receive the last chunk message.
    2. Add the check for the transaction when the massage is a chunk message.
    
    (cherry picked from commit 59ef32b03f7595a905388f25b34caaf8ebecffb2)
---
 pulsar/consumer_partition.go |  17 +++++++-
 pulsar/transaction_test.go   | 101 ++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 114 insertions(+), 4 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index cd7aa501..f5e9febe 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -426,7 +426,10 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, 
withResponse bool, txn
        }
 
        if cmid, ok := msgID.(*chunkMessageID); ok {
-               return pc.unAckChunksTracker.ack(cmid)
+               if txn == nil {
+                       return pc.unAckChunksTracker.ack(cmid)
+               }
+               return pc.unAckChunksTracker.ackWithTxn(cmid, txn)
        }
 
        trackingID := toTrackingMessageID(msgID)
@@ -2203,9 +2206,19 @@ func (u *unAckChunksTracker) remove(cmid 
*chunkMessageID) {
 }
 
 func (u *unAckChunksTracker) ack(cmid *chunkMessageID) error {
+       return u.ackWithTxn(cmid, nil)
+}
+
+func (u *unAckChunksTracker) ackWithTxn(cmid *chunkMessageID, txn Transaction) 
error {
        ids := u.get(cmid)
        for _, id := range ids {
-               if err := u.pc.AckID(id); err != nil {
+               var err error
+               if txn == nil {
+                       err = u.pc.AckID(id)
+               } else {
+                       err = u.pc.AckIDWithTxn(id, txn)
+               }
+               if err != nil {
                        return err
                }
        }
diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go
index 66a82cca..385b197e 100644
--- a/pulsar/transaction_test.go
+++ b/pulsar/transaction_test.go
@@ -423,6 +423,14 @@ func TestTransactionAbort(t *testing.T) {
        // Abort the transaction.
        _ = txn.Abort(context.Background())
 
+       consumerShouldNotReceiveMessage(t, consumer)
+
+       // Clean up: Close the consumer and producer instances.
+       consumer.Close()
+       producer.Close()
+}
+
+func consumerShouldNotReceiveMessage(t *testing.T, consumer Consumer) {
        // Expectation: The consumer should not receive any messages.
        done := make(chan struct{})
        go func() {
@@ -438,8 +446,97 @@ func TestTransactionAbort(t *testing.T) {
                require.Fail(t, "The consumer should not receive any messages")
        case <-time.After(time.Second):
        }
+}
 
-       // Clean up: Close the consumer and producer instances.
+func TestAckChunkMessage(t *testing.T) {
+       topic := newTopicName()
+       sub := "my-sub"
+
+       // Prepare: Create PulsarClient and initialize the transaction 
coordinator client.
+       _, client := createTcClient(t)
+
+       // Create transaction and register the send operation.
+       txn, err := client.NewTransaction(time.Hour)
+       require.Nil(t, err)
+       txn.(*transaction).registerSendOrAckOp()
+
+       // Create a producer with chunking enabled to send a large message that 
will be split into chunks.
+       producer, err := client.CreateProducer(ProducerOptions{
+               Name:            "test",
+               Topic:           topic,
+               EnableChunking:  true,
+               DisableBatching: true,
+       })
+       require.NoError(t, err)
+       require.NotNil(t, producer)
+       defer producer.Close()
+
+       // Subscribe to the consumer.
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Type:             Exclusive,
+               SubscriptionName: sub,
+       })
+       require.NoError(t, err)
+       defer consumer.Close()
+
+       // Send a large message that will be split into chunks.
+       msgID, err := producer.Send(context.Background(), &ProducerMessage{
+               Transaction: txn,
+               Payload:     createTestMessagePayload(_brokerMaxMessageSize),
+       })
+       require.NoError(t, err)
+       _, ok := msgID.(*chunkMessageID)
+       require.True(t, ok)
+
+       err = txn.Commit(context.Background())
+       require.Nil(t, err)
+
+       // Receive the message using a new transaction and ack it.
+       txn2, err := client.NewTransaction(time.Hour)
+       require.Nil(t, err)
+       message, err := consumer.Receive(context.Background())
+       require.Nil(t, err)
+
+       err = consumer.AckWithTxn(message, txn2)
+       require.Nil(t, err)
+
+       txn2.Abort(context.Background())
+
+       // Close the consumer to simulate reconnection and receive the same 
message again.
        consumer.Close()
-       producer.Close()
+
+       // Subscribe to the consumer again.
+       consumer, err = client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Type:             Exclusive,
+               SubscriptionName: sub,
+       })
+       require.Nil(t, err)
+       message, err = consumer.Receive(context.Background())
+       require.Nil(t, err)
+       require.NotNil(t, message)
+
+       // Create a new transaction and ack the message again.
+       txn3, err := client.NewTransaction(time.Hour)
+       require.Nil(t, err)
+
+       err = consumer.AckWithTxn(message, txn3)
+       require.Nil(t, err)
+
+       // Commit the third transaction.
+       err = txn3.Commit(context.Background())
+       require.Nil(t, err)
+
+       // Close the consumer again.
+       consumer.Close()
+
+       // Subscribe to the consumer again and verify that no message is 
received.
+       consumer, err = client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Type:             Exclusive,
+               SubscriptionName: sub,
+       })
+       require.Nil(t, err)
+       consumerShouldNotReceiveMessage(t, consumer)
 }

Reply via email to