This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch xiangying/txn/transaction_impl
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 5a21c240da3cf54e92c8b6bd99a0d7b2d15c2930
Author: xiangying <[email protected]>
AuthorDate: Tue Mar 7 21:45:40 2023 +0800

    [feat][txn]Implement transactionImpl
---
 pulsar/error.go                          |  25 +++-
 pulsar/transaction.go                    |  25 ++++
 pulsar/transaction_coordinator_client.go |  35 +++--
 pulsar/transaction_impl.go               | 226 +++++++++++++++++++++++++++++++
 pulsar/transaction_test.go               | 141 ++++++++++++++++++-
 5 files changed, 435 insertions(+), 17 deletions(-)

diff --git a/pulsar/error.go b/pulsar/error.go
index 0aa1e3c..0d4a631 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -17,7 +17,10 @@
 
 package pulsar
 
-import "fmt"
+import (
+       "fmt"
+       proto "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
 
 // Result used to represent pulsar processing is an alias of type int.
 type Result int
@@ -103,14 +106,11 @@ const (
        ProducerClosed
        // SchemaFailure means the payload could not be encoded using the Schema
        SchemaFailure
-
-       // ReachMaxPendingOps means the pending operations in transaction_impl 
coordinator reach the maximum.
-       ReachMaxPendingOps
        // InvalidStatus means the component status is not as expected.
        InvalidStatus
-       // TransactionError means this is a transaction related error
-       TransactionError
-
+       // TransactionNoFoundError The transaction is not exist in the 
transaction coordinator, It may be an error txn
+       // or already ended.
+       TransactionNoFoundError
        // ClientMemoryBufferIsFull client limit buffer is full
        ClientMemoryBufferIsFull
 )
@@ -225,3 +225,14 @@ func getResultStr(r Result) string {
                return fmt.Sprintf("Result(%d)", r)
        }
 }
+
+func getErrorFromServerError(serverError *proto.ServerError) error {
+       switch *serverError {
+       case proto.ServerError_TransactionNotFound:
+               return newError(TransactionNoFoundError, serverError.String())
+       case proto.ServerError_InvalidTxnStatus:
+               return newError(InvalidStatus, serverError.String())
+       default:
+               return newError(UnknownError, serverError.String())
+       }
+}
diff --git a/pulsar/transaction.go b/pulsar/transaction.go
index ae7c673..adee2d0 100644
--- a/pulsar/transaction.go
+++ b/pulsar/transaction.go
@@ -17,7 +17,32 @@
 
 package pulsar
 
+import (
+       "context"
+)
+
// Transaction states. A transaction starts Open and then moves to the other
// states as it is committed, aborted, fails, or times out. The constants are
// untyped so they compare directly against a State and can also be passed to
// the sync/atomic int32 functions that update a transaction's state.
const (
	// Open is the initial state of a new transaction.
	Open = 0
	// Committing means a commit has been requested and has not finished yet.
	Committing = 1
	// Aborting means an abort has been requested and has not finished yet.
	Aborting = 2
	// Committed means the transaction coordinator committed the transaction.
	Committed = 3
	// Aborted means the transaction coordinator aborted the transaction.
	Aborted = 4
	// Errored means the transaction can no longer be committed or aborted
	// because an operation on it failed.
	Errored = 5
	// TimeOut means the transaction was still Open when its timeout elapsed.
	TimeOut = 6
)
+
 type TxnID struct {
        mostSigBits  uint64
        leastSigBits uint64
 }
+
+// Transaction used to guarantee exactly-once
+type Transaction interface {
+       Commit(context.Context) error
+
+       Abort(context.Context) error
+
+       GetState() State
+
+       GetTxnID() TxnID
+}
diff --git a/pulsar/transaction_coordinator_client.go 
b/pulsar/transaction_coordinator_client.go
index 82d1490..1535fad 100644
--- a/pulsar/transaction_coordinator_client.go
+++ b/pulsar/transaction_coordinator_client.go
@@ -114,14 +114,16 @@ func (tc *transactionCoordinatorClient) 
newTransaction(timeout time.Duration) (*
                TxnTtlSeconds: proto.Uint64(uint64(timeout.Milliseconds())),
        }
 
-       cnx, err := tc.client.rpcClient.RequestOnCnx(tc.cons[nextTcID], 
requestID, pb.BaseCommand_NEW_TXN, cmdNewTxn)
+       res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[nextTcID], 
requestID, pb.BaseCommand_NEW_TXN, cmdNewTxn)
        tc.semaphore.Release()
        if err != nil {
                return nil, err
+       } else if res.Response.NewTxnResponse.Error != nil {
+               return nil, 
getErrorFromServerError(res.Response.NewTxnResponse.Error)
        }
 
-       return &TxnID{*cnx.Response.NewTxnResponse.TxnidMostBits,
-               *cnx.Response.NewTxnResponse.TxnidLeastBits}, nil
+       return &TxnID{*res.Response.NewTxnResponse.TxnidMostBits,
+               *res.Response.NewTxnResponse.TxnidLeastBits}, nil
 }
 
 // addPublishPartitionToTxn register the partitions which published messages 
with the transactionImpl.
@@ -137,10 +139,15 @@ func (tc *transactionCoordinatorClient) 
addPublishPartitionToTxn(id *TxnID, part
                TxnidLeastBits: proto.Uint64(id.leastSigBits),
                Partitions:     partitions,
        }
-       _, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], 
requestID,
+       res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], 
requestID,
                pb.BaseCommand_ADD_PARTITION_TO_TXN, cmdAddPartitions)
        tc.semaphore.Release()
-       return err
+       if err != nil {
+               return err
+       } else if res.Response.AddPartitionToTxnResponse.Error != nil {
+               return 
getErrorFromServerError(res.Response.AddPartitionToTxnResponse.Error)
+       }
+       return nil
 }
 
 // addSubscriptionToTxn register the subscription which acked messages with 
the transactionImpl.
@@ -160,10 +167,15 @@ func (tc *transactionCoordinatorClient) 
addSubscriptionToTxn(id *TxnID, topic st
                TxnidLeastBits: proto.Uint64(id.leastSigBits),
                Subscription:   []*pb.Subscription{sub},
        }
-       _, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], 
requestID,
+       res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], 
requestID,
                pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN, cmdAddSubscription)
        tc.semaphore.Release()
-       return err
+       if err != nil {
+               return err
+       } else if res.Response.AddSubscriptionToTxnResponse.Error != nil {
+               return 
getErrorFromServerError(res.Response.AddSubscriptionToTxnResponse.Error)
+       }
+       return nil
 }
 
 // endTxn commit or abort the transactionImpl.
@@ -178,9 +190,14 @@ func (tc *transactionCoordinatorClient) endTxn(id *TxnID, 
action pb.TxnAction) e
                TxnidMostBits:  proto.Uint64(id.mostSigBits),
                TxnidLeastBits: proto.Uint64(id.leastSigBits),
        }
-       _, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], 
requestID, pb.BaseCommand_END_TXN, cmdEndTxn)
+       res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], 
requestID, pb.BaseCommand_END_TXN, cmdEndTxn)
        tc.semaphore.Release()
-       return err
+       if err != nil {
+               return err
+       } else if res.Response.EndTxnResponse.Error != nil {
+               return 
getErrorFromServerError(res.Response.EndTxnResponse.Error)
+       }
+       return nil
 }
 
 func getTCAssignTopicName(partition uint64) string {
diff --git a/pulsar/transaction_impl.go b/pulsar/transaction_impl.go
new file mode 100644
index 0000000..69cd745
--- /dev/null
+++ b/pulsar/transaction_impl.go
@@ -0,0 +1,226 @@
+package pulsar
+
import (
	"context"
	"errors"
	"sync"
	"sync/atomic"
	"time"

	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
	"github.com/apache/pulsar-client-go/pulsar/log"
	uAtomic "go.uber.org/atomic"
)
+
type (
	// State is the state of a transaction (Open, Committing, ...). It is an
	// int32 so it can be read and updated with the sync/atomic int32 functions.
	State int32

	// void is a zero-size value type used to build sets out of maps.
	void struct{}

	// subscription is a (topic, subscription name) pair on which a transaction
	// acknowledges messages. It is used as a map key.
	subscription struct {
		topic, subscription string
	}
)
+
+type transaction struct {
+       sync.Mutex
+       txnID                    TxnID
+       state                    State
+       tcClient                 *transactionCoordinatorClient
+       registerPartitions       map[string]void
+       registerAckSubscriptions map[subscription]void
+       /**
+       * opsFlow Wait all the operations of sending and acking messages with 
the transaction complete
+       * by reading msg from the chan.
+       * opsCount is record the number of the uncompleted operations.
+       * opsFlow
+       *        Write:
+       *                     1. When the transaction is created, a new empty 
struct{} will be written to opsFlow chan.
+       *                         2. When the opsCount decrement from 1 to 0, a 
new empty struct{} will be written to opsFlow chan.
+       *                         3. When get a retryable error after 
committing or aborting the transaction,
+       *                    a new empty struct{} will be written to opsFlow 
chan.
+       *        Read:
+       *                         1. When the transaction is committed or 
aborted, an empty struct{} will be read from opsFlow chan.
+       *                         2. When the opsCount increment from 0 to 1, 
an empty struct{} will be read from opsFlow chan.
+        */
+       opsFlow  chan struct{}
+       opsCount uAtomic.Int32
+       log      log.Logger
+}
+
+func newTransaction(id TxnID, tcClient *transactionCoordinatorClient, timeout 
time.Duration) *transaction {
+       transaction := &transaction{
+               txnID:                    id,
+               state:                    Open,
+               registerPartitions:       make(map[string]void),
+               registerAckSubscriptions: make(map[subscription]void),
+               opsFlow:                  make(chan struct{}, 1),
+               tcClient:                 tcClient,
+       }
+       //This means there are not pending requests with this transaction. The 
transaction can be committed or aborted.
+       transaction.opsFlow <- struct{}{}
+       go func() {
+               //Set the state of the transaction to timeout after timeout
+               <-time.After(timeout)
+               atomic.CompareAndSwapInt32((*int32)(&transaction.state), Open, 
TimeOut)
+       }()
+       transaction.log = tcClient.log.SubLogger(log.Fields{})
+       return transaction
+}
+
+func (txn *transaction) GetState() State {
+       return txn.state
+}
+
+func (txn *transaction) Commit(ctx context.Context) error {
+       if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), Open, Committing) 
|| txn.state == Committing) {
+               return newError(InvalidStatus, "Expect transaction state is 
Open but "+txn.state.string())
+       }
+
+       //Wait for all operations to complete
+       <-txn.opsFlow
+       //Send commit transaction command to transaction coordinator
+       err := txn.tcClient.endTxn(&txn.txnID, pb.TxnAction_COMMIT)
+       if err == nil {
+               atomic.StoreInt32((*int32)(&txn.state), Committed)
+       } else {
+               if err.(*Error).Result() == TransactionNoFoundError || 
err.(*Error).Result() == InvalidStatus {
+                       atomic.StoreInt32((*int32)(&txn.state), Errored)
+                       return err
+               } else {
+                       txn.opsFlow <- struct{}{}
+               }
+       }
+       return err
+}
+
+func (txn *transaction) Abort(ctx context.Context) error {
+       if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), Open, Aborting) 
|| txn.state == Aborting) {
+               return newError(InvalidStatus, "Expect transaction state is 
Open but "+txn.state.string())
+       }
+
+       //Wait for all operations to complete
+       <-txn.opsFlow
+       //Send abort transaction command to transaction coordinator
+       err := txn.tcClient.endTxn(&txn.txnID, pb.TxnAction_ABORT)
+       if err == nil {
+               atomic.StoreInt32((*int32)(&txn.state), Aborted)
+       } else {
+               if err.(*Error).Result() == TransactionNoFoundError || 
err.(*Error).Result() == InvalidStatus {
+                       atomic.StoreInt32((*int32)(&txn.state), Errored)
+               } else {
+                       txn.opsFlow <- struct{}{}
+               }
+       }
+       return err
+}
+
+func (txn *transaction) registerSendOrAckOp() {
+       if txn.opsCount.Inc() == 1 {
+               //There are new operations that not completed
+               <-txn.opsFlow
+       }
+}
+
+func (txn *transaction) endSendOrAckOp(err error) {
+       if err != nil {
+               atomic.StoreInt32((*int32)(&txn.state), Errored)
+       }
+       if txn.opsCount.Dec() == 0 {
+               //This means there are not pending send/ack requests
+               txn.opsFlow <- struct{}{}
+       }
+}
+
+func (txn *transaction) registerProducedTopicAsync(topic string, callback 
func(err error)) {
+       go func() {
+               err := txn.registerProducerTopic(topic)
+               callback(err)
+       }()
+}
+
+func (txn *transaction) registerAckTopicAsync(topic string, subName string,
+       callback func(err error)) {
+       go func() {
+               err := txn.registerAckTopic(topic, subName)
+               callback(err)
+       }()
+}
+
+func (txn *transaction) registerProducerTopic(topic string) error {
+       isOpen, err := txn.checkIfOpen()
+       if !isOpen {
+               return err
+       }
+       _, ok := txn.registerPartitions[topic]
+       if !ok {
+               txn.Lock()
+               defer txn.Unlock()
+               err := txn.tcClient.addPublishPartitionToTxn(&txn.txnID, 
[]string{topic})
+               if err != nil {
+                       return err
+               }
+               txn.registerPartitions[topic] = struct{}{}
+               return nil
+       } else {
+               return nil
+       }
+}
+
+func (txn *transaction) registerAckTopic(topic string, subName string) error {
+       isOpen, err := txn.checkIfOpen()
+       if !isOpen {
+               return err
+       }
+       sub := subscription{
+               topic:        topic,
+               subscription: subName,
+       }
+       _, ok := txn.registerAckSubscriptions[sub]
+       if !ok {
+               txn.Lock()
+               defer txn.Unlock()
+               err := txn.tcClient.addSubscriptionToTxn(&txn.txnID, topic, 
subName)
+               if err != nil {
+                       return err
+               }
+               txn.registerAckSubscriptions[sub] = struct{}{}
+               return nil
+       } else {
+               return nil
+       }
+}
+
+func (txn *transaction) GetTxnID() TxnID {
+       return txn.txnID
+}
+
+func (txn *transaction) checkIfOpen() (bool, error) {
+       if txn.state == Open {
+               return true, nil
+       } else {
+               return false, newError(InvalidStatus, "Expect transaction state 
is Open but "+txn.state.string())
+       }
+}
+
+func (txn *transaction) checkIfOpenOrAborting() (bool, error) {
+       if atomic.CompareAndSwapInt32((*int32)(&txn.state), Open, Aborting) || 
txn.state == Aborted {
+               return true, nil
+       } else {
+               return false, newError(InvalidStatus, "Expect transaction state 
is Open but "+txn.state.string())
+       }
+}
+
+func (state State) string() string {
+       switch state {
+       case Open:
+               return "Open"
+       case Committing:
+               return "Committing"
+       case Aborting:
+               return "Aborting"
+       case Committed:
+               return "Committed"
+       case Aborted:
+               return "Aborted"
+       case TimeOut:
+               return "TimeOut"
+       default:
+               return "Unknown"
+       }
+}
diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go
index 14a652f..d989887 100644
--- a/pulsar/transaction_test.go
+++ b/pulsar/transaction_test.go
@@ -18,9 +18,10 @@
 package pulsar
 
 import (
+       "context"
+       "errors"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/stretchr/testify/assert"
-
        "testing"
        "time"
 )
@@ -79,6 +80,144 @@ func TestTCClient(t *testing.T) {
        defer client.Close()
 }
 
// Test points:
//  1. Committing and aborting a transaction.
//     1. Open a transaction and then commit or abort it without doing anything
//        else. Committing/aborting should succeed the first time and fail the
//        second time.
//  2. The internal APIs registerSendOrAckOp and endSendOrAckOp.
//     1. Register 4 operations but end only 3: the transaction cannot be
//        committed or aborted.
//     2. Register 4 operations and end all 4: the transaction can be committed
//        or aborted.
//     3. Register operations and end them with an error: the state of the
//        transaction becomes Errored.
//  3. The internal APIs registerAckTopic and registerProducerTopic.
//     1. Register an ack topic and a produce topic, then fetch the stats of the
//        transaction over HTTP to verify the registrations.
+
+//1. Test abort and commit txn.
+func TestTxnImplCommitOrAbort(t *testing.T) {
+       tc, _ := createTcClient(t)
+       //1. Open a transaction and then commit it.
+       //The operations of committing txn1 should success at the first time 
and fail at the second time.
+       txn1 := createTxn(tc, t)
+       err := txn1.Commit(context.Background())
+       if err != nil {
+               t.Fatalf("Failed to commit the transaction %d:%d, %s\n", 
txn1.txnID.mostSigBits, txn1.txnID.leastSigBits,
+                       err.Error())
+       }
+       txn1.state = Open
+       txn1.opsFlow <- struct{}{}
+       err = txn1.Commit(context.Background())
+       assert.Equal(t, err.(*Error).Result(), TransactionNoFoundError)
+       assert.Equal(t, txn1.GetState(), State(Errored))
+       //2. Open a transaction and then abort it.
+       //The operations of aborting txn2 should success at the first time and 
fail at the second time.
+       id2, err := tc.newTransaction(time.Hour)
+       if err != nil {
+               t.Fatalf("Failed to new a transaction %s", err.Error())
+       }
+       txn2 := newTransaction(*id2, tc, time.Hour)
+       err = txn2.Abort(context.Background())
+       if err != nil {
+               t.Fatalf("Failed to abort the transaction %d:%d, %s\n", 
id2.mostSigBits, id2.leastSigBits, err.Error())
+       }
+       txn2.state = Open
+       txn2.opsFlow <- struct{}{}
+       err = txn2.Abort(context.Background())
+       assert.Equal(t, err.(*Error).Result(), TransactionNoFoundError)
+       assert.Equal(t, txn1.GetState(), State(Errored))
+}
+
+//2. Test the internal API, registerSendOrAckOp and endSendOrAckOp
+func TestRegisterOpAndEndOp(t *testing.T) {
+       tc, _ := createTcClient(t)
+       //1. Register 4 operation but only end 3 operations, the transaction 
can not be committed or aborted.
+       txn3 := createTxn(tc, t)
+       res := registerOpAndEndOp(txn3, 4, 3, nil)
+       select {
+       case <-res:
+               t.Fatalf("The transaction %d:%d should not be commited or 
aborted", txn3.txnID.mostSigBits,
+                       txn3.txnID.leastSigBits)
+       case <-time.After(3 * time.Second):
+       }
+       //2. Register 4 operation and end 4 operation the transaction can be 
committed and aborted.
+       txn4 := createTxn(tc, t)
+       res = registerOpAndEndOp(txn4, 4, 4, nil)
+       select {
+       case <-res:
+       case <-time.After(3 * time.Second):
+               t.Fatalf("The transaction %d:%d should be commited or aborted", 
txn4.txnID.mostSigBits,
+                       txn4.txnID.leastSigBits)
+       }
+       //3. Register an operation and end the operation with an error,
+       // and then the state of the transaction should be replaced to errored.
+       txn5 := createTxn(tc, t)
+       registerOpAndEndOp(txn5, 4, 4, errors.New(""))
+       assert.Equal(t, txn5.GetState(), State(Errored))
+}
+
+//3. Test the internal API, registerAckTopic and registerProducerTopic
+func TestRegisterTopic(t *testing.T) {
+       //1. Prepare: create PulsarClient and init transaction coordinator 
client.
+       topic := newTopicName()
+       sub := "my-sub"
+       tc, client := createTcClient(t)
+       //2. Prepare: create Topic and Subscription.
+       _, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: sub,
+       })
+       assert.NoError(t, err)
+       txn := createTxn(tc, t)
+       //3. Create a topic and register topic and subscription.
+       err = txn.registerAckTopic(topic, sub)
+       if err != nil {
+               t.Fatalf("Failed to register ack topic %s", err.Error())
+       }
+       err = txn.registerProducerTopic(topic)
+       if err != nil {
+               t.Fatalf("Failed to register ack topic %s", err.Error())
+       }
+       //4. Call http request to get the stats of the transaction to do 
verification.
+       stats2, err := transactionStats(&txn.txnID)
+       assert.NoError(t, err)
+       topics := stats2["producedPartitions"].(map[string]interface{})
+       subTopics := stats2["ackedPartitions"].(map[string]interface{})
+       assert.NotNil(t, topics[topic])
+       assert.NotNil(t, subTopics[topic])
+       subs := subTopics[topic].(map[string]interface{})
+       assert.NotNil(t, subs[sub])
+}
+
+func registerOpAndEndOp(txn *transaction, rp int, ep int, err error) <-chan 
struct{} {
+       for i := 0; i < rp; i++ {
+               txn.registerSendOrAckOp()
+       }
+       for i := 0; i < ep; i++ {
+               txn.endSendOrAckOp(err)
+       }
+
+       res := make(chan struct{})
+       go func() {
+               txn.Commit(context.Background())
+               res <- struct{}{}
+       }()
+       go func() {
+               txn.Abort(context.Background())
+               res <- struct{}{}
+       }()
+       return res
+}
+
+func createTxn(tc *transactionCoordinatorClient, t *testing.T) *transaction {
+       id3, err := tc.newTransaction(time.Hour)
+       if err != nil {
+               t.Fatalf("Failed to new a transaction %s", err.Error())
+       }
+       return newTransaction(*id3, tc, time.Hour)
+}
+
 // createTcClient Create a transaction coordinator client to send request
 func createTcClient(t *testing.T) (*transactionCoordinatorClient, *client) {
        c, err := NewClient(ClientOptions{

Reply via email to