This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 59ef32b [fix][txn] Fix the transaction acknowledgement and send logic
for chunk message (#1069)
59ef32b is described below
commit 59ef32b03f7595a905388f25b34caaf8ebecffb2
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Jul 26 18:57:56 2023 +0800
[fix][txn] Fix the transaction acknowledgement and send logic for chunk
message (#1069)
Master https://github.com/apache/pulsar-client-go/issues/1060
### Motivation
1. For a chunked message, we register the send operation only once but end
the send operation multiple times as the send responses are received. This
allows the transaction to be committed before all of its operations have completed.
2. When we use transaction ack for chunk messages, the provided transaction
is ignored, resulting in the chunk message actually being acknowledged using
the non-transactional ack method.
### Modifications
1. Only end the send operation upon receiving the send response for the last chunk message.
2. Add a check for the transaction when the message is a chunk message.
---
pulsar/consumer_partition.go | 17 +++++++-
pulsar/transaction_test.go | 101 ++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 114 insertions(+), 4 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 28cb9c5..b50c0a0 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -426,7 +426,10 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID,
withResponse bool, txn
}
if cmid, ok := msgID.(*chunkMessageID); ok {
- return pc.unAckChunksTracker.ack(cmid)
+ if txn == nil {
+ return pc.unAckChunksTracker.ack(cmid)
+ }
+ return pc.unAckChunksTracker.ackWithTxn(cmid, txn)
}
trackingID := toTrackingMessageID(msgID)
@@ -2212,9 +2215,19 @@ func (u *unAckChunksTracker) remove(cmid
*chunkMessageID) {
}
func (u *unAckChunksTracker) ack(cmid *chunkMessageID) error {
+ return u.ackWithTxn(cmid, nil)
+}
+
+func (u *unAckChunksTracker) ackWithTxn(cmid *chunkMessageID, txn Transaction)
error {
ids := u.get(cmid)
for _, id := range ids {
- if err := u.pc.AckID(id); err != nil {
+ var err error
+ if txn == nil {
+ err = u.pc.AckID(id)
+ } else {
+ err = u.pc.AckIDWithTxn(id, txn)
+ }
+ if err != nil {
return err
}
}
diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go
index 66a82cc..385b197 100644
--- a/pulsar/transaction_test.go
+++ b/pulsar/transaction_test.go
@@ -423,6 +423,14 @@ func TestTransactionAbort(t *testing.T) {
// Abort the transaction.
_ = txn.Abort(context.Background())
+ consumerShouldNotReceiveMessage(t, consumer)
+
+ // Clean up: Close the consumer and producer instances.
+ consumer.Close()
+ producer.Close()
+}
+
+func consumerShouldNotReceiveMessage(t *testing.T, consumer Consumer) {
// Expectation: The consumer should not receive any messages.
done := make(chan struct{})
go func() {
@@ -438,8 +446,97 @@ func TestTransactionAbort(t *testing.T) {
require.Fail(t, "The consumer should not receive any messages")
case <-time.After(time.Second):
}
+}
- // Clean up: Close the consumer and producer instances.
+func TestAckChunkMessage(t *testing.T) {
+ topic := newTopicName()
+ sub := "my-sub"
+
+ // Prepare: Create PulsarClient and initialize the transaction
coordinator client.
+ _, client := createTcClient(t)
+
+ // Create transaction and register the send operation.
+ txn, err := client.NewTransaction(time.Hour)
+ require.Nil(t, err)
+ txn.(*transaction).registerSendOrAckOp()
+
+ // Create a producer with chunking enabled to send a large message that
will be split into chunks.
+ producer, err := client.CreateProducer(ProducerOptions{
+ Name: "test",
+ Topic: topic,
+ EnableChunking: true,
+ DisableBatching: true,
+ })
+ require.NoError(t, err)
+ require.NotNil(t, producer)
+ defer producer.Close()
+
+ // Subscribe to the consumer.
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Type: Exclusive,
+ SubscriptionName: sub,
+ })
+ require.NoError(t, err)
+ defer consumer.Close()
+
+ // Send a large message that will be split into chunks.
+ msgID, err := producer.Send(context.Background(), &ProducerMessage{
+ Transaction: txn,
+ Payload: createTestMessagePayload(_brokerMaxMessageSize),
+ })
+ require.NoError(t, err)
+ _, ok := msgID.(*chunkMessageID)
+ require.True(t, ok)
+
+ err = txn.Commit(context.Background())
+ require.Nil(t, err)
+
+ // Receive the message using a new transaction and ack it.
+ txn2, err := client.NewTransaction(time.Hour)
+ require.Nil(t, err)
+ message, err := consumer.Receive(context.Background())
+ require.Nil(t, err)
+
+ err = consumer.AckWithTxn(message, txn2)
+ require.Nil(t, err)
+
+ txn2.Abort(context.Background())
+
+ // Close the consumer to simulate reconnection and receive the same
message again.
consumer.Close()
- producer.Close()
+
+ // Subscribe to the consumer again.
+ consumer, err = client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Type: Exclusive,
+ SubscriptionName: sub,
+ })
+ require.Nil(t, err)
+ message, err = consumer.Receive(context.Background())
+ require.Nil(t, err)
+ require.NotNil(t, message)
+
+ // Create a new transaction and ack the message again.
+ txn3, err := client.NewTransaction(time.Hour)
+ require.Nil(t, err)
+
+ err = consumer.AckWithTxn(message, txn3)
+ require.Nil(t, err)
+
+ // Commit the third transaction.
+ err = txn3.Commit(context.Background())
+ require.Nil(t, err)
+
+ // Close the consumer again.
+ consumer.Close()
+
+ // Subscribe to the consumer again and verify that no message is
received.
+ consumer, err = client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Type: Exclusive,
+ SubscriptionName: sub,
+ })
+ require.Nil(t, err)
+ consumerShouldNotReceiveMessage(t, consumer)
}