This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new c9245bc7 fix: enhance transaction functionality (#1281)
c9245bc7 is described below
commit c9245bc7cbf0006f007abd32d97ec068122024db
Author: Eugene R. <[email protected]>
AuthorDate: Tue Dec 17 13:00:14 2024 +0200
fix: enhance transaction functionality (#1281)
### Motivation
Various fixes and refactoring for the transaction implementation.
### Modifications
* Honor the caller's context (cancellation) in the `Commit` and `Abort` methods
* Use the client's operation timeout instead of a hard-coded 5 seconds
* Use `atomic.Int32` for the state
* Make all state reads atomic
* Clean up and improve error messages
---
.golangci.yml | 2 +-
pulsar/consumer_partition.go | 4 +-
pulsar/producer_partition.go | 4 +-
pulsar/transaction_impl.go | 112 ++++++++++++++++++++++++-------------------
pulsar/transaction_test.go | 89 ++++++++++++++++++----------------
5 files changed, 117 insertions(+), 94 deletions(-)
diff --git a/.golangci.yml b/.golangci.yml
index 1f1b42c5..58ecd754 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -18,7 +18,7 @@
# Run `make lint` from the root path of this project to check code with
golangci-lint.
run:
- deadline: 6m
+ timeout: 5m
linters:
# Uncomment this line to run only the explicitly enabled linters
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 568dcaa9..98848d71 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -532,8 +532,8 @@ func (pc *partitionConsumer) internalAckWithTxn(req
*ackWithTxnRequest) {
req.err = newError(ConsumerClosed, "Failed to ack by closing or
closed consumer")
return
}
- if req.Transaction.state != TxnOpen {
- pc.log.WithField("state", req.Transaction.state).Error("Failed
to ack by a non-open transaction.")
+ if req.Transaction.state.Load() != int32(TxnOpen) {
+ pc.log.WithField("state",
req.Transaction.state.Load()).Error("Failed to ack by a non-open transaction.")
req.err = newError(InvalidStatus, "Failed to ack by a non-open
transaction.")
return
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 7694c967..371ffb6a 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1135,8 +1135,8 @@ func (p *partitionProducer) prepareTransaction(sr
*sendRequest) error {
}
txn := (sr.msg.Transaction).(*transaction)
- if txn.state != TxnOpen {
- p.log.WithField("state", txn.state).Error("Failed to send
message" +
+ if txn.state.Load() != int32(TxnOpen) {
+ p.log.WithField("state", txn.state.Load()).Error("Failed to
send message" +
" by a non-open transaction.")
return joinErrors(ErrTransaction,
fmt.Errorf("failed to send message by a non-open
transaction"))
diff --git a/pulsar/transaction_impl.go b/pulsar/transaction_impl.go
index 82e6fcfa..a4897816 100644
--- a/pulsar/transaction_impl.go
+++ b/pulsar/transaction_impl.go
@@ -19,6 +19,8 @@ package pulsar
import (
"context"
+ "errors"
+ "fmt"
"sync"
"sync/atomic"
"time"
@@ -33,9 +35,9 @@ type subscription struct {
}
type transaction struct {
- sync.Mutex
+ mu sync.Mutex
txnID TxnID
- state TxnState
+ state atomic.Int32
tcClient *transactionCoordinatorClient
registerPartitions map[string]bool
registerAckSubscriptions map[subscription]bool
@@ -54,7 +56,7 @@ type transaction struct {
// 1. When the transaction is committed or aborted, a bool will be
read from opsFlow chan.
// 2. When the opsCount increment from 0 to 1, a bool will be read
from opsFlow chan.
opsFlow chan bool
- opsCount int32
+ opsCount atomic.Int32
opTimeout time.Duration
log log.Logger
}
@@ -62,47 +64,52 @@ type transaction struct {
func newTransaction(id TxnID, tcClient *transactionCoordinatorClient, timeout
time.Duration) *transaction {
transaction := &transaction{
txnID: id,
- state: TxnOpen,
registerPartitions: make(map[string]bool),
registerAckSubscriptions: make(map[subscription]bool),
opsFlow: make(chan bool, 1),
- opTimeout: 5 * time.Second,
+ opTimeout: tcClient.client.operationTimeout,
tcClient: tcClient,
}
- //This means there are not pending requests with this transaction. The
transaction can be committed or aborted.
+ transaction.state.Store(int32(TxnOpen))
+ // This means there are not pending requests with this transaction. The
transaction can be committed or aborted.
transaction.opsFlow <- true
go func() {
- //Set the state of the transaction to timeout after timeout
+ // Set the state of the transaction to timeout after timeout
<-time.After(timeout)
- atomic.CompareAndSwapInt32((*int32)(&transaction.state),
int32(TxnOpen), int32(TxnTimeout))
+ transaction.state.CompareAndSwap(int32(TxnOpen),
int32(TxnTimeout))
}()
transaction.log = tcClient.log.SubLogger(log.Fields{})
return transaction
}
func (txn *transaction) GetState() TxnState {
- return txn.state
+ return TxnState(txn.state.Load())
}
-func (txn *transaction) Commit(_ context.Context) error {
- if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen),
int32(TxnCommitting)) ||
- txn.state == TxnCommitting) {
- return newError(InvalidStatus, "Expect transaction state is
TxnOpen but "+txn.state.string())
+func (txn *transaction) Commit(ctx context.Context) error {
+ if !(txn.state.CompareAndSwap(int32(TxnOpen), int32(TxnCommitting))) {
+ txnState := txn.state.Load()
+ return newError(InvalidStatus, txnStateErrorMessage(TxnOpen,
TxnState(txnState)))
}
- //Wait for all operations to complete
+ // Wait for all operations to complete
select {
case <-txn.opsFlow:
+ case <-ctx.Done():
+ txn.state.Store(int32(TxnOpen))
+ return ctx.Err()
case <-time.After(txn.opTimeout):
+ txn.state.Store(int32(TxnTimeout))
return newError(TimeoutError, "There are some operations that
are not completed after the timeout.")
}
- //Send commit transaction command to transaction coordinator
+ // Send commit transaction command to transaction coordinator
err := txn.tcClient.endTxn(&txn.txnID, pb.TxnAction_COMMIT)
if err == nil {
- atomic.StoreInt32((*int32)(&txn.state), int32(TxnCommitted))
+ txn.state.Store(int32(TxnCommitted))
} else {
- if e, ok := err.(*Error); ok && (e.Result() ==
TransactionNoFoundError || e.Result() == InvalidStatus) {
- atomic.StoreInt32((*int32)(&txn.state), int32(TxnError))
+ var e *Error
+ if errors.As(err, &e) && (e.Result() == TransactionNoFoundError
|| e.Result() == InvalidStatus) {
+ txn.state.Store(int32(TxnError))
return err
}
txn.opsFlow <- true
@@ -110,40 +117,45 @@ func (txn *transaction) Commit(_ context.Context) error {
return err
}
-func (txn *transaction) Abort(_ context.Context) error {
- if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen),
int32(TxnAborting)) ||
- txn.state == TxnAborting) {
- return newError(InvalidStatus, "Expect transaction state is
TxnOpen but "+txn.state.string())
+func (txn *transaction) Abort(ctx context.Context) error {
+ if !(txn.state.CompareAndSwap(int32(TxnOpen), int32(TxnAborting))) {
+ txnState := txn.state.Load()
+ return newError(InvalidStatus, txnStateErrorMessage(TxnOpen,
TxnState(txnState)))
}
- //Wait for all operations to complete
+ // Wait for all operations to complete
select {
case <-txn.opsFlow:
+ case <-ctx.Done():
+ txn.state.Store(int32(TxnOpen))
+ return ctx.Err()
case <-time.After(txn.opTimeout):
+ txn.state.Store(int32(TxnTimeout))
return newError(TimeoutError, "There are some operations that
are not completed after the timeout.")
}
- //Send abort transaction command to transaction coordinator
+ // Send abort transaction command to transaction coordinator
err := txn.tcClient.endTxn(&txn.txnID, pb.TxnAction_ABORT)
if err == nil {
- atomic.StoreInt32((*int32)(&txn.state), int32(TxnAborted))
+ txn.state.Store(int32(TxnAborted))
} else {
- if e, ok := err.(*Error); ok && (e.Result() ==
TransactionNoFoundError || e.Result() == InvalidStatus) {
- atomic.StoreInt32((*int32)(&txn.state), int32(TxnError))
- } else {
- txn.opsFlow <- true
+ var e *Error
+ if errors.As(err, &e) && (e.Result() == TransactionNoFoundError
|| e.Result() == InvalidStatus) {
+ txn.state.Store(int32(TxnError))
+ return err
}
+ txn.opsFlow <- true
}
return err
}
func (txn *transaction) registerSendOrAckOp() error {
- if atomic.AddInt32(&txn.opsCount, 1) == 1 {
- //There are new operations that not completed
+ if txn.opsCount.Add(1) == 1 {
+ // There are new operations that were not completed
select {
case <-txn.opsFlow:
return nil
case <-time.After(txn.opTimeout):
- if _, err := txn.checkIfOpen(); err != nil {
+ if err := txn.verifyOpen(); err != nil {
return err
}
return newError(TimeoutError, "Failed to get the
semaphore to register the send/ack operation")
@@ -154,23 +166,22 @@ func (txn *transaction) registerSendOrAckOp() error {
func (txn *transaction) endSendOrAckOp(err error) {
if err != nil {
- atomic.StoreInt32((*int32)(&txn.state), int32(TxnError))
+ txn.state.Store(int32(TxnError))
}
- if atomic.AddInt32(&txn.opsCount, -1) == 0 {
- //This means there are not pending send/ack requests
+ if txn.opsCount.Add(-1) == 0 {
+ // This means there are no pending send/ack requests
txn.opsFlow <- true
}
}
func (txn *transaction) registerProducerTopic(topic string) error {
- isOpen, err := txn.checkIfOpen()
- if !isOpen {
+ if err := txn.verifyOpen(); err != nil {
return err
}
_, ok := txn.registerPartitions[topic]
if !ok {
- txn.Lock()
- defer txn.Unlock()
+ txn.mu.Lock()
+ defer txn.mu.Unlock()
if _, ok = txn.registerPartitions[topic]; !ok {
err :=
txn.tcClient.addPublishPartitionToTxn(&txn.txnID, []string{topic})
if err != nil {
@@ -183,8 +194,7 @@ func (txn *transaction) registerProducerTopic(topic string)
error {
}
func (txn *transaction) registerAckTopic(topic string, subName string) error {
- isOpen, err := txn.checkIfOpen()
- if !isOpen {
+ if err := txn.verifyOpen(); err != nil {
return err
}
sub := subscription{
@@ -193,8 +203,8 @@ func (txn *transaction) registerAckTopic(topic string,
subName string) error {
}
_, ok := txn.registerAckSubscriptions[sub]
if !ok {
- txn.Lock()
- defer txn.Unlock()
+ txn.mu.Lock()
+ defer txn.mu.Unlock()
if _, ok = txn.registerAckSubscriptions[sub]; !ok {
err := txn.tcClient.addSubscriptionToTxn(&txn.txnID,
topic, subName)
if err != nil {
@@ -210,14 +220,15 @@ func (txn *transaction) GetTxnID() TxnID {
return txn.txnID
}
-func (txn *transaction) checkIfOpen() (bool, error) {
- if txn.state == TxnOpen {
- return true, nil
+func (txn *transaction) verifyOpen() error {
+ txnState := txn.state.Load()
+ if txnState != int32(TxnOpen) {
+ return newError(InvalidStatus, txnStateErrorMessage(TxnOpen,
TxnState(txnState)))
}
- return false, newError(InvalidStatus, "Expect transaction state is
TxnOpen but "+txn.state.string())
+ return nil
}
-func (state TxnState) string() string {
+func (state TxnState) String() string {
switch state {
case TxnOpen:
return "TxnOpen"
@@ -237,3 +248,8 @@ func (state TxnState) string() string {
return "Unknown"
}
}
+
+//nolint:unparam
+func txnStateErrorMessage(expected, actual TxnState) string {
+ return fmt.Sprintf("Expected transaction state: %s, actual: %s",
expected, actual)
+}
diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go
index eb88c706..75b36ea8 100644
--- a/pulsar/transaction_test.go
+++ b/pulsar/transaction_test.go
@@ -29,7 +29,9 @@ import (
"github.com/stretchr/testify/require"
)
-func TestTCClient(t *testing.T) {
+const txnTimeout = 10 * time.Minute
+
+func TestTxn_TCClient(t *testing.T) {
//1. Prepare: create PulsarClient and init transaction coordinator
client.
topic := newTopicName()
sub := "my-sub"
@@ -52,13 +54,13 @@ func TestTCClient(t *testing.T) {
stats, err := transactionStats(id1)
assert.NoError(t, err)
assert.Equal(t, "OPEN", stats["status"])
- producedPartitions :=
stats["producedPartitions"].(map[string]interface{})
- ackedPartitions := stats["ackedPartitions"].(map[string]interface{})
+ producedPartitions := stats["producedPartitions"].(map[string]any)
+ ackedPartitions := stats["ackedPartitions"].(map[string]any)
_, ok := producedPartitions[topic]
assert.True(t, ok)
temp, ok := ackedPartitions[topic]
assert.True(t, ok)
- subscriptions := temp.(map[string]interface{})
+ subscriptions := temp.(map[string]any)
_, ok = subscriptions[sub]
assert.True(t, ok)
//5. Test End transaction
@@ -78,12 +80,12 @@ func TestTCClient(t *testing.T) {
} else {
assert.Equal(t, err.Error(), "http error status code: 404")
}
- defer consumer.Close()
- defer tc.close()
- defer client.Close()
+ consumer.Close()
+ tc.close()
+ client.Close()
}
-//Test points:
+// Test points:
// 1. Abort and commit txn.
// 1. Do nothing, just open a transaction and then commit it or
abort it.
// The operations of committing/aborting txn should success at the
first time and fail at the second time.
@@ -96,8 +98,8 @@ func TestTCClient(t *testing.T) {
// 1. Register ack topic and send topic, and call http request to
get the stats of the transaction
// to do verification.
-// TestTxnImplCommitOrAbort Test abort and commit txn
-func TestTxnImplCommitOrAbort(t *testing.T) {
+// Test abort and commit txn
+func TestTxn_ImplCommitOrAbort(t *testing.T) {
tc, _ := createTcClient(t)
//1. Open a transaction and then commit it.
//The operations of committing txn1 should success at the first time
and fail at the second time.
@@ -105,20 +107,20 @@ func TestTxnImplCommitOrAbort(t *testing.T) {
err := txn1.Commit(context.Background())
require.Nil(t, err, fmt.Sprintf("Failed to commit the transaction
%d:%d\n", txn1.txnID.MostSigBits,
txn1.txnID.LeastSigBits))
- txn1.state = TxnOpen
+ txn1.state.Store(int32(TxnOpen))
txn1.opsFlow <- true
err = txn1.Commit(context.Background())
assert.Equal(t, err.(*Error).Result(), TransactionNoFoundError)
assert.Equal(t, txn1.GetState(), TxnError)
//2. Open a transaction and then abort it.
//The operations of aborting txn2 should success at the first time and
fail at the second time.
- id2, err := tc.newTransaction(time.Hour)
- require.Nil(t, err, "Failed to new a transaction")
- txn2 := newTransaction(*id2, tc, time.Hour)
+ id2, err := tc.newTransaction(txnTimeout)
+ require.Nil(t, err, "Failed to create a transaction")
+ txn2 := newTransaction(*id2, tc, txnTimeout)
err = txn2.Abort(context.Background())
require.Nil(t, err, fmt.Sprintf("Failed to abort the transaction
%d:%d\n",
id2.MostSigBits, id2.LeastSigBits))
- txn2.state = TxnOpen
+ txn2.state.Store(int32(TxnOpen))
txn2.opsFlow <- true
err = txn2.Abort(context.Background())
assert.Equal(t, err.(*Error).Result(), TransactionNoFoundError)
@@ -129,8 +131,8 @@ func TestTxnImplCommitOrAbort(t *testing.T) {
assert.Equal(t, err.(*Error).Result(), InvalidStatus)
}
-// TestRegisterOpAndEndOp Test the internal API including the
registerSendOrAckOp and endSendOrAckOp.
-func TestRegisterOpAndEndOp(t *testing.T) {
+// Test the internal API including the registerSendOrAckOp and endSendOrAckOp.
+func TestTxn_RegisterOpAndEndOp(t *testing.T) {
tc, _ := createTcClient(t)
//1. Register 4 operation but only end 3 operations, the transaction
can not be committed or aborted.
res := registerOpAndEndOp(t, tc, 4, 3, nil, true)
@@ -151,8 +153,8 @@ func TestRegisterOpAndEndOp(t *testing.T) {
assert.Equal(t, res.(*Error).Result(), InvalidStatus)
}
-// TestRegisterTopic Test the internal API, registerAckTopic and
registerProducerTopic
-func TestRegisterTopic(t *testing.T) {
+// Test the internal API, registerAckTopic and registerProducerTopic
+func TestTxn_RegisterTopic(t *testing.T) {
//1. Prepare: create PulsarClient and init transaction coordinator
client.
topic := newTopicName()
sub := "my-sub"
@@ -172,11 +174,11 @@ func TestRegisterTopic(t *testing.T) {
//4. Call http request to get the stats of the transaction to do
verification.
stats2, err := transactionStats(&txn.txnID)
assert.NoError(t, err)
- topics := stats2["producedPartitions"].(map[string]interface{})
- subTopics := stats2["ackedPartitions"].(map[string]interface{})
+ topics := stats2["producedPartitions"].(map[string]any)
+ subTopics := stats2["ackedPartitions"].(map[string]any)
assert.NotNil(t, topics[topic])
assert.NotNil(t, subTopics[topic])
- subs := subTopics[topic].(map[string]interface{})
+ subs := subTopics[topic].(map[string]any)
assert.NotNil(t, subs[sub])
}
@@ -198,9 +200,9 @@ func registerOpAndEndOp(t *testing.T, tc
*transactionCoordinatorClient, rp int,
}
func createTxn(tc *transactionCoordinatorClient, t *testing.T) *transaction {
- id, err := tc.newTransaction(time.Hour)
- require.Nil(t, err, "Failed to new a transaction.")
- return newTransaction(*id, tc, time.Hour)
+ id, err := tc.newTransaction(txnTimeout)
+ require.Nil(t, err, "Failed to create a new transaction.")
+ return newTransaction(*id, tc, txnTimeout)
}
// createTcClient Create a transaction coordinator client to send request
@@ -216,7 +218,7 @@ func createTcClient(t *testing.T)
(*transactionCoordinatorClient, *client) {
return c.(*client).tcClient, c.(*client)
}
-// TestConsumeAndProduceWithTxn is a test function that validates the behavior
of producing and consuming
+// Validate the behavior of producing and consuming
// messages with and without transactions. It consists of the following steps:
//
// 1. Prepare: Create a PulsarClient and initialize the transaction
coordinator client.
@@ -230,7 +232,7 @@ func createTcClient(t *testing.T)
(*transactionCoordinatorClient, *client) {
//
// The test ensures that the consumer can only receive messages sent with a
transaction after it is committed,
// and that it can always receive messages sent without a transaction.
-func TestConsumeAndProduceWithTxn(t *testing.T) {
+func TestTxn_ConsumeAndProduce(t *testing.T) {
// Step 1: Prepare - Create PulsarClient and initialize the transaction
coordinator client.
topic := newTopicName()
sub := "my-sub"
@@ -249,7 +251,7 @@ func TestConsumeAndProduceWithTxn(t *testing.T) {
// Step 3: Open a transaction, send 10 messages with the transaction
and 10 messages without the transaction.
// Expectation: We can receive the 10 messages sent without a
transaction and
// cannot receive the 10 messages sent with the transaction.
- txn, err := client.NewTransaction(time.Hour)
+ txn, err := client.NewTransaction(txnTimeout)
require.Nil(t, err)
for i := 0; i < 10; i++ {
_, err = producer.Send(context.Background(), &ProducerMessage{
@@ -289,7 +291,7 @@ func TestConsumeAndProduceWithTxn(t *testing.T) {
// Acknowledge the rest of the 10 messages with the transaction.
// Expectation: After committing the transaction, all messages of the
subscription will be acknowledged.
_ = txn.Commit(context.Background())
- txn, err = client.NewTransaction(time.Hour)
+ txn, err = client.NewTransaction(txnTimeout)
require.Nil(t, err)
for i := 0; i < 9; i++ {
msg, _ := consumer.Receive(context.Background())
@@ -307,7 +309,7 @@ func TestConsumeAndProduceWithTxn(t *testing.T) {
// Create a goroutine to attempt receiving a message and send it to the
'done1' channel.
done2 := make(chan Message)
go func() {
- consumer.Receive(context.Background())
+ _, _ = consumer.Receive(context.Background())
close(done2)
}()
@@ -323,7 +325,7 @@ func TestConsumeAndProduceWithTxn(t *testing.T) {
producer.Close()
}
-func TestAckAndSendWithTxn(t *testing.T) {
+func TestTxn_AckAndSend(t *testing.T) {
// Prepare: Create PulsarClient and initialize the transaction
coordinator client.
sourceTopic := newTopicName()
sinkTopic := newTopicName()
@@ -357,7 +359,7 @@ func TestAckAndSendWithTxn(t *testing.T) {
}
// Open a transaction and consume messages from the source topic while
sending messages to the sink topic.
- txn, err := client.NewTransaction(time.Hour)
+ txn, err := client.NewTransaction(txnTimeout)
require.Nil(t, err)
for i := 0; i < 10; i++ {
@@ -393,7 +395,7 @@ func TestAckAndSendWithTxn(t *testing.T) {
sinkProducer.Close()
}
-func TestTransactionAbort(t *testing.T) {
+func TestTxn_TransactionAbort(t *testing.T) {
// Prepare: Create PulsarClient and initialize the transaction
coordinator client.
topic := newTopicName()
sub := "my-sub"
@@ -410,7 +412,7 @@ func TestTransactionAbort(t *testing.T) {
})
// Open a transaction and send 10 messages with the transaction.
- txn, err := client.NewTransaction(time.Hour)
+ txn, err := client.NewTransaction(txnTimeout)
require.Nil(t, err)
for i := 0; i < 10; i++ {
@@ -449,7 +451,7 @@ func consumerShouldNotReceiveMessage(t *testing.T, consumer
Consumer) {
}
}
-func TestTransactionAckChunkMessage(t *testing.T) {
+func TestTxn_AckChunkMessage(t *testing.T) {
topic := newTopicName()
sub := "my-sub"
@@ -457,7 +459,7 @@ func TestTransactionAckChunkMessage(t *testing.T) {
_, client := createTcClient(t)
// Create transaction and register the send operation.
- txn, err := client.NewTransaction(time.Hour)
+ txn, err := client.NewTransaction(txnTimeout)
require.Nil(t, err)
// Create a producer with chunking enabled to send a large message that
will be split into chunks.
@@ -487,13 +489,13 @@ func TestTransactionAckChunkMessage(t *testing.T) {
})
require.NoError(t, err)
_, ok := msgID.(*chunkMessageID)
- require.True(t, ok)
+ require.True(t, ok, fmt.Sprintf("Expected message ID of type
*chunkMessageID, got type %T", msgID))
err = txn.Commit(context.Background())
require.Nil(t, err)
// Receive the message using a new transaction and ack it.
- txn2, err := client.NewTransaction(time.Hour)
+ txn2, err := client.NewTransaction(txnTimeout)
require.Nil(t, err)
message, err := consumer.Receive(context.Background())
require.Nil(t, err)
@@ -501,7 +503,7 @@ func TestTransactionAckChunkMessage(t *testing.T) {
err = consumer.AckWithTxn(message, txn2)
require.Nil(t, err)
- txn2.Abort(context.Background())
+ _ = txn2.Abort(context.Background())
// Close the consumer to simulate reconnection and receive the same
message again.
consumer.Close()
@@ -518,7 +520,7 @@ func TestTransactionAckChunkMessage(t *testing.T) {
require.NotNil(t, message)
// Create a new transaction and ack the message again.
- txn3, err := client.NewTransaction(time.Hour)
+ txn3, err := client.NewTransaction(txnTimeout)
require.Nil(t, err)
err = consumer.AckWithTxn(message, txn3)
@@ -541,7 +543,7 @@ func TestTransactionAckChunkMessage(t *testing.T) {
consumerShouldNotReceiveMessage(t, consumer)
}
-func TestTxnConnReconnect(t *testing.T) {
+func TestTxn_ConnReconnect(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -593,3 +595,8 @@ func TestTxnConnReconnect(t *testing.T) {
err = txn.Commit(context.Background())
assert.NoError(t, err)
}
+
+func TestTxn_txnStateErrorMessage(t *testing.T) {
+ expected := "Expected transaction state: TxnOpen, actual: TxnTimeout"
+ assert.Equal(t, expected, txnStateErrorMessage(TxnOpen, TxnTimeout))
+}