This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c9245bc7 fix: enhance transaction functionality (#1281)
c9245bc7 is described below

commit c9245bc7cbf0006f007abd32d97ec068122024db
Author: Eugene R. <[email protected]>
AuthorDate: Tue Dec 17 13:00:14 2024 +0200

    fix: enhance transaction functionality (#1281)
    
    ### Motivation
    Various fixes and refactorings for the transaction implementation.
    
    ### Modifications
    
    * Employ context in the `Commit` and `Abort` methods
    * Use client operation timeout
    * Use `atomic.Int32` for the state
    * Make all state reads atomic
    * Clean up and improve error messages
---
 .golangci.yml                |   2 +-
 pulsar/consumer_partition.go |   4 +-
 pulsar/producer_partition.go |   4 +-
 pulsar/transaction_impl.go   | 112 ++++++++++++++++++++++++-------------------
 pulsar/transaction_test.go   |  89 ++++++++++++++++++----------------
 5 files changed, 117 insertions(+), 94 deletions(-)

diff --git a/.golangci.yml b/.golangci.yml
index 1f1b42c5..58ecd754 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -18,7 +18,7 @@
 # Run `make lint` from the root path of this project to check code with golangci-lint.
 
 run:
-  deadline: 6m
+  timeout: 5m
 
 linters:
   # Uncomment this line to run only the explicitly enabled linters
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 568dcaa9..98848d71 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -532,8 +532,8 @@ func (pc *partitionConsumer) internalAckWithTxn(req *ackWithTxnRequest) {
                req.err = newError(ConsumerClosed, "Failed to ack by closing or 
closed consumer")
                return
        }
-       if req.Transaction.state != TxnOpen {
-               pc.log.WithField("state", req.Transaction.state).Error("Failed 
to ack by a non-open transaction.")
+       if req.Transaction.state.Load() != int32(TxnOpen) {
+               pc.log.WithField("state", 
req.Transaction.state.Load()).Error("Failed to ack by a non-open transaction.")
                req.err = newError(InvalidStatus, "Failed to ack by a non-open 
transaction.")
                return
        }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 7694c967..371ffb6a 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1135,8 +1135,8 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error {
        }
 
        txn := (sr.msg.Transaction).(*transaction)
-       if txn.state != TxnOpen {
-               p.log.WithField("state", txn.state).Error("Failed to send 
message" +
+       if txn.state.Load() != int32(TxnOpen) {
+               p.log.WithField("state", txn.state.Load()).Error("Failed to 
send message" +
                        " by a non-open transaction.")
                return joinErrors(ErrTransaction,
                        fmt.Errorf("failed to send message by a non-open 
transaction"))
diff --git a/pulsar/transaction_impl.go b/pulsar/transaction_impl.go
index 82e6fcfa..a4897816 100644
--- a/pulsar/transaction_impl.go
+++ b/pulsar/transaction_impl.go
@@ -19,6 +19,8 @@ package pulsar
 
 import (
        "context"
+       "errors"
+       "fmt"
        "sync"
        "sync/atomic"
        "time"
@@ -33,9 +35,9 @@ type subscription struct {
 }
 
 type transaction struct {
-       sync.Mutex
+       mu                       sync.Mutex
        txnID                    TxnID
-       state                    TxnState
+       state                    atomic.Int32
        tcClient                 *transactionCoordinatorClient
        registerPartitions       map[string]bool
        registerAckSubscriptions map[subscription]bool
@@ -54,7 +56,7 @@ type transaction struct {
        //     1. When the transaction is committed or aborted, a bool will be 
read from opsFlow chan.
        //     2. When the opsCount increment from 0 to 1, a bool will be read 
from opsFlow chan.
        opsFlow   chan bool
-       opsCount  int32
+       opsCount  atomic.Int32
        opTimeout time.Duration
        log       log.Logger
 }
@@ -62,47 +64,52 @@ type transaction struct {
 func newTransaction(id TxnID, tcClient *transactionCoordinatorClient, timeout 
time.Duration) *transaction {
        transaction := &transaction{
                txnID:                    id,
-               state:                    TxnOpen,
                registerPartitions:       make(map[string]bool),
                registerAckSubscriptions: make(map[subscription]bool),
                opsFlow:                  make(chan bool, 1),
-               opTimeout:                5 * time.Second,
+               opTimeout:                tcClient.client.operationTimeout,
                tcClient:                 tcClient,
        }
-       //This means there are not pending requests with this transaction. The 
transaction can be committed or aborted.
+       transaction.state.Store(int32(TxnOpen))
+       // This means there are not pending requests with this transaction. The 
transaction can be committed or aborted.
        transaction.opsFlow <- true
        go func() {
-               //Set the state of the transaction to timeout after timeout
+               // Set the state of the transaction to timeout after timeout
                <-time.After(timeout)
-               atomic.CompareAndSwapInt32((*int32)(&transaction.state), 
int32(TxnOpen), int32(TxnTimeout))
+               transaction.state.CompareAndSwap(int32(TxnOpen), 
int32(TxnTimeout))
        }()
        transaction.log = tcClient.log.SubLogger(log.Fields{})
        return transaction
 }
 
 func (txn *transaction) GetState() TxnState {
-       return txn.state
+       return TxnState(txn.state.Load())
 }
 
-func (txn *transaction) Commit(_ context.Context) error {
-       if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen), 
int32(TxnCommitting)) ||
-               txn.state == TxnCommitting) {
-               return newError(InvalidStatus, "Expect transaction state is 
TxnOpen but "+txn.state.string())
+func (txn *transaction) Commit(ctx context.Context) error {
+       if !(txn.state.CompareAndSwap(int32(TxnOpen), int32(TxnCommitting))) {
+               txnState := txn.state.Load()
+               return newError(InvalidStatus, txnStateErrorMessage(TxnOpen, 
TxnState(txnState)))
        }
 
-       //Wait for all operations to complete
+       // Wait for all operations to complete
        select {
        case <-txn.opsFlow:
+       case <-ctx.Done():
+               txn.state.Store(int32(TxnOpen))
+               return ctx.Err()
        case <-time.After(txn.opTimeout):
+               txn.state.Store(int32(TxnTimeout))
                return newError(TimeoutError, "There are some operations that 
are not completed after the timeout.")
        }
-       //Send commit transaction command to transaction coordinator
+       // Send commit transaction command to transaction coordinator
        err := txn.tcClient.endTxn(&txn.txnID, pb.TxnAction_COMMIT)
        if err == nil {
-               atomic.StoreInt32((*int32)(&txn.state), int32(TxnCommitted))
+               txn.state.Store(int32(TxnCommitted))
        } else {
-               if e, ok := err.(*Error); ok && (e.Result() == 
TransactionNoFoundError || e.Result() == InvalidStatus) {
-                       atomic.StoreInt32((*int32)(&txn.state), int32(TxnError))
+               var e *Error
+               if errors.As(err, &e) && (e.Result() == TransactionNoFoundError 
|| e.Result() == InvalidStatus) {
+                       txn.state.Store(int32(TxnError))
                        return err
                }
                txn.opsFlow <- true
@@ -110,40 +117,45 @@ func (txn *transaction) Commit(_ context.Context) error {
        return err
 }
 
-func (txn *transaction) Abort(_ context.Context) error {
-       if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen), 
int32(TxnAborting)) ||
-               txn.state == TxnAborting) {
-               return newError(InvalidStatus, "Expect transaction state is 
TxnOpen but "+txn.state.string())
+func (txn *transaction) Abort(ctx context.Context) error {
+       if !(txn.state.CompareAndSwap(int32(TxnOpen), int32(TxnAborting))) {
+               txnState := txn.state.Load()
+               return newError(InvalidStatus, txnStateErrorMessage(TxnOpen, 
TxnState(txnState)))
        }
 
-       //Wait for all operations to complete
+       // Wait for all operations to complete
        select {
        case <-txn.opsFlow:
+       case <-ctx.Done():
+               txn.state.Store(int32(TxnOpen))
+               return ctx.Err()
        case <-time.After(txn.opTimeout):
+               txn.state.Store(int32(TxnTimeout))
                return newError(TimeoutError, "There are some operations that 
are not completed after the timeout.")
        }
-       //Send abort transaction command to transaction coordinator
+       // Send abort transaction command to transaction coordinator
        err := txn.tcClient.endTxn(&txn.txnID, pb.TxnAction_ABORT)
        if err == nil {
-               atomic.StoreInt32((*int32)(&txn.state), int32(TxnAborted))
+               txn.state.Store(int32(TxnAborted))
        } else {
-               if e, ok := err.(*Error); ok && (e.Result() == 
TransactionNoFoundError || e.Result() == InvalidStatus) {
-                       atomic.StoreInt32((*int32)(&txn.state), int32(TxnError))
-               } else {
-                       txn.opsFlow <- true
+               var e *Error
+               if errors.As(err, &e) && (e.Result() == TransactionNoFoundError 
|| e.Result() == InvalidStatus) {
+                       txn.state.Store(int32(TxnError))
+                       return err
                }
+               txn.opsFlow <- true
        }
        return err
 }
 
 func (txn *transaction) registerSendOrAckOp() error {
-       if atomic.AddInt32(&txn.opsCount, 1) == 1 {
-               //There are new operations that not completed
+       if txn.opsCount.Add(1) == 1 {
+               // There are new operations that were not completed
                select {
                case <-txn.opsFlow:
                        return nil
                case <-time.After(txn.opTimeout):
-                       if _, err := txn.checkIfOpen(); err != nil {
+                       if err := txn.verifyOpen(); err != nil {
                                return err
                        }
                        return newError(TimeoutError, "Failed to get the 
semaphore to register the send/ack operation")
@@ -154,23 +166,22 @@ func (txn *transaction) registerSendOrAckOp() error {
 
 func (txn *transaction) endSendOrAckOp(err error) {
        if err != nil {
-               atomic.StoreInt32((*int32)(&txn.state), int32(TxnError))
+               txn.state.Store(int32(TxnError))
        }
-       if atomic.AddInt32(&txn.opsCount, -1) == 0 {
-               //This means there are not pending send/ack requests
+       if txn.opsCount.Add(-1) == 0 {
+               // This means there are no pending send/ack requests
                txn.opsFlow <- true
        }
 }
 
 func (txn *transaction) registerProducerTopic(topic string) error {
-       isOpen, err := txn.checkIfOpen()
-       if !isOpen {
+       if err := txn.verifyOpen(); err != nil {
                return err
        }
        _, ok := txn.registerPartitions[topic]
        if !ok {
-               txn.Lock()
-               defer txn.Unlock()
+               txn.mu.Lock()
+               defer txn.mu.Unlock()
                if _, ok = txn.registerPartitions[topic]; !ok {
                        err := 
txn.tcClient.addPublishPartitionToTxn(&txn.txnID, []string{topic})
                        if err != nil {
@@ -183,8 +194,7 @@ func (txn *transaction) registerProducerTopic(topic string) error {
 }
 
 func (txn *transaction) registerAckTopic(topic string, subName string) error {
-       isOpen, err := txn.checkIfOpen()
-       if !isOpen {
+       if err := txn.verifyOpen(); err != nil {
                return err
        }
        sub := subscription{
@@ -193,8 +203,8 @@ func (txn *transaction) registerAckTopic(topic string, subName string) error {
        }
        _, ok := txn.registerAckSubscriptions[sub]
        if !ok {
-               txn.Lock()
-               defer txn.Unlock()
+               txn.mu.Lock()
+               defer txn.mu.Unlock()
                if _, ok = txn.registerAckSubscriptions[sub]; !ok {
                        err := txn.tcClient.addSubscriptionToTxn(&txn.txnID, 
topic, subName)
                        if err != nil {
@@ -210,14 +220,15 @@ func (txn *transaction) GetTxnID() TxnID {
        return txn.txnID
 }
 
-func (txn *transaction) checkIfOpen() (bool, error) {
-       if txn.state == TxnOpen {
-               return true, nil
+func (txn *transaction) verifyOpen() error {
+       txnState := txn.state.Load()
+       if txnState != int32(TxnOpen) {
+               return newError(InvalidStatus, txnStateErrorMessage(TxnOpen, 
TxnState(txnState)))
        }
-       return false, newError(InvalidStatus, "Expect transaction state is 
TxnOpen but "+txn.state.string())
+       return nil
 }
 
-func (state TxnState) string() string {
+func (state TxnState) String() string {
        switch state {
        case TxnOpen:
                return "TxnOpen"
@@ -237,3 +248,8 @@ func (state TxnState) string() string {
                return "Unknown"
        }
 }
+
+//nolint:unparam
+func txnStateErrorMessage(expected, actual TxnState) string {
+       return fmt.Sprintf("Expected transaction state: %s, actual: %s", 
expected, actual)
+}
diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go
index eb88c706..75b36ea8 100644
--- a/pulsar/transaction_test.go
+++ b/pulsar/transaction_test.go
@@ -29,7 +29,9 @@ import (
        "github.com/stretchr/testify/require"
 )
 
-func TestTCClient(t *testing.T) {
+const txnTimeout = 10 * time.Minute
+
+func TestTxn_TCClient(t *testing.T) {
        //1. Prepare: create PulsarClient and init transaction coordinator 
client.
        topic := newTopicName()
        sub := "my-sub"
@@ -52,13 +54,13 @@ func TestTCClient(t *testing.T) {
        stats, err := transactionStats(id1)
        assert.NoError(t, err)
        assert.Equal(t, "OPEN", stats["status"])
-       producedPartitions := 
stats["producedPartitions"].(map[string]interface{})
-       ackedPartitions := stats["ackedPartitions"].(map[string]interface{})
+       producedPartitions := stats["producedPartitions"].(map[string]any)
+       ackedPartitions := stats["ackedPartitions"].(map[string]any)
        _, ok := producedPartitions[topic]
        assert.True(t, ok)
        temp, ok := ackedPartitions[topic]
        assert.True(t, ok)
-       subscriptions := temp.(map[string]interface{})
+       subscriptions := temp.(map[string]any)
        _, ok = subscriptions[sub]
        assert.True(t, ok)
        //5. Test End transaction
@@ -78,12 +80,12 @@ func TestTCClient(t *testing.T) {
        } else {
                assert.Equal(t, err.Error(), "http error status code: 404")
        }
-       defer consumer.Close()
-       defer tc.close()
-       defer client.Close()
+       consumer.Close()
+       tc.close()
+       client.Close()
 }
 
-//Test points:
+// Test points:
 //     1. Abort and commit txn.
 //             1. Do nothing, just open a transaction and then commit it or 
abort it.
 //             The operations of committing/aborting txn should success at the 
first time and fail at the second time.
@@ -96,8 +98,8 @@ func TestTCClient(t *testing.T) {
 //             1. Register ack topic and send topic, and call http request to 
get the stats of the transaction
 //             to do verification.
 
-// TestTxnImplCommitOrAbort Test abort and commit txn
-func TestTxnImplCommitOrAbort(t *testing.T) {
+// Test abort and commit txn
+func TestTxn_ImplCommitOrAbort(t *testing.T) {
        tc, _ := createTcClient(t)
        //1. Open a transaction and then commit it.
        //The operations of committing txn1 should success at the first time 
and fail at the second time.
@@ -105,20 +107,20 @@ func TestTxnImplCommitOrAbort(t *testing.T) {
        err := txn1.Commit(context.Background())
        require.Nil(t, err, fmt.Sprintf("Failed to commit the transaction 
%d:%d\n", txn1.txnID.MostSigBits,
                txn1.txnID.LeastSigBits))
-       txn1.state = TxnOpen
+       txn1.state.Store(int32(TxnOpen))
        txn1.opsFlow <- true
        err = txn1.Commit(context.Background())
        assert.Equal(t, err.(*Error).Result(), TransactionNoFoundError)
        assert.Equal(t, txn1.GetState(), TxnError)
        //2. Open a transaction and then abort it.
        //The operations of aborting txn2 should success at the first time and 
fail at the second time.
-       id2, err := tc.newTransaction(time.Hour)
-       require.Nil(t, err, "Failed to new a transaction")
-       txn2 := newTransaction(*id2, tc, time.Hour)
+       id2, err := tc.newTransaction(txnTimeout)
+       require.Nil(t, err, "Failed to create a transaction")
+       txn2 := newTransaction(*id2, tc, txnTimeout)
        err = txn2.Abort(context.Background())
        require.Nil(t, err, fmt.Sprintf("Failed to abort the transaction 
%d:%d\n",
                id2.MostSigBits, id2.LeastSigBits))
-       txn2.state = TxnOpen
+       txn2.state.Store(int32(TxnOpen))
        txn2.opsFlow <- true
        err = txn2.Abort(context.Background())
        assert.Equal(t, err.(*Error).Result(), TransactionNoFoundError)
@@ -129,8 +131,8 @@ func TestTxnImplCommitOrAbort(t *testing.T) {
        assert.Equal(t, err.(*Error).Result(), InvalidStatus)
 }
 
-// TestRegisterOpAndEndOp Test the internal API including the 
registerSendOrAckOp and endSendOrAckOp.
-func TestRegisterOpAndEndOp(t *testing.T) {
+// Test the internal API including the registerSendOrAckOp and endSendOrAckOp.
+func TestTxn_RegisterOpAndEndOp(t *testing.T) {
        tc, _ := createTcClient(t)
        //1. Register 4 operation but only end 3 operations, the transaction 
can not be committed or aborted.
        res := registerOpAndEndOp(t, tc, 4, 3, nil, true)
@@ -151,8 +153,8 @@ func TestRegisterOpAndEndOp(t *testing.T) {
        assert.Equal(t, res.(*Error).Result(), InvalidStatus)
 }
 
-// TestRegisterTopic Test the internal API, registerAckTopic and 
registerProducerTopic
-func TestRegisterTopic(t *testing.T) {
+// Test the internal API, registerAckTopic and registerProducerTopic
+func TestTxn_RegisterTopic(t *testing.T) {
        //1. Prepare: create PulsarClient and init transaction coordinator 
client.
        topic := newTopicName()
        sub := "my-sub"
@@ -172,11 +174,11 @@ func TestRegisterTopic(t *testing.T) {
        //4. Call http request to get the stats of the transaction to do 
verification.
        stats2, err := transactionStats(&txn.txnID)
        assert.NoError(t, err)
-       topics := stats2["producedPartitions"].(map[string]interface{})
-       subTopics := stats2["ackedPartitions"].(map[string]interface{})
+       topics := stats2["producedPartitions"].(map[string]any)
+       subTopics := stats2["ackedPartitions"].(map[string]any)
        assert.NotNil(t, topics[topic])
        assert.NotNil(t, subTopics[topic])
-       subs := subTopics[topic].(map[string]interface{})
+       subs := subTopics[topic].(map[string]any)
        assert.NotNil(t, subs[sub])
 }
 
@@ -198,9 +200,9 @@ func registerOpAndEndOp(t *testing.T, tc *transactionCoordinatorClient, rp int,
 }
 
 func createTxn(tc *transactionCoordinatorClient, t *testing.T) *transaction {
-       id, err := tc.newTransaction(time.Hour)
-       require.Nil(t, err, "Failed to new a transaction.")
-       return newTransaction(*id, tc, time.Hour)
+       id, err := tc.newTransaction(txnTimeout)
+       require.Nil(t, err, "Failed to create a new transaction.")
+       return newTransaction(*id, tc, txnTimeout)
 }
 
 // createTcClient Create a transaction coordinator client to send request
@@ -216,7 +218,7 @@ func createTcClient(t *testing.T) (*transactionCoordinatorClient, *client) {
        return c.(*client).tcClient, c.(*client)
 }
 
-// TestConsumeAndProduceWithTxn is a test function that validates the behavior 
of producing and consuming
+// Validate the behavior of producing and consuming
 // messages with and without transactions. It consists of the following steps:
 //
 // 1. Prepare: Create a PulsarClient and initialize the transaction 
coordinator client.
@@ -230,7 +232,7 @@ func createTcClient(t *testing.T) (*transactionCoordinatorClient, *client) {
 //
 // The test ensures that the consumer can only receive messages sent with a 
transaction after it is committed,
 // and that it can always receive messages sent without a transaction.
-func TestConsumeAndProduceWithTxn(t *testing.T) {
+func TestTxn_ConsumeAndProduce(t *testing.T) {
        // Step 1: Prepare - Create PulsarClient and initialize the transaction 
coordinator client.
        topic := newTopicName()
        sub := "my-sub"
@@ -249,7 +251,7 @@ func TestConsumeAndProduceWithTxn(t *testing.T) {
        // Step 3: Open a transaction, send 10 messages with the transaction 
and 10 messages without the transaction.
        // Expectation: We can receive the 10 messages sent without a 
transaction and
        // cannot receive the 10 messages sent with the transaction.
-       txn, err := client.NewTransaction(time.Hour)
+       txn, err := client.NewTransaction(txnTimeout)
        require.Nil(t, err)
        for i := 0; i < 10; i++ {
                _, err = producer.Send(context.Background(), &ProducerMessage{
@@ -289,7 +291,7 @@ func TestConsumeAndProduceWithTxn(t *testing.T) {
        // Acknowledge the rest of the 10 messages with the transaction.
        // Expectation: After committing the transaction, all messages of the 
subscription will be acknowledged.
        _ = txn.Commit(context.Background())
-       txn, err = client.NewTransaction(time.Hour)
+       txn, err = client.NewTransaction(txnTimeout)
        require.Nil(t, err)
        for i := 0; i < 9; i++ {
                msg, _ := consumer.Receive(context.Background())
@@ -307,7 +309,7 @@ func TestConsumeAndProduceWithTxn(t *testing.T) {
        // Create a goroutine to attempt receiving a message and send it to the 
'done1' channel.
        done2 := make(chan Message)
        go func() {
-               consumer.Receive(context.Background())
+               _, _ = consumer.Receive(context.Background())
                close(done2)
        }()
 
@@ -323,7 +325,7 @@ func TestConsumeAndProduceWithTxn(t *testing.T) {
        producer.Close()
 }
 
-func TestAckAndSendWithTxn(t *testing.T) {
+func TestTxn_AckAndSend(t *testing.T) {
        // Prepare: Create PulsarClient and initialize the transaction 
coordinator client.
        sourceTopic := newTopicName()
        sinkTopic := newTopicName()
@@ -357,7 +359,7 @@ func TestAckAndSendWithTxn(t *testing.T) {
        }
 
        // Open a transaction and consume messages from the source topic while 
sending messages to the sink topic.
-       txn, err := client.NewTransaction(time.Hour)
+       txn, err := client.NewTransaction(txnTimeout)
        require.Nil(t, err)
 
        for i := 0; i < 10; i++ {
@@ -393,7 +395,7 @@ func TestAckAndSendWithTxn(t *testing.T) {
        sinkProducer.Close()
 }
 
-func TestTransactionAbort(t *testing.T) {
+func TestTxn_TransactionAbort(t *testing.T) {
        // Prepare: Create PulsarClient and initialize the transaction 
coordinator client.
        topic := newTopicName()
        sub := "my-sub"
@@ -410,7 +412,7 @@ func TestTransactionAbort(t *testing.T) {
        })
 
        // Open a transaction and send 10 messages with the transaction.
-       txn, err := client.NewTransaction(time.Hour)
+       txn, err := client.NewTransaction(txnTimeout)
        require.Nil(t, err)
 
        for i := 0; i < 10; i++ {
@@ -449,7 +451,7 @@ func consumerShouldNotReceiveMessage(t *testing.T, consumer Consumer) {
        }
 }
 
-func TestTransactionAckChunkMessage(t *testing.T) {
+func TestTxn_AckChunkMessage(t *testing.T) {
        topic := newTopicName()
        sub := "my-sub"
 
@@ -457,7 +459,7 @@ func TestTransactionAckChunkMessage(t *testing.T) {
        _, client := createTcClient(t)
 
        // Create transaction and register the send operation.
-       txn, err := client.NewTransaction(time.Hour)
+       txn, err := client.NewTransaction(txnTimeout)
        require.Nil(t, err)
 
        // Create a producer with chunking enabled to send a large message that 
will be split into chunks.
@@ -487,13 +489,13 @@ func TestTransactionAckChunkMessage(t *testing.T) {
        })
        require.NoError(t, err)
        _, ok := msgID.(*chunkMessageID)
-       require.True(t, ok)
+       require.True(t, ok, fmt.Sprintf("Expected message ID of type 
*chunkMessageID, got type %T", msgID))
 
        err = txn.Commit(context.Background())
        require.Nil(t, err)
 
        // Receive the message using a new transaction and ack it.
-       txn2, err := client.NewTransaction(time.Hour)
+       txn2, err := client.NewTransaction(txnTimeout)
        require.Nil(t, err)
        message, err := consumer.Receive(context.Background())
        require.Nil(t, err)
@@ -501,7 +503,7 @@ func TestTransactionAckChunkMessage(t *testing.T) {
        err = consumer.AckWithTxn(message, txn2)
        require.Nil(t, err)
 
-       txn2.Abort(context.Background())
+       _ = txn2.Abort(context.Background())
 
        // Close the consumer to simulate reconnection and receive the same 
message again.
        consumer.Close()
@@ -518,7 +520,7 @@ func TestTransactionAckChunkMessage(t *testing.T) {
        require.NotNil(t, message)
 
        // Create a new transaction and ack the message again.
-       txn3, err := client.NewTransaction(time.Hour)
+       txn3, err := client.NewTransaction(txnTimeout)
        require.Nil(t, err)
 
        err = consumer.AckWithTxn(message, txn3)
@@ -541,7 +543,7 @@ func TestTransactionAckChunkMessage(t *testing.T) {
        consumerShouldNotReceiveMessage(t, consumer)
 }
 
-func TestTxnConnReconnect(t *testing.T) {
+func TestTxn_ConnReconnect(t *testing.T) {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
 
@@ -593,3 +595,8 @@ func TestTxnConnReconnect(t *testing.T) {
        err = txn.Commit(context.Background())
        assert.NoError(t, err)
 }
+
+func TestTxn_txnStateErrorMessage(t *testing.T) {
+       expected := "Expected transaction state: TxnOpen, actual: TxnTimeout"
+       assert.Equal(t, expected, txnStateErrorMessage(TxnOpen, TxnTimeout))
+}

Reply via email to