This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 09dea66  [feat][txn]Implement transactionImpl (#984)
09dea66 is described below

commit 09dea663222505d1c86f8f0ae24f95bc666f11b1
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri Mar 24 17:13:04 2023 +0800

    [feat][txn]Implement transactionImpl (#984)
    
    Master Issue:https://github.com/apache/pulsar-client-go/issues/932
    ### Motivation
    Implement transaction coordinator client.
    ### Modifications
    1. Implement transaction coordinator
    2. Implement transactionImpl
    3. Implement transaction in producer and consumer API
---
 pulsar/error.go                          |  28 +++-
 pulsar/transaction.go                    |  48 ++++++-
 pulsar/transaction_coordinator_client.go |  35 +++--
 pulsar/transaction_impl.go               | 237 +++++++++++++++++++++++++++++++
 pulsar/transaction_test.go               | 139 ++++++++++++++++--
 5 files changed, 461 insertions(+), 26 deletions(-)

diff --git a/pulsar/error.go b/pulsar/error.go
index 0aa1e3c..73a0b60 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -17,7 +17,11 @@
 
 package pulsar
 
-import "fmt"
+import (
+       "fmt"
+
+       proto "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
 
 // Result used to represent pulsar processing is an alias of type int.
 type Result int
@@ -103,14 +107,11 @@ const (
        ProducerClosed
        // SchemaFailure means the payload could not be encoded using the Schema
        SchemaFailure
-
-       // ReachMaxPendingOps means the pending operations in transaction_impl 
coordinator reach the maximum.
-       ReachMaxPendingOps
        // InvalidStatus means the component status is not as expected.
        InvalidStatus
-       // TransactionError means this is a transaction related error
-       TransactionError
-
+       // TransactionNoFoundError means the transaction does not exist in the
+       // transaction coordinator; it may be an invalid transaction or one that
+       // has already ended.
+       TransactionNoFoundError
        // ClientMemoryBufferIsFull client limit buffer is full
        ClientMemoryBufferIsFull
 )
@@ -221,7 +222,20 @@ func getResultStr(r Result) string {
                return "SchemaFailure"
        case ClientMemoryBufferIsFull:
                return "ClientMemoryBufferIsFull"
+       case TransactionNoFoundError:
+               return "TransactionNoFoundError"
        default:
                return fmt.Sprintf("Result(%d)", r)
        }
 }
+
+// getErrorFromServerError converts a broker-side ServerError code into a
+// client-side error carrying the matching Result.
+//
+// TransactionNotFound maps to TransactionNoFoundError, InvalidTxnStatus maps
+// to InvalidStatus, and every other code maps to UnknownError. The message of
+// the returned error is the string form of the server error code.
+//
+// A nil serverError means the response carried no error, so nil is returned
+// instead of dereferencing the nil pointer.
+func getErrorFromServerError(serverError *proto.ServerError) error {
+       if serverError == nil {
+               return nil
+       }
+       switch *serverError {
+       case proto.ServerError_TransactionNotFound:
+               return newError(TransactionNoFoundError, serverError.String())
+       case proto.ServerError_InvalidTxnStatus:
+               return newError(InvalidStatus, serverError.String())
+       default:
+               return newError(UnknownError, serverError.String())
+       }
+}
diff --git a/pulsar/transaction.go b/pulsar/transaction.go
index ae7c673..60e1d2b 100644
--- a/pulsar/transaction.go
+++ b/pulsar/transaction.go
@@ -17,7 +17,53 @@
 
 package pulsar
 
+import (
+       "context"
+)
+
+// TxnState is the state of a transaction. Check the state of a transaction
+// before executing an operation with it.
+type TxnState int32
+
+const (
+       _ TxnState = iota
+       // TxnOpen means the transaction can be used to send/ack messages.
+       TxnOpen
+       // TxnCommitting is the state after the commit method has been called.
+       // A transaction in the TxnCommitting state can be committed again.
+       TxnCommitting
+       // TxnAborting is the state after the abort method has been called.
+       // A transaction in the TxnAborting state can be aborted again.
+       TxnAborting
+       // TxnCommitted is the state after the commit method has succeeded.
+       // It means that all the operations with the transaction succeeded.
+       TxnCommitted
+       // TxnAborted is the state after the abort method has succeeded.
+       // It means that all the operations with the transaction are aborted.
+       TxnAborted
+       // TxnError is the state after an operation of the transaction got a
+       // non-retryable error.
+       TxnError
+       // TxnTimeout is the state after the transaction has timed out.
+       TxnTimeout
+)
+
+// TxnID An identifier for representing a transaction.
 type TxnID struct {
-       mostSigBits  uint64
+       // mostSigBits The most significant 64 bits of this TxnID.
+       mostSigBits uint64
+       // leastSigBits The least significant 64 bits of this TxnID.
        leastSigBits uint64
 }
+
+// Transaction is used to guarantee exactly-once semantics for the
+// send/acknowledge operations performed with it.
+type Transaction interface {
+       // Commit commits the transaction. Call it after all the
+       // sending/acknowledging operations with the transaction have succeeded.
+       Commit(context.Context) error
+       // Abort aborts the transaction, and with it all the
+       // sending/acknowledging operations performed with the transaction.
+       Abort(context.Context) error
+       // GetState returns the current state of the transaction.
+       GetState() TxnState
+       // GetTxnID returns the ID that identifies the transaction.
+       GetTxnID() TxnID
+}
diff --git a/pulsar/transaction_coordinator_client.go 
b/pulsar/transaction_coordinator_client.go
index 82d1490..1535fad 100644
--- a/pulsar/transaction_coordinator_client.go
+++ b/pulsar/transaction_coordinator_client.go
@@ -114,14 +114,16 @@ func (tc *transactionCoordinatorClient) 
newTransaction(timeout time.Duration) (*
                TxnTtlSeconds: proto.Uint64(uint64(timeout.Milliseconds())),
        }
 
-       cnx, err := tc.client.rpcClient.RequestOnCnx(tc.cons[nextTcID], 
requestID, pb.BaseCommand_NEW_TXN, cmdNewTxn)
+       res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[nextTcID], 
requestID, pb.BaseCommand_NEW_TXN, cmdNewTxn)
        tc.semaphore.Release()
        if err != nil {
                return nil, err
+       } else if res.Response.NewTxnResponse.Error != nil {
+               return nil, 
getErrorFromServerError(res.Response.NewTxnResponse.Error)
        }
 
-       return &TxnID{*cnx.Response.NewTxnResponse.TxnidMostBits,
-               *cnx.Response.NewTxnResponse.TxnidLeastBits}, nil
+       return &TxnID{*res.Response.NewTxnResponse.TxnidMostBits,
+               *res.Response.NewTxnResponse.TxnidLeastBits}, nil
 }
 
 // addPublishPartitionToTxn register the partitions which published messages 
with the transactionImpl.
@@ -137,10 +139,15 @@ func (tc *transactionCoordinatorClient) 
addPublishPartitionToTxn(id *TxnID, part
                TxnidLeastBits: proto.Uint64(id.leastSigBits),
                Partitions:     partitions,
        }
-       _, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], 
requestID,
+       res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], 
requestID,
                pb.BaseCommand_ADD_PARTITION_TO_TXN, cmdAddPartitions)
        tc.semaphore.Release()
-       return err
+       if err != nil {
+               return err
+       } else if res.Response.AddPartitionToTxnResponse.Error != nil {
+               return 
getErrorFromServerError(res.Response.AddPartitionToTxnResponse.Error)
+       }
+       return nil
 }
 
 // addSubscriptionToTxn register the subscription which acked messages with 
the transactionImpl.
@@ -160,10 +167,15 @@ func (tc *transactionCoordinatorClient) 
addSubscriptionToTxn(id *TxnID, topic st
                TxnidLeastBits: proto.Uint64(id.leastSigBits),
                Subscription:   []*pb.Subscription{sub},
        }
-       _, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], 
requestID,
+       res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], 
requestID,
                pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN, cmdAddSubscription)
        tc.semaphore.Release()
-       return err
+       if err != nil {
+               return err
+       } else if res.Response.AddSubscriptionToTxnResponse.Error != nil {
+               return 
getErrorFromServerError(res.Response.AddSubscriptionToTxnResponse.Error)
+       }
+       return nil
 }
 
 // endTxn commit or abort the transactionImpl.
@@ -178,9 +190,14 @@ func (tc *transactionCoordinatorClient) endTxn(id *TxnID, 
action pb.TxnAction) e
                TxnidMostBits:  proto.Uint64(id.mostSigBits),
                TxnidLeastBits: proto.Uint64(id.leastSigBits),
        }
-       _, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], 
requestID, pb.BaseCommand_END_TXN, cmdEndTxn)
+       res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], 
requestID, pb.BaseCommand_END_TXN, cmdEndTxn)
        tc.semaphore.Release()
-       return err
+       if err != nil {
+               return err
+       } else if res.Response.EndTxnResponse.Error != nil {
+               return 
getErrorFromServerError(res.Response.EndTxnResponse.Error)
+       }
+       return nil
 }
 
 func getTCAssignTopicName(partition uint64) string {
diff --git a/pulsar/transaction_impl.go b/pulsar/transaction_impl.go
new file mode 100644
index 0000000..7cc93ec
--- /dev/null
+++ b/pulsar/transaction_impl.go
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "context"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+       "github.com/apache/pulsar-client-go/pulsar/log"
+)
+
+// subscription pairs a topic with a subscription name. It is the key type of
+// transaction.registerAckSubscriptions.
+type subscription struct {
+       topic        string
+       subscription string
+}
+
+// transaction is the client-side handle of one transaction. It holds the
+// transaction state, the topics and subscriptions already registered with the
+// transaction coordinator, and the bookkeeping for in-flight send/ack
+// operations.
+//
+// The embedded Mutex is taken by registerProducerTopic and registerAckTopic
+// around the writes to the two register maps. The state field is updated with
+// sync/atomic operations.
+type transaction struct {
+       sync.Mutex
+       txnID                    TxnID
+       state                    TxnState
+       tcClient                 *transactionCoordinatorClient
+       registerPartitions       map[string]bool
+       registerAckSubscriptions map[subscription]bool
+       // opsFlow is a one-slot channel with two purposes:
+       // 1. Commit/Abort read from it to wait until all the send/ack operations
+       //    with the transaction are complete.
+       // 2. It prevents sending or acking messages with a committed or aborted
+       //    transaction.
+       // opsCount records the number of uncompleted operations.
+       //
+       // Writes to opsFlow:
+       //   1. When the transaction is created.
+       //   2. When opsCount is decremented from 1 to 0.
+       //   3. When committing or aborting the transaction fails with a
+       //      retryable error.
+       // Reads from opsFlow:
+       //   1. When the transaction is committed or aborted.
+       //   2. When opsCount is incremented from 0 to 1.
+       opsFlow   chan bool
+       opsCount  int32
+       opTimeout time.Duration
+       log       log.Logger
+}
+
+// newTransaction creates the client-side handle for the transaction identified
+// by id, in the TxnOpen state.
+//
+// tcClient is the coordinator client used for all later requests of this
+// transaction and must be non-nil (its logger is used). timeout is the time
+// after which a transaction that is still TxnOpen is moved to TxnTimeout.
+// Waiting for pending operations uses a fixed 5 second opTimeout.
+func newTransaction(id TxnID, tcClient *transactionCoordinatorClient, timeout time.Duration) *transaction {
+       transaction := &transaction{
+               txnID:                    id,
+               state:                    TxnOpen,
+               registerPartitions:       make(map[string]bool),
+               registerAckSubscriptions: make(map[subscription]bool),
+               opsFlow:                  make(chan bool, 1),
+               opTimeout:                5 * time.Second,
+               tcClient:                 tcClient,
+       }
+       transaction.log = tcClient.log.SubLogger(log.Fields{})
+       // The initial token means there are no pending requests with this
+       // transaction, so it can be committed or aborted.
+       transaction.opsFlow <- true
+       // Mark the transaction as timed out once timeout elapses, but only if it
+       // is still open. time.AfterFunc is used instead of a goroutine blocked on
+       // time.After so that no goroutine is parked for the whole timeout.
+       time.AfterFunc(timeout, func() {
+               atomic.CompareAndSwapInt32((*int32)(&transaction.state), int32(TxnOpen), int32(TxnTimeout))
+       })
+       return transaction
+}
+
+// GetState returns the current state of the transaction.
+//
+// The state is written with sync/atomic operations (by Commit, Abort,
+// endSendOrAckOp and the timeout timer), so it is loaded atomically here to
+// avoid a data race with those writers.
+func (txn *transaction) GetState() TxnState {
+       return TxnState(atomic.LoadInt32((*int32)(&txn.state)))
+}
+
+func (txn *transaction) Commit(ctx context.Context) error {
+       if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen), 
int32(TxnCommitting)) ||
+               txn.state == TxnCommitting) {
+               return newError(InvalidStatus, "Expect transaction state is 
TxnOpen but "+txn.state.string())
+       }
+
+       //Wait for all operations to complete
+       select {
+       case <-txn.opsFlow:
+       case <-time.After(txn.opTimeout):
+               return newError(TimeoutError, "There are some operations that 
are not completed after the timeout.")
+       }
+       //Send commit transaction command to transaction coordinator
+       err := txn.tcClient.endTxn(&txn.txnID, pb.TxnAction_COMMIT)
+       if err == nil {
+               atomic.StoreInt32((*int32)(&txn.state), int32(TxnCommitted))
+       } else {
+               if err.(*Error).Result() == TransactionNoFoundError || 
err.(*Error).Result() == InvalidStatus {
+                       atomic.StoreInt32((*int32)(&txn.state), int32(TxnError))
+                       return err
+               }
+               txn.opsFlow <- true
+       }
+       return err
+}
+
+func (txn *transaction) Abort(ctx context.Context) error {
+       if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen), 
int32(TxnAborting)) ||
+               txn.state == TxnAborting) {
+               return newError(InvalidStatus, "Expect transaction state is 
TxnOpen but "+txn.state.string())
+       }
+
+       //Wait for all operations to complete
+       select {
+       case <-txn.opsFlow:
+       case <-time.After(txn.opTimeout):
+               return newError(TimeoutError, "There are some operations that 
are not completed after the timeout.")
+       }
+       //Send abort transaction command to transaction coordinator
+       err := txn.tcClient.endTxn(&txn.txnID, pb.TxnAction_ABORT)
+       if err == nil {
+               atomic.StoreInt32((*int32)(&txn.state), int32(TxnAborted))
+       } else {
+               if err.(*Error).Result() == TransactionNoFoundError || 
err.(*Error).Result() == InvalidStatus {
+                       atomic.StoreInt32((*int32)(&txn.state), int32(TxnError))
+               } else {
+                       txn.opsFlow <- true
+               }
+       }
+       return err
+}
+
+// registerSendOrAckOp records the start of one send/ack operation by
+// incrementing opsCount.
+//
+// The caller that moves opsCount from 0 to 1 also takes the token out of
+// opsFlow, which blocks Commit/Abort until the count drops back to 0. If the
+// token cannot be taken within opTimeout, the error from checkIfOpen is
+// returned when the transaction is no longer open, otherwise a TimeoutError.
+//
+// NOTE(review): on the failure path opsCount stays incremented. Whether
+// callers are expected to call endSendOrAckOp after a failed registration
+// cannot be told from this file; confirm against the callers.
+func (txn *transaction) registerSendOrAckOp() error {
+       if atomic.AddInt32(&txn.opsCount, 1) == 1 {
+               // There are new operations that are not completed yet.
+               select {
+               case <-txn.opsFlow:
+                       return nil
+               case <-time.After(txn.opTimeout):
+                       if _, err := txn.checkIfOpen(); err != nil {
+                               return err
+                       }
+                       return newError(TimeoutError, "Failed to get the semaphore to register the send/ack operation")
+               }
+       }
+       return nil
+}
+
+// endSendOrAckOp records the completion of one send/ack operation.
+//
+// A non-nil err moves the transaction to TxnError. When the last pending
+// operation completes (opsCount reaches 0), the token is written back to
+// opsFlow.
+func (txn *transaction) endSendOrAckOp(err error) {
+       if err != nil {
+               atomic.StoreInt32((*int32)(&txn.state), int32(TxnError))
+       }
+       remaining := atomic.AddInt32(&txn.opsCount, -1)
+       if remaining != 0 {
+               return
+       }
+       // No pending send/ack requests are left.
+       txn.opsFlow <- true
+}
+
+// registerProducerTopic registers topic with the transaction coordinator as a
+// partition this transaction publishes to, unless it was registered before.
+//
+// It returns an InvalidStatus error when the transaction is not open, or the
+// error of the coordinator request. A topic is only remembered as registered
+// after the coordinator request succeeded.
+func (txn *transaction) registerProducerTopic(topic string) error {
+       isOpen, err := txn.checkIfOpen()
+       if !isOpen {
+               return err
+       }
+       // Take the lock before the lookup: reading the map without it races with
+       // the write below when called from several goroutines.
+       txn.Lock()
+       defer txn.Unlock()
+       if _, ok := txn.registerPartitions[topic]; ok {
+               return nil
+       }
+       if err := txn.tcClient.addPublishPartitionToTxn(&txn.txnID, []string{topic}); err != nil {
+               return err
+       }
+       txn.registerPartitions[topic] = true
+       return nil
+}
+
+// registerAckTopic registers the subscription subName on topic with the
+// transaction coordinator as one this transaction acknowledges messages on,
+// unless it was registered before.
+//
+// It returns an InvalidStatus error when the transaction is not open, or the
+// error of the coordinator request. A subscription is only remembered as
+// registered after the coordinator request succeeded.
+func (txn *transaction) registerAckTopic(topic string, subName string) error {
+       isOpen, err := txn.checkIfOpen()
+       if !isOpen {
+               return err
+       }
+       sub := subscription{
+               topic:        topic,
+               subscription: subName,
+       }
+       // Take the lock before the lookup: reading the map without it races with
+       // the write below when called from several goroutines.
+       txn.Lock()
+       defer txn.Unlock()
+       if _, ok := txn.registerAckSubscriptions[sub]; ok {
+               return nil
+       }
+       if err := txn.tcClient.addSubscriptionToTxn(&txn.txnID, topic, subName); err != nil {
+               return err
+       }
+       txn.registerAckSubscriptions[sub] = true
+       return nil
+}
+
+// GetTxnID returns a copy of the ID that identifies this transaction.
+func (txn *transaction) GetTxnID() TxnID {
+       return txn.txnID
+}
+
+func (txn *transaction) checkIfOpen() (bool, error) {
+       if txn.state == TxnOpen {
+               return true, nil
+       }
+       return false, newError(InvalidStatus, "Expect transaction state is 
TxnOpen but "+txn.state.string())
+}
+
+// string returns the name of the state, or "Unknown" for a value that is not
+// one of the declared TxnState constants.
+func (state TxnState) string() string {
+       switch state {
+       case TxnOpen:
+               return "TxnOpen"
+       case TxnCommitting:
+               return "TxnCommitting"
+       case TxnAborting:
+               return "TxnAborting"
+       case TxnCommitted:
+               return "TxnCommitted"
+       case TxnAborted:
+               return "TxnAborted"
+       case TxnError:
+               // TxnError is a declared state; without this case it was
+               // reported as "Unknown" in error messages.
+               return "TxnError"
+       case TxnTimeout:
+               return "TxnTimeout"
+       default:
+               return "Unknown"
+       }
+}
diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go
index 14a652f..362e4d2 100644
--- a/pulsar/transaction_test.go
+++ b/pulsar/transaction_test.go
@@ -18,11 +18,15 @@
 package pulsar
 
 import (
-       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
-       "github.com/stretchr/testify/assert"
-
+       "context"
+       "errors"
+       "fmt"
        "testing"
        "time"
+
+       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
 )
 
 func TestTCClient(t *testing.T) {
@@ -79,6 +83,126 @@ func TestTCClient(t *testing.T) {
        defer client.Close()
 }
 
+//Test points:
+//     1. Abort and commit txn.
+//             1. Do nothing, just open a transaction and then commit it or 
abort it.
+//             The operations of committing/aborting txn should success at the 
first time and fail at the second time.
+//     2. The internal API, registerSendOrAckOp and endSendOrAckOp
+//             1. Register 4 operation but only end 3 operations, the 
transaction can not be committed or aborted.
+//             2. Register 3 operation and end 3 operation the transaction can 
be committed and aborted.
+//             3. Register an operation and end the operation with an error,
+//             and then the state of the transaction should be replaced to 
errored.
+//     3. The internal API, registerAckTopic and registerProducerTopic
+//             1. Register ack topic and send topic, and call http request to 
get the stats of the transaction
+//             to do verification.
+
+// TestTxnImplCommitOrAbort Test abort and commit txn
+func TestTxnImplCommitOrAbort(t *testing.T) {
+       tc, _ := createTcClient(t)
+       //1. Open a transaction and then commit it.
+       //The operations of committing txn1 should success at the first time 
and fail at the second time.
+       txn1 := createTxn(tc, t)
+       err := txn1.Commit(context.Background())
+       require.Nil(t, err, fmt.Sprintf("Failed to commit the transaction 
%d:%d\n", txn1.txnID.mostSigBits,
+               txn1.txnID.leastSigBits))
+       txn1.state = TxnOpen
+       txn1.opsFlow <- true
+       err = txn1.Commit(context.Background())
+       assert.Equal(t, err.(*Error).Result(), TransactionNoFoundError)
+       assert.Equal(t, txn1.GetState(), TxnError)
+       //2. Open a transaction and then abort it.
+       //The operations of aborting txn2 should success at the first time and 
fail at the second time.
+       id2, err := tc.newTransaction(time.Hour)
+       require.Nil(t, err, "Failed to new a transaction")
+       txn2 := newTransaction(*id2, tc, time.Hour)
+       err = txn2.Abort(context.Background())
+       require.Nil(t, err, fmt.Sprintf("Failed to abort the transaction 
%d:%d\n",
+               id2.mostSigBits, id2.leastSigBits))
+       txn2.state = TxnOpen
+       txn2.opsFlow <- true
+       err = txn2.Abort(context.Background())
+       assert.Equal(t, err.(*Error).Result(), TransactionNoFoundError)
+       assert.Equal(t, txn1.GetState(), TxnError)
+       err = txn2.registerSendOrAckOp()
+       assert.Equal(t, err.(*Error).Result(), InvalidStatus)
+       err = txn1.registerSendOrAckOp()
+       assert.Equal(t, err.(*Error).Result(), InvalidStatus)
+}
+
+// TestRegisterOpAndEndOp tests the internal API registerSendOrAckOp and
+// endSendOrAckOp through the registerOpAndEndOp helper, once ending the
+// transaction with Commit (true) and once with Abort (false) per scenario.
+func TestRegisterOpAndEndOp(t *testing.T) {
+       tc, _ := createTcClient(t)
+       //1. Register 4 operations but only end 3 of them: the transaction can
+       // not be committed or aborted and ending it times out.
+       res := registerOpAndEndOp(t, tc, 4, 3, nil, true)
+       assert.Equal(t, res.(*Error).Result(), TimeoutError)
+       res = registerOpAndEndOp(t, tc, 4, 3, nil, false)
+       assert.Equal(t, res.(*Error).Result(), TimeoutError)
+
+       //2. Register 3 operations and end 3 operations: the transaction can be
+       // committed and aborted.
+       res = registerOpAndEndOp(t, tc, 3, 3, nil, true)
+       assert.Nil(t, res)
+       res = registerOpAndEndOp(t, tc, 3, 3, nil, false)
+       assert.Nil(t, res)
+       //3. Register operations and end them with an error: the state of the
+       // transaction becomes TxnError, so ending it fails with InvalidStatus.
+       res = registerOpAndEndOp(t, tc, 4, 4, errors.New(""), true)
+       assert.Equal(t, res.(*Error).Result(), InvalidStatus)
+       res = registerOpAndEndOp(t, tc, 4, 4, errors.New(""), false)
+       assert.Equal(t, res.(*Error).Result(), InvalidStatus)
+}
+
+// TestRegisterTopic tests the internal API registerAckTopic and
+// registerProducerTopic: it registers a topic and a subscription with a
+// transaction and verifies both through the transaction stats.
+func TestRegisterTopic(t *testing.T) {
+       //1. Prepare: create PulsarClient and init transaction coordinator client.
+       topic := newTopicName()
+       sub := "my-sub"
+       tc, client := createTcClient(t)
+       //2. Prepare: create Topic and Subscription.
+       _, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: sub,
+       })
+       assert.NoError(t, err)
+       txn := createTxn(tc, t)
+       //3. Register the subscription and the topic with the transaction.
+       err = txn.registerAckTopic(topic, sub)
+       require.Nil(t, err, "Failed to register ack topic.")
+       err = txn.registerProducerTopic(topic)
+       require.Nil(t, err, "Failed to register producer topic.")
+       //4. Call http request to get the stats of the transaction to do verification.
+       stats2, err := transactionStats(&txn.txnID)
+       // Stop here on failure: the type assertions below would otherwise panic
+       // and hide the actual error.
+       require.NoError(t, err)
+       topics := stats2["producedPartitions"].(map[string]interface{})
+       subTopics := stats2["ackedPartitions"].(map[string]interface{})
+       assert.NotNil(t, topics[topic])
+       assert.NotNil(t, subTopics[topic])
+       subs := subTopics[topic].(map[string]interface{})
+       assert.NotNil(t, subs[sub])
+}
+
+// registerOpAndEndOp creates a new transaction, registers rp operations on it,
+// ends ep operations with err, and then commits (commit == true) or aborts the
+// transaction. It returns the error of that final Commit/Abort call.
+func registerOpAndEndOp(t *testing.T, tc *transactionCoordinatorClient, rp int, ep int, err error, commit bool) error {
+       txn := createTxn(tc, t)
+       for registered := 0; registered < rp; registered++ {
+               regErr := txn.registerSendOrAckOp()
+               assert.Nil(t, regErr)
+       }
+       for ended := 0; ended < ep; ended++ {
+               txn.endSendOrAckOp(err)
+       }
+       if commit {
+               return txn.Commit(context.Background())
+       }
+       return txn.Abort(context.Background())
+}
+
+// createTxn asks the transaction coordinator for a new transaction with a
+// one-hour timeout and wraps the returned ID in a client-side transaction.
+// The test fails immediately when the coordinator request fails.
+func createTxn(tc *transactionCoordinatorClient, t *testing.T) *transaction {
+       txnID, newErr := tc.newTransaction(time.Hour)
+       require.Nil(t, newErr, "Failed to new a transaction.")
+       txn := newTransaction(*txnID, tc, time.Hour)
+       return txn
+}
+
 // createTcClient Create a transaction coordinator client to send request
 func createTcClient(t *testing.T) (*transactionCoordinatorClient, *client) {
        c, err := NewClient(ClientOptions{
@@ -86,13 +210,10 @@ func createTcClient(t *testing.T) 
(*transactionCoordinatorClient, *client) {
                TLSTrustCertsFilePath: caCertsPath,
                Authentication:        NewAuthenticationTLS(tlsClientCertPath, 
tlsClientKeyPath),
        })
-       if err != nil {
-               t.Fatalf("Failed to create client due to %s", err.Error())
-       }
+       require.Nil(t, err, "Failed to create client.")
        tcClient := newTransactionCoordinatorClientImpl(c.(*client))
-       if err = tcClient.start(); err != nil {
-               t.Fatalf("Failed to start transaction coordinator due to %s", 
err.Error())
-       }
+       err = tcClient.start()
+       require.Nil(t, err, "Failed to start transaction coordinator.")
 
        return tcClient, c.(*client)
 }

Reply via email to