This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch xiangying/txn/transaction_impl in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 5a21c240da3cf54e92c8b6bd99a0d7b2d15c2930 Author: xiangying <[email protected]> AuthorDate: Tue Mar 7 21:45:40 2023 +0800 [feat][txn]Implement transactionImpl --- pulsar/error.go | 25 +++- pulsar/transaction.go | 25 ++++ pulsar/transaction_coordinator_client.go | 35 +++-- pulsar/transaction_impl.go | 226 +++++++++++++++++++++++++++++++ pulsar/transaction_test.go | 141 ++++++++++++++++++- 5 files changed, 435 insertions(+), 17 deletions(-) diff --git a/pulsar/error.go b/pulsar/error.go index 0aa1e3c..0d4a631 100644 --- a/pulsar/error.go +++ b/pulsar/error.go @@ -17,7 +17,10 @@ package pulsar -import "fmt" +import ( + "fmt" + proto "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" +) // Result used to represent pulsar processing is an alias of type int. type Result int @@ -103,14 +106,11 @@ const ( ProducerClosed // SchemaFailure means the payload could not be encoded using the Schema SchemaFailure - - // ReachMaxPendingOps means the pending operations in transaction_impl coordinator reach the maximum. - ReachMaxPendingOps // InvalidStatus means the component status is not as expected. InvalidStatus - // TransactionError means this is a transaction related error - TransactionError - + // TransactionNoFoundError The transaction is not exist in the transaction coordinator, It may be an error txn + // or already ended. + TransactionNoFoundError // ClientMemoryBufferIsFull client limit buffer is full ClientMemoryBufferIsFull ) @@ -225,3 +225,14 @@ func getResultStr(r Result) string { return fmt.Sprintf("Result(%d)", r) } } + +func getErrorFromServerError(serverError *proto.ServerError) error { + switch *serverError { + case proto.ServerError_TransactionNotFound: + return newError(TransactionNoFoundError, serverError.String()) + case proto.ServerError_InvalidTxnStatus: + return newError(InvalidStatus, serverError.String()) + default: + return newError(UnknownError, serverError.String()) + } +} diff --git a/pulsar/transaction.go b/pulsar/transaction.go index ae7c673..adee2d0 100644 --- a/pulsar/transaction.go +++ b/pulsar/transaction.go @@ -17,7 +17,32 @@ package pulsar +import ( + "context" +) + +const ( + Open = iota //The init state is open + Committing + Aborting + Committed + Aborted + Errored + TimeOut +) + type TxnID struct { mostSigBits uint64 leastSigBits uint64 } + +// Transaction used to guarantee exactly-once +type Transaction interface { + Commit(context.Context) error + + Abort(context.Context) error + + GetState() State + + GetTxnID() TxnID +} diff --git a/pulsar/transaction_coordinator_client.go b/pulsar/transaction_coordinator_client.go index 82d1490..1535fad 100644 --- a/pulsar/transaction_coordinator_client.go +++ b/pulsar/transaction_coordinator_client.go @@ -114,14 +114,16 @@ func (tc *transactionCoordinatorClient) newTransaction(timeout time.Duration) (* TxnTtlSeconds: proto.Uint64(uint64(timeout.Milliseconds())), } - cnx, err := tc.client.rpcClient.RequestOnCnx(tc.cons[nextTcID], requestID, pb.BaseCommand_NEW_TXN, cmdNewTxn) + res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[nextTcID], requestID, pb.BaseCommand_NEW_TXN, cmdNewTxn) tc.semaphore.Release() if err != nil { return nil, err + } else if res.Response.NewTxnResponse.Error != nil { + return nil, getErrorFromServerError(res.Response.NewTxnResponse.Error) } - return &TxnID{*cnx.Response.NewTxnResponse.TxnidMostBits, - *cnx.Response.NewTxnResponse.TxnidLeastBits}, nil + return &TxnID{*res.Response.NewTxnResponse.TxnidMostBits, + *res.Response.NewTxnResponse.TxnidLeastBits}, nil } // addPublishPartitionToTxn register the partitions which published messages with the transactionImpl. @@ -137,10 +139,15 @@ func (tc *transactionCoordinatorClient) addPublishPartitionToTxn(id *TxnID, part TxnidLeastBits: proto.Uint64(id.leastSigBits), Partitions: partitions, } - _, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], requestID, + res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], requestID, pb.BaseCommand_ADD_PARTITION_TO_TXN, cmdAddPartitions) tc.semaphore.Release() - return err + if err != nil { + return err + } else if res.Response.AddPartitionToTxnResponse.Error != nil { + return getErrorFromServerError(res.Response.AddPartitionToTxnResponse.Error) + } + return nil } // addSubscriptionToTxn register the subscription which acked messages with the transactionImpl. @@ -160,10 +167,15 @@ func (tc *transactionCoordinatorClient) addSubscriptionToTxn(id *TxnID, topic st TxnidLeastBits: proto.Uint64(id.leastSigBits), Subscription: []*pb.Subscription{sub}, } - _, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], requestID, + res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], requestID, pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN, cmdAddSubscription) tc.semaphore.Release() - return err + if err != nil { + return err + } else if res.Response.AddSubscriptionToTxnResponse.Error != nil { + return getErrorFromServerError(res.Response.AddSubscriptionToTxnResponse.Error) + } + return nil } // endTxn commit or abort the transactionImpl. @@ -178,9 +190,14 @@ func (tc *transactionCoordinatorClient) endTxn(id *TxnID, action pb.TxnAction) e TxnidMostBits: proto.Uint64(id.mostSigBits), TxnidLeastBits: proto.Uint64(id.leastSigBits), } - _, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], requestID, pb.BaseCommand_END_TXN, cmdEndTxn) + res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], requestID, pb.BaseCommand_END_TXN, cmdEndTxn) tc.semaphore.Release() - return err + if err != nil { + return err + } else if res.Response.EndTxnResponse.Error != nil { + return getErrorFromServerError(res.Response.EndTxnResponse.Error) + } + return nil } func getTCAssignTopicName(partition uint64) string { diff --git a/pulsar/transaction_impl.go b/pulsar/transaction_impl.go new file mode 100644 index 0000000..69cd745 --- /dev/null +++ b/pulsar/transaction_impl.go @@ -0,0 +1,226 @@ +package pulsar + +import ( + "context" + pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" + "github.com/apache/pulsar-client-go/pulsar/log" + uAtomic "go.uber.org/atomic" + "sync" + "sync/atomic" + "time" +) + +type State int32 +type void struct{} +type subscription struct { + topic string + subscription string +} + +type transaction struct { + sync.Mutex + txnID TxnID + state State + tcClient *transactionCoordinatorClient + registerPartitions map[string]void + registerAckSubscriptions map[subscription]void + /** + * opsFlow Wait all the operations of sending and acking messages with the transaction complete + * by reading msg from the chan. + * opsCount is record the number of the uncompleted operations. + * opsFlow + * Write: + * 1. When the transaction is created, a new empty struct{} will be written to opsFlow chan. + * 2. When the opsCount decrement from 1 to 0, a new empty struct{} will be written to opsFlow chan. + * 3. When get a retryable error after committing or aborting the transaction, + * a new empty struct{} will be written to opsFlow chan. + * Read: + * 1. When the transaction is committed or aborted, an empty struct{} will be read from opsFlow chan. + * 2. When the opsCount increment from 0 to 1, an empty struct{} will be read from opsFlow chan. + */ + opsFlow chan struct{} + opsCount uAtomic.Int32 + log log.Logger +} + +func newTransaction(id TxnID, tcClient *transactionCoordinatorClient, timeout time.Duration) *transaction { + transaction := &transaction{ + txnID: id, + state: Open, + registerPartitions: make(map[string]void), + registerAckSubscriptions: make(map[subscription]void), + opsFlow: make(chan struct{}, 1), + tcClient: tcClient, + } + //This means there are not pending requests with this transaction. The transaction can be committed or aborted. + transaction.opsFlow <- struct{}{} + go func() { + //Set the state of the transaction to timeout after timeout + <-time.After(timeout) + atomic.CompareAndSwapInt32((*int32)(&transaction.state), Open, TimeOut) + }() + transaction.log = tcClient.log.SubLogger(log.Fields{}) + return transaction +} + +func (txn *transaction) GetState() State { + return txn.state +} + +func (txn *transaction) Commit(ctx context.Context) error { + if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), Open, Committing) || txn.state == Committing) { + return newError(InvalidStatus, "Expect transaction state is Open but "+txn.state.string()) + } + + //Wait for all operations to complete + <-txn.opsFlow + //Send commit transaction command to transaction coordinator + err := txn.tcClient.endTxn(&txn.txnID, pb.TxnAction_COMMIT) + if err == nil { + atomic.StoreInt32((*int32)(&txn.state), Committed) + } else { + if err.(*Error).Result() == TransactionNoFoundError || err.(*Error).Result() == InvalidStatus { + atomic.StoreInt32((*int32)(&txn.state), Errored) + return err + } else { + txn.opsFlow <- struct{}{} + } + } + return err +} + +func (txn *transaction) Abort(ctx context.Context) error { + if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), Open, Aborting) || txn.state == Aborting) { + return newError(InvalidStatus, "Expect transaction state is Open but "+txn.state.string()) + } + + //Wait for all operations to complete + <-txn.opsFlow + //Send abort transaction command to transaction coordinator + err := txn.tcClient.endTxn(&txn.txnID, pb.TxnAction_ABORT) + if err == nil { + atomic.StoreInt32((*int32)(&txn.state), Aborted) + } else { + if err.(*Error).Result() == TransactionNoFoundError || err.(*Error).Result() == InvalidStatus { + atomic.StoreInt32((*int32)(&txn.state), Errored) + } else { + txn.opsFlow <- struct{}{} + } + } + return err +} + +func (txn *transaction) registerSendOrAckOp() { + if txn.opsCount.Inc() == 1 { + //There are new operations that not completed + <-txn.opsFlow + } +} + +func (txn *transaction) endSendOrAckOp(err error) { + if err != nil { + atomic.StoreInt32((*int32)(&txn.state), Errored) + } + if txn.opsCount.Dec() == 0 { + //This means there are not pending send/ack requests + txn.opsFlow <- struct{}{} + } +} + +func (txn *transaction) registerProducedTopicAsync(topic string, callback func(err error)) { + go func() { + err := txn.registerProducerTopic(topic) + callback(err) + }() +} + +func (txn *transaction) registerAckTopicAsync(topic string, subName string, + callback func(err error)) { + go func() { + err := txn.registerAckTopic(topic, subName) + callback(err) + }() +} + +func (txn *transaction) registerProducerTopic(topic string) error { + isOpen, err := txn.checkIfOpen() + if !isOpen { + return err + } + _, ok := txn.registerPartitions[topic] + if !ok { + txn.Lock() + defer txn.Unlock() + err := txn.tcClient.addPublishPartitionToTxn(&txn.txnID, []string{topic}) + if err != nil { + return err + } + txn.registerPartitions[topic] = struct{}{} + return nil + } else { + return nil + } +} + +func (txn *transaction) registerAckTopic(topic string, subName string) error { + isOpen, err := txn.checkIfOpen() + if !isOpen { + return err + } + sub := subscription{ + topic: topic, + subscription: subName, + } + _, ok := txn.registerAckSubscriptions[sub] + if !ok { + txn.Lock() + defer txn.Unlock() + err := txn.tcClient.addSubscriptionToTxn(&txn.txnID, topic, subName) + if err != nil { + return err + } + txn.registerAckSubscriptions[sub] = struct{}{} + return nil + } else { + return nil + } +} + +func (txn *transaction) GetTxnID() TxnID { + return txn.txnID +} + +func (txn *transaction) checkIfOpen() (bool, error) { + if txn.state == Open { + return true, nil + } else { + return false, newError(InvalidStatus, "Expect transaction state is Open but "+txn.state.string()) + } +} + +func (txn *transaction) checkIfOpenOrAborting() (bool, error) { + if atomic.CompareAndSwapInt32((*int32)(&txn.state), Open, Aborting) || txn.state == Aborted { + return true, nil + } else { + return false, newError(InvalidStatus, "Expect transaction state is Open but "+txn.state.string()) + } +} + +func (state State) string() string { + switch state { + case Open: + return "Open" + case Committing: + return "Committing" + case Aborting: + return "Aborting" + case Committed: + return "Committed" + case Aborted: + return "Aborted" + case TimeOut: + return "TimeOut" + default: + return "Unknown" + } +} diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go index 14a652f..d989887 100644 --- a/pulsar/transaction_test.go +++ b/pulsar/transaction_test.go @@ -18,9 +18,10 @@ package pulsar import ( + "context" + "errors" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/stretchr/testify/assert" - "testing" "time" ) @@ -79,6 +80,144 @@ func TestTCClient(t *testing.T) { defer client.Close() } +/** +* Test points: +* 1. Abort and commit txn. +* 1. Do nothing, just open a transaction and then commit it or abort it. +* The operations of committing/aborting txn should success at the first time and fail at the second time. +* 2. The internal API, registerSendOrAckOp and endSendOrAckOp +* 1. Register 4 operation but only end 3 operations, the transaction can not be committed or aborted. +* 2. Register 4 operation and end 4 operation the transaction can be committed and aborted. +* 3. Register an operation and end the operation with an error, +* and then the state of the transaction should be replaced to errored. +* 3. The internal API, registerAckTopic and registerProducerTopic +* 1. Register ack topic and send topic, and call http request to get the stats of the transaction +* to do verification. + */ + +//1. Test abort and commit txn. +func TestTxnImplCommitOrAbort(t *testing.T) { + tc, _ := createTcClient(t) + //1. Open a transaction and then commit it. + //The operations of committing txn1 should success at the first time and fail at the second time. + txn1 := createTxn(tc, t) + err := txn1.Commit(context.Background()) + if err != nil { + t.Fatalf("Failed to commit the transaction %d:%d, %s\n", txn1.txnID.mostSigBits, txn1.txnID.leastSigBits, + err.Error()) + } + txn1.state = Open + txn1.opsFlow <- struct{}{} + err = txn1.Commit(context.Background()) + assert.Equal(t, err.(*Error).Result(), TransactionNoFoundError) + assert.Equal(t, txn1.GetState(), State(Errored)) + //2. Open a transaction and then abort it. + //The operations of aborting txn2 should success at the first time and fail at the second time. + id2, err := tc.newTransaction(time.Hour) + if err != nil { + t.Fatalf("Failed to new a transaction %s", err.Error()) + } + txn2 := newTransaction(*id2, tc, time.Hour) + err = txn2.Abort(context.Background()) + if err != nil { + t.Fatalf("Failed to abort the transaction %d:%d, %s\n", id2.mostSigBits, id2.leastSigBits, err.Error()) + } + txn2.state = Open + txn2.opsFlow <- struct{}{} + err = txn2.Abort(context.Background()) + assert.Equal(t, err.(*Error).Result(), TransactionNoFoundError) + assert.Equal(t, txn1.GetState(), State(Errored)) +} + +//2. Test the internal API, registerSendOrAckOp and endSendOrAckOp +func TestRegisterOpAndEndOp(t *testing.T) { + tc, _ := createTcClient(t) + //1. Register 4 operation but only end 3 operations, the transaction can not be committed or aborted. + txn3 := createTxn(tc, t) + res := registerOpAndEndOp(txn3, 4, 3, nil) + select { + case <-res: + t.Fatalf("The transaction %d:%d should not be commited or aborted", txn3.txnID.mostSigBits, + txn3.txnID.leastSigBits) + case <-time.After(3 * time.Second): + } + //2. Register 4 operation and end 4 operation the transaction can be committed and aborted. + txn4 := createTxn(tc, t) + res = registerOpAndEndOp(txn4, 4, 4, nil) + select { + case <-res: + case <-time.After(3 * time.Second): + t.Fatalf("The transaction %d:%d should be commited or aborted", txn4.txnID.mostSigBits, + txn4.txnID.leastSigBits) + } + //3. Register an operation and end the operation with an error, + // and then the state of the transaction should be replaced to errored. + txn5 := createTxn(tc, t) + registerOpAndEndOp(txn5, 4, 4, errors.New("")) + assert.Equal(t, txn5.GetState(), State(Errored)) +} + +//3. Test the internal API, registerAckTopic and registerProducerTopic +func TestRegisterTopic(t *testing.T) { + //1. Prepare: create PulsarClient and init transaction coordinator client. + topic := newTopicName() + sub := "my-sub" + tc, client := createTcClient(t) + //2. Prepare: create Topic and Subscription. + _, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: sub, + }) + assert.NoError(t, err) + txn := createTxn(tc, t) + //3. Create a topic and register topic and subscription. + err = txn.registerAckTopic(topic, sub) + if err != nil { + t.Fatalf("Failed to register ack topic %s", err.Error()) + } + err = txn.registerProducerTopic(topic) + if err != nil { + t.Fatalf("Failed to register ack topic %s", err.Error()) + } + //4. Call http request to get the stats of the transaction to do verification. + stats2, err := transactionStats(&txn.txnID) + assert.NoError(t, err) + topics := stats2["producedPartitions"].(map[string]interface{}) + subTopics := stats2["ackedPartitions"].(map[string]interface{}) + assert.NotNil(t, topics[topic]) + assert.NotNil(t, subTopics[topic]) + subs := subTopics[topic].(map[string]interface{}) + assert.NotNil(t, subs[sub]) +} + +func registerOpAndEndOp(txn *transaction, rp int, ep int, err error) <-chan struct{} { + for i := 0; i < rp; i++ { + txn.registerSendOrAckOp() + } + for i := 0; i < ep; i++ { + txn.endSendOrAckOp(err) + } + + res := make(chan struct{}) + go func() { + txn.Commit(context.Background()) + res <- struct{}{} + }() + go func() { + txn.Abort(context.Background()) + res <- struct{}{} + }() + return res +} + +func createTxn(tc *transactionCoordinatorClient, t *testing.T) *transaction { + id3, err := tc.newTransaction(time.Hour) + if err != nil { + t.Fatalf("Failed to new a transaction %s", err.Error()) + } + return newTransaction(*id3, tc, time.Hour) +} + // createTcClient Create a transaction coordinator client to send request func createTcClient(t *testing.T) (*transactionCoordinatorClient, *client) { c, err := NewClient(ClientOptions{
