This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 09dea66 [feat][txn]Implement transactionImpl (#984)
09dea66 is described below
commit 09dea663222505d1c86f8f0ae24f95bc666f11b1
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri Mar 24 17:13:04 2023 +0800
[feat][txn]Implement transactionImpl (#984)
Master Issue:https://github.com/apache/pulsar-client-go/issues/932
### Motivation
Implement transaction coordinator client.
### Modifications
1. Implement transaction coordinator
2. implement transactionImpl
3. Implement transaction in producer and consumer API
---
pulsar/error.go | 28 +++-
pulsar/transaction.go | 48 ++++++-
pulsar/transaction_coordinator_client.go | 35 +++--
pulsar/transaction_impl.go | 237 +++++++++++++++++++++++++++++++
pulsar/transaction_test.go | 139 ++++++++++++++++--
5 files changed, 461 insertions(+), 26 deletions(-)
diff --git a/pulsar/error.go b/pulsar/error.go
index 0aa1e3c..73a0b60 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -17,7 +17,11 @@
package pulsar
-import "fmt"
+import (
+ "fmt"
+
+ proto "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
// Result used to represent pulsar processing is an alias of type int.
type Result int
@@ -103,14 +107,11 @@ const (
ProducerClosed
// SchemaFailure means the payload could not be encoded using the Schema
SchemaFailure
-
- // ReachMaxPendingOps means the pending operations in transaction_impl
coordinator reach the maximum.
- ReachMaxPendingOps
// InvalidStatus means the component status is not as expected.
InvalidStatus
- // TransactionError means this is a transaction related error
- TransactionError
-
+ // TransactionNoFoundError The transaction is not exist in the
transaction coordinator, It may be an error txn
+ // or already ended.
+ TransactionNoFoundError
// ClientMemoryBufferIsFull client limit buffer is full
ClientMemoryBufferIsFull
)
@@ -221,7 +222,20 @@ func getResultStr(r Result) string {
return "SchemaFailure"
case ClientMemoryBufferIsFull:
return "ClientMemoryBufferIsFull"
+ case TransactionNoFoundError:
+ return "TransactionNoFoundError"
default:
return fmt.Sprintf("Result(%d)", r)
}
}
+
+func getErrorFromServerError(serverError *proto.ServerError) error {
+ switch *serverError {
+ case proto.ServerError_TransactionNotFound:
+ return newError(TransactionNoFoundError, serverError.String())
+ case proto.ServerError_InvalidTxnStatus:
+ return newError(InvalidStatus, serverError.String())
+ default:
+ return newError(UnknownError, serverError.String())
+ }
+}
diff --git a/pulsar/transaction.go b/pulsar/transaction.go
index ae7c673..60e1d2b 100644
--- a/pulsar/transaction.go
+++ b/pulsar/transaction.go
@@ -17,7 +17,53 @@
package pulsar
+import (
+ "context"
+)
+
+// TxnState The state of the transaction. Check the state of the transaction
before executing some operation
+// with the transaction is necessary.
+type TxnState int32
+
+const (
+ _ TxnState = iota
+ // TxnOpen The transaction in TxnOpen state can be used to send/ack
messages.
+ TxnOpen
+ // TxnCommitting The state of the transaction will be TxnCommitting
after the commit method is called.
+ // The transaction in TxnCommitting state can be committed again.
+ TxnCommitting
+ // TxnAborting The state of the transaction will be TxnAborting after
the abort method is called.
+ // The transaction in TxnAborting state can be aborted again.
+ TxnAborting
+ // TxnCommitted The state of the transaction will be TxnCommitted after
the commit method is executed success.
+ // This means that all the operations with the transaction are success.
+ TxnCommitted
+ // TxnAborted The state of the transaction will be TxnAborted after the
abort method is executed success.
+ // This means that all the operations with the transaction are aborted.
+ TxnAborted
+ // TxnError The state of the transaction will be TxnError after the
operation of transaction get a non-retryable error.
+ TxnError
+ // TxnTimeout The state of the transaction will be TxnTimeout after the
transaction timeout.
+ TxnTimeout
+)
+
+// TxnID An identifier for representing a transaction.
type TxnID struct {
- mostSigBits uint64
+ // mostSigBits The most significant 64 bits of this TxnID.
+ mostSigBits uint64
+ // leastSigBits The least significant 64 bits of this TxnID.
leastSigBits uint64
}
+
+// Transaction used to guarantee exactly-once
+type Transaction interface {
+ //Commit You can commit the transaction after all the
sending/acknowledging operations with the transaction success.
+ Commit(context.Context) error
+ //Abort You can abort the transaction when you want to abort all the
sending/acknowledging operations
+ // with the transaction.
+ Abort(context.Context) error
+ //GetState Get the state of the transaction.
+ GetState() TxnState
+ //GetTxnID Get the identified ID of the transaction.
+ GetTxnID() TxnID
+}
diff --git a/pulsar/transaction_coordinator_client.go
b/pulsar/transaction_coordinator_client.go
index 82d1490..1535fad 100644
--- a/pulsar/transaction_coordinator_client.go
+++ b/pulsar/transaction_coordinator_client.go
@@ -114,14 +114,16 @@ func (tc *transactionCoordinatorClient)
newTransaction(timeout time.Duration) (*
TxnTtlSeconds: proto.Uint64(uint64(timeout.Milliseconds())),
}
- cnx, err := tc.client.rpcClient.RequestOnCnx(tc.cons[nextTcID],
requestID, pb.BaseCommand_NEW_TXN, cmdNewTxn)
+ res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[nextTcID],
requestID, pb.BaseCommand_NEW_TXN, cmdNewTxn)
tc.semaphore.Release()
if err != nil {
return nil, err
+ } else if res.Response.NewTxnResponse.Error != nil {
+ return nil,
getErrorFromServerError(res.Response.NewTxnResponse.Error)
}
- return &TxnID{*cnx.Response.NewTxnResponse.TxnidMostBits,
- *cnx.Response.NewTxnResponse.TxnidLeastBits}, nil
+ return &TxnID{*res.Response.NewTxnResponse.TxnidMostBits,
+ *res.Response.NewTxnResponse.TxnidLeastBits}, nil
}
// addPublishPartitionToTxn register the partitions which published messages
with the transactionImpl.
@@ -137,10 +139,15 @@ func (tc *transactionCoordinatorClient)
addPublishPartitionToTxn(id *TxnID, part
TxnidLeastBits: proto.Uint64(id.leastSigBits),
Partitions: partitions,
}
- _, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits],
requestID,
+ res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits],
requestID,
pb.BaseCommand_ADD_PARTITION_TO_TXN, cmdAddPartitions)
tc.semaphore.Release()
- return err
+ if err != nil {
+ return err
+ } else if res.Response.AddPartitionToTxnResponse.Error != nil {
+ return
getErrorFromServerError(res.Response.AddPartitionToTxnResponse.Error)
+ }
+ return nil
}
// addSubscriptionToTxn register the subscription which acked messages with
the transactionImpl.
@@ -160,10 +167,15 @@ func (tc *transactionCoordinatorClient)
addSubscriptionToTxn(id *TxnID, topic st
TxnidLeastBits: proto.Uint64(id.leastSigBits),
Subscription: []*pb.Subscription{sub},
}
- _, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits],
requestID,
+ res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits],
requestID,
pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN, cmdAddSubscription)
tc.semaphore.Release()
- return err
+ if err != nil {
+ return err
+ } else if res.Response.AddSubscriptionToTxnResponse.Error != nil {
+ return
getErrorFromServerError(res.Response.AddSubscriptionToTxnResponse.Error)
+ }
+ return nil
}
// endTxn commit or abort the transactionImpl.
@@ -178,9 +190,14 @@ func (tc *transactionCoordinatorClient) endTxn(id *TxnID,
action pb.TxnAction) e
TxnidMostBits: proto.Uint64(id.mostSigBits),
TxnidLeastBits: proto.Uint64(id.leastSigBits),
}
- _, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits],
requestID, pb.BaseCommand_END_TXN, cmdEndTxn)
+ res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits],
requestID, pb.BaseCommand_END_TXN, cmdEndTxn)
tc.semaphore.Release()
- return err
+ if err != nil {
+ return err
+ } else if res.Response.EndTxnResponse.Error != nil {
+ return
getErrorFromServerError(res.Response.EndTxnResponse.Error)
+ }
+ return nil
}
func getTCAssignTopicName(partition uint64) string {
diff --git a/pulsar/transaction_impl.go b/pulsar/transaction_impl.go
new file mode 100644
index 0000000..7cc93ec
--- /dev/null
+++ b/pulsar/transaction_impl.go
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+ "github.com/apache/pulsar-client-go/pulsar/log"
+)
+
+type subscription struct {
+ topic string
+ subscription string
+}
+
+type transaction struct {
+ sync.Mutex
+ txnID TxnID
+ state TxnState
+ tcClient *transactionCoordinatorClient
+ registerPartitions map[string]bool
+ registerAckSubscriptions map[subscription]bool
+ // opsFlow It has two effects:
+ // 1. Wait all the operations of sending and acking messages with the
transaction complete
+ // by reading msg from the chan.
+ // 2. Prevent sending or acking messages with a committed or aborted
transaction.
+ // opsCount is record the number of the uncompleted operations.
+ // opsFlow
+ // Write:
+ // 1. When the transaction is created, a bool will be written to
opsFlow chan.
+ // 2. When the opsCount decrement from 1 to 0, a new bool will be
written to opsFlow chan.
+ // 3. When get a retryable error after committing or aborting the
transaction,
+ // a bool will be written to opsFlow chan.
+ // Read:
+ // 1. When the transaction is committed or aborted, a bool will be
read from opsFlow chan.
+ // 2. When the opsCount increment from 0 to 1, a bool will be read
from opsFlow chan.
+ opsFlow chan bool
+ opsCount int32
+ opTimeout time.Duration
+ log log.Logger
+}
+
+func newTransaction(id TxnID, tcClient *transactionCoordinatorClient, timeout
time.Duration) *transaction {
+ transaction := &transaction{
+ txnID: id,
+ state: TxnOpen,
+ registerPartitions: make(map[string]bool),
+ registerAckSubscriptions: make(map[subscription]bool),
+ opsFlow: make(chan bool, 1),
+ opTimeout: 5 * time.Second,
+ tcClient: tcClient,
+ }
+ //This means there are not pending requests with this transaction. The
transaction can be committed or aborted.
+ transaction.opsFlow <- true
+ go func() {
+ //Set the state of the transaction to timeout after timeout
+ <-time.After(timeout)
+ atomic.CompareAndSwapInt32((*int32)(&transaction.state),
int32(TxnOpen), int32(TxnTimeout))
+ }()
+ transaction.log = tcClient.log.SubLogger(log.Fields{})
+ return transaction
+}
+
+func (txn *transaction) GetState() TxnState {
+ return txn.state
+}
+
+func (txn *transaction) Commit(ctx context.Context) error {
+ if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen),
int32(TxnCommitting)) ||
+ txn.state == TxnCommitting) {
+ return newError(InvalidStatus, "Expect transaction state is
TxnOpen but "+txn.state.string())
+ }
+
+ //Wait for all operations to complete
+ select {
+ case <-txn.opsFlow:
+ case <-time.After(txn.opTimeout):
+ return newError(TimeoutError, "There are some operations that
are not completed after the timeout.")
+ }
+ //Send commit transaction command to transaction coordinator
+ err := txn.tcClient.endTxn(&txn.txnID, pb.TxnAction_COMMIT)
+ if err == nil {
+ atomic.StoreInt32((*int32)(&txn.state), int32(TxnCommitted))
+ } else {
+ if err.(*Error).Result() == TransactionNoFoundError ||
err.(*Error).Result() == InvalidStatus {
+ atomic.StoreInt32((*int32)(&txn.state), int32(TxnError))
+ return err
+ }
+ txn.opsFlow <- true
+ }
+ return err
+}
+
+func (txn *transaction) Abort(ctx context.Context) error {
+ if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen),
int32(TxnAborting)) ||
+ txn.state == TxnAborting) {
+ return newError(InvalidStatus, "Expect transaction state is
TxnOpen but "+txn.state.string())
+ }
+
+ //Wait for all operations to complete
+ select {
+ case <-txn.opsFlow:
+ case <-time.After(txn.opTimeout):
+ return newError(TimeoutError, "There are some operations that
are not completed after the timeout.")
+ }
+ //Send abort transaction command to transaction coordinator
+ err := txn.tcClient.endTxn(&txn.txnID, pb.TxnAction_ABORT)
+ if err == nil {
+ atomic.StoreInt32((*int32)(&txn.state), int32(TxnAborted))
+ } else {
+ if err.(*Error).Result() == TransactionNoFoundError ||
err.(*Error).Result() == InvalidStatus {
+ atomic.StoreInt32((*int32)(&txn.state), int32(TxnError))
+ } else {
+ txn.opsFlow <- true
+ }
+ }
+ return err
+}
+
+func (txn *transaction) registerSendOrAckOp() error {
+ if atomic.AddInt32(&txn.opsCount, 1) == 1 {
+ //There are new operations that not completed
+ select {
+ case <-txn.opsFlow:
+ return nil
+ case <-time.After(txn.opTimeout):
+ if _, err := txn.checkIfOpen(); err != nil {
+ return err
+ }
+ return newError(TimeoutError, "Failed to get the
semaphore to register the send/ack operation")
+ }
+ }
+ return nil
+}
+
+func (txn *transaction) endSendOrAckOp(err error) {
+ if err != nil {
+ atomic.StoreInt32((*int32)(&txn.state), int32(TxnError))
+ }
+ if atomic.AddInt32(&txn.opsCount, -1) == 0 {
+ //This means there are not pending send/ack requests
+ txn.opsFlow <- true
+ }
+}
+
+func (txn *transaction) registerProducerTopic(topic string) error {
+ isOpen, err := txn.checkIfOpen()
+ if !isOpen {
+ return err
+ }
+ _, ok := txn.registerPartitions[topic]
+ if !ok {
+ txn.Lock()
+ defer txn.Unlock()
+ if _, ok = txn.registerPartitions[topic]; !ok {
+ err :=
txn.tcClient.addPublishPartitionToTxn(&txn.txnID, []string{topic})
+ if err != nil {
+ return err
+ }
+ txn.registerPartitions[topic] = true
+ }
+ }
+ return nil
+}
+
+func (txn *transaction) registerAckTopic(topic string, subName string) error {
+ isOpen, err := txn.checkIfOpen()
+ if !isOpen {
+ return err
+ }
+ sub := subscription{
+ topic: topic,
+ subscription: subName,
+ }
+ _, ok := txn.registerAckSubscriptions[sub]
+ if !ok {
+ txn.Lock()
+ defer txn.Unlock()
+ if _, ok = txn.registerAckSubscriptions[sub]; !ok {
+ err := txn.tcClient.addSubscriptionToTxn(&txn.txnID,
topic, subName)
+ if err != nil {
+ return err
+ }
+ txn.registerAckSubscriptions[sub] = true
+ }
+ }
+ return nil
+}
+
+func (txn *transaction) GetTxnID() TxnID {
+ return txn.txnID
+}
+
+func (txn *transaction) checkIfOpen() (bool, error) {
+ if txn.state == TxnOpen {
+ return true, nil
+ }
+ return false, newError(InvalidStatus, "Expect transaction state is
TxnOpen but "+txn.state.string())
+}
+
+func (state TxnState) string() string {
+ switch state {
+ case TxnOpen:
+ return "TxnOpen"
+ case TxnCommitting:
+ return "TxnCommitting"
+ case TxnAborting:
+ return "TxnAborting"
+ case TxnCommitted:
+ return "TxnCommitted"
+ case TxnAborted:
+ return "TxnAborted"
+ case TxnTimeout:
+ return "TxnTimeout"
+ default:
+ return "Unknown"
+ }
+}
diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go
index 14a652f..362e4d2 100644
--- a/pulsar/transaction_test.go
+++ b/pulsar/transaction_test.go
@@ -18,11 +18,15 @@
package pulsar
import (
- pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
- "github.com/stretchr/testify/assert"
-
+ "context"
+ "errors"
+ "fmt"
"testing"
"time"
+
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func TestTCClient(t *testing.T) {
@@ -79,6 +83,126 @@ func TestTCClient(t *testing.T) {
defer client.Close()
}
+//Test points:
+// 1. Abort and commit txn.
+// 1. Do nothing, just open a transaction and then commit it or
abort it.
+// The operations of committing/aborting txn should success at the
first time and fail at the second time.
+// 2. The internal API, registerSendOrAckOp and endSendOrAckOp
+// 1. Register 4 operation but only end 3 operations, the
transaction can not be committed or aborted.
+// 2. Register 3 operation and end 3 operation the transaction can
be committed and aborted.
+// 3. Register an operation and end the operation with an error,
+// and then the state of the transaction should be replaced to
errored.
+// 3. The internal API, registerAckTopic and registerProducerTopic
+// 1. Register ack topic and send topic, and call http request to
get the stats of the transaction
+// to do verification.
+
+// TestTxnImplCommitOrAbort Test abort and commit txn
+func TestTxnImplCommitOrAbort(t *testing.T) {
+ tc, _ := createTcClient(t)
+ //1. Open a transaction and then commit it.
+ //The operations of committing txn1 should success at the first time
and fail at the second time.
+ txn1 := createTxn(tc, t)
+ err := txn1.Commit(context.Background())
+ require.Nil(t, err, fmt.Sprintf("Failed to commit the transaction
%d:%d\n", txn1.txnID.mostSigBits,
+ txn1.txnID.leastSigBits))
+ txn1.state = TxnOpen
+ txn1.opsFlow <- true
+ err = txn1.Commit(context.Background())
+ assert.Equal(t, err.(*Error).Result(), TransactionNoFoundError)
+ assert.Equal(t, txn1.GetState(), TxnError)
+ //2. Open a transaction and then abort it.
+ //The operations of aborting txn2 should success at the first time and
fail at the second time.
+ id2, err := tc.newTransaction(time.Hour)
+ require.Nil(t, err, "Failed to new a transaction")
+ txn2 := newTransaction(*id2, tc, time.Hour)
+ err = txn2.Abort(context.Background())
+ require.Nil(t, err, fmt.Sprintf("Failed to abort the transaction
%d:%d\n",
+ id2.mostSigBits, id2.leastSigBits))
+ txn2.state = TxnOpen
+ txn2.opsFlow <- true
+ err = txn2.Abort(context.Background())
+ assert.Equal(t, err.(*Error).Result(), TransactionNoFoundError)
+ assert.Equal(t, txn1.GetState(), TxnError)
+ err = txn2.registerSendOrAckOp()
+ assert.Equal(t, err.(*Error).Result(), InvalidStatus)
+ err = txn1.registerSendOrAckOp()
+ assert.Equal(t, err.(*Error).Result(), InvalidStatus)
+}
+
+// TestRegisterOpAndEndOp Test the internal API including the
registerSendOrAckOp and endSendOrAckOp.
+func TestRegisterOpAndEndOp(t *testing.T) {
+ tc, _ := createTcClient(t)
+ //1. Register 4 operation but only end 3 operations, the transaction
can not be committed or aborted.
+ res := registerOpAndEndOp(t, tc, 4, 3, nil, true)
+ assert.Equal(t, res.(*Error).Result(), TimeoutError)
+ res = registerOpAndEndOp(t, tc, 4, 3, nil, false)
+ assert.Equal(t, res.(*Error).Result(), TimeoutError)
+
+ //2. Register 3 operation and end 3 operation the transaction can be
committed and aborted.
+ res = registerOpAndEndOp(t, tc, 3, 3, nil, true)
+ assert.Nil(t, res)
+ res = registerOpAndEndOp(t, tc, 3, 3, nil, false)
+ assert.Nil(t, res)
+ //3. Register an operation and end the operation with an error,
+ // and then the state of the transaction should be replaced to errored.
+ res = registerOpAndEndOp(t, tc, 4, 4, errors.New(""), true)
+ assert.Equal(t, res.(*Error).Result(), InvalidStatus)
+ res = registerOpAndEndOp(t, tc, 4, 4, errors.New(""), false)
+ assert.Equal(t, res.(*Error).Result(), InvalidStatus)
+}
+
+// TestRegisterTopic Test the internal API, registerAckTopic and
registerProducerTopic
+func TestRegisterTopic(t *testing.T) {
+ //1. Prepare: create PulsarClient and init transaction coordinator
client.
+ topic := newTopicName()
+ sub := "my-sub"
+ tc, client := createTcClient(t)
+ //2. Prepare: create Topic and Subscription.
+ _, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ })
+ assert.NoError(t, err)
+ txn := createTxn(tc, t)
+ //3. Create a topic and register topic and subscription.
+ err = txn.registerAckTopic(topic, sub)
+ require.Nil(t, err, "Failed to register ack topic.")
+ err = txn.registerProducerTopic(topic)
+ require.Nil(t, err, "Failed to register ack topic.")
+ //4. Call http request to get the stats of the transaction to do
verification.
+ stats2, err := transactionStats(&txn.txnID)
+ assert.NoError(t, err)
+ topics := stats2["producedPartitions"].(map[string]interface{})
+ subTopics := stats2["ackedPartitions"].(map[string]interface{})
+ assert.NotNil(t, topics[topic])
+ assert.NotNil(t, subTopics[topic])
+ subs := subTopics[topic].(map[string]interface{})
+ assert.NotNil(t, subs[sub])
+}
+
+func registerOpAndEndOp(t *testing.T, tc *transactionCoordinatorClient, rp
int, ep int, err error, commit bool) error {
+ txn := createTxn(tc, t)
+ for i := 0; i < rp; i++ {
+ err := txn.registerSendOrAckOp()
+ assert.Nil(t, err)
+ }
+ for i := 0; i < ep; i++ {
+ txn.endSendOrAckOp(err)
+ }
+ if commit {
+ err = txn.Commit(context.Background())
+ } else {
+ err = txn.Abort(context.Background())
+ }
+ return err
+}
+
+func createTxn(tc *transactionCoordinatorClient, t *testing.T) *transaction {
+ id, err := tc.newTransaction(time.Hour)
+ require.Nil(t, err, "Failed to new a transaction.")
+ return newTransaction(*id, tc, time.Hour)
+}
+
// createTcClient Create a transaction coordinator client to send request
func createTcClient(t *testing.T) (*transactionCoordinatorClient, *client) {
c, err := NewClient(ClientOptions{
@@ -86,13 +210,10 @@ func createTcClient(t *testing.T)
(*transactionCoordinatorClient, *client) {
TLSTrustCertsFilePath: caCertsPath,
Authentication: NewAuthenticationTLS(tlsClientCertPath,
tlsClientKeyPath),
})
- if err != nil {
- t.Fatalf("Failed to create client due to %s", err.Error())
- }
+ require.Nil(t, err, "Failed to create client.")
tcClient := newTransactionCoordinatorClientImpl(c.(*client))
- if err = tcClient.start(); err != nil {
- t.Fatalf("Failed to start transaction coordinator due to %s",
err.Error())
- }
+ err = tcClient.start()
+ require.Nil(t, err, "Failed to start transaction coordinator.")
return tcClient, c.(*client)
}