BewareMyPower commented on code in PR #984:
URL: https://github.com/apache/pulsar-client-go/pull/984#discussion_r1146129062


##########
pulsar/transaction_test.go:
##########
@@ -79,6 +81,138 @@ func TestTCClient(t *testing.T) {
        defer client.Close()
 }
 
+/**
+* Test points:
+*              1. Abort and commit txn.
+*                      1. Do nothing, just open a transaction and then commit 
it or abort it.
+*                      The operations of committing/aborting txn should 
success at the first time and fail at the second time.
+*       2. The internal API, registerSendOrAckOp and endSendOrAckOp
+*                      1. Register 4 operation but only end 3 operations, the 
transaction can not be committed or aborted.
+*                      2. Register 3 operation and end 3 operation the 
transaction can be committed and aborted.
+*                      3. Register an operation and end the operation with an 
error,
+*                      and then the state of the transaction should be 
replaced to errored.
+*              3. The internal API, registerAckTopic and registerProducerTopic
+*                      1. Register ack topic and send topic, and call http 
request to get the stats of the transaction
+*              to do verification.
+ */
+/** TestTxnImplCommitOrAbort Test abort and commit txn */
+func TestTxnImplCommitOrAbort(t *testing.T) {
+       tc, _ := createTcClient(t)
+       //1. Open a transaction and then commit it.
+       //The operations of committing txn1 should success at the first time 
and fail at the second time.
+       txn1 := createTxn(tc, t)
+       err := txn1.Commit(context.Background())
+       if err != nil {
+               t.Fatalf("Failed to commit the transaction %d:%d, %s\n", 
txn1.txnID.mostSigBits, txn1.txnID.leastSigBits,
+                       err.Error())
+       }
+       txn1.state = Open
+       txn1.opsFlow <- true
+       err = txn1.Commit(context.Background())
+       assert.Equal(t, err.(*Error).Result(), 
Result(pb.ServerError_TransactionNotFound))
+       assert.Equal(t, txn1.GetState(), Errored)
+       //2. Open a transaction and then abort it.
+       //The operations of aborting txn2 should success at the first time and 
fail at the second time.
+       id2, err := tc.newTransaction(time.Hour)
+       if err != nil {
+               t.Fatalf("Failed to new a transaction %s", err.Error())
+       }
+       txn2 := newTransaction(*id2, tc, time.Hour)
+       err = txn2.Abort(context.Background())
+       if err != nil {
+               t.Fatalf("Failed to abort the transaction %d:%d, %s\n", 
id2.mostSigBits, id2.leastSigBits, err.Error())
+       }
+       txn2.state = Open
+       txn2.opsFlow <- true
+       err = txn2.Abort(context.Background())
+       assert.Equal(t, err.(*Error).Result(), 
Result(pb.ServerError_TransactionNotFound))
+       assert.Equal(t, txn1.GetState(), Errored)
+       err = txn2.registerSendOrAckOp()
+       assert.Equal(t, err.(*Error).Result(), InvalidStatus)
+       err = txn1.registerSendOrAckOp()
+       assert.Equal(t, err.(*Error).Result(), InvalidStatus)
+}
+
+/** TestRegisterOpAndEndOp Test the internal API including the 
registerSendOrAckOp and endSendOrAckOp. */
+func TestRegisterOpAndEndOp(t *testing.T) {
+       tc, _ := createTcClient(t)
+       //1. Register 4 operation but only end 3 operations, the transaction 
can not be committed or aborted.
+       res := registerOpAndEndOp(t, tc, 4, 3, nil, true)
+       assert.Equal(t, res.(*Error).Result(), TimeoutError)
+       res = registerOpAndEndOp(t, tc, 4, 3, nil, false)
+       assert.Equal(t, res.(*Error).Result(), TimeoutError)
+
+       //2. Register 3 operation and end 3 operation the transaction can be 
committed and aborted.
+       res = registerOpAndEndOp(t, tc, 3, 3, nil, true)
+       assert.Nil(t, res)
+       res = registerOpAndEndOp(t, tc, 3, 3, nil, false)
+       assert.Nil(t, res)
+       //3. Register an operation and end the operation with an error,
+       // and then the state of the transaction should be replaced to errored.
+       res = registerOpAndEndOp(t, tc, 4, 4, errors.New(""), true)
+       assert.Equal(t, res.(*Error).Result(), InvalidStatus)
+       res = registerOpAndEndOp(t, tc, 4, 4, errors.New(""), false)
+       assert.Equal(t, res.(*Error).Result(), InvalidStatus)
+}
+
+/** TestRegisterTopic Test the internal API, registerAckTopic and 
registerProducerTopic */
+func TestRegisterTopic(t *testing.T) {
+       //1. Prepare: create PulsarClient and init transaction coordinator 
client.
+       topic := newTopicName()
+       sub := "my-sub"
+       tc, client := createTcClient(t)
+       //2. Prepare: create Topic and Subscription.
+       _, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: sub,
+       })
+       assert.NoError(t, err)
+       txn := createTxn(tc, t)
+       //3. Create a topic and register topic and subscription.
+       err = txn.registerAckTopic(topic, sub)
+       if err != nil {
+               t.Fatalf("Failed to register ack topic %s", err.Error())
+       }
+       err = txn.registerProducerTopic(topic)
+       if err != nil {
+               t.Fatalf("Failed to register ack topic %s", err.Error())
+       }
+       //4. Call http request to get the stats of the transaction to do 
verification.
+       stats2, err := transactionStats(&txn.txnID)
+       assert.NoError(t, err)
+       topics := stats2["producedPartitions"].(map[string]interface{})
+       subTopics := stats2["ackedPartitions"].(map[string]interface{})
+       assert.NotNil(t, topics[topic])
+       assert.NotNil(t, subTopics[topic])
+       subs := subTopics[topic].(map[string]interface{})
+       assert.NotNil(t, subs[sub])
+}
+
+func registerOpAndEndOp(t *testing.T, tc *transactionCoordinatorClient, rp 
int, ep int, err error, commit bool) error {
+       txn := createTxn(tc, t)
+       for i := 0; i < rp; i++ {
+               err := txn.registerSendOrAckOp()
+               assert.Nil(t, err)
+       }
+       for i := 0; i < ep; i++ {
+               txn.endSendOrAckOp(err)
+       }
+       if commit {
+               err = txn.Commit(context.Background())
+       } else {
+               err = txn.Abort(context.Background())
+       }
+       return err
+}
+
+func createTxn(tc *transactionCoordinatorClient, t *testing.T) *transaction {
+       id3, err := tc.newTransaction(time.Hour)

Review Comment:
   Please don't use a number suffix, I think we can just use `id` here.



##########
pulsar/transaction.go:
##########
@@ -17,7 +17,53 @@
 
 package pulsar
 
+import (
+       "context"
+)
+
+// TxnState The state of the transaction. Check the state of the transaction 
before executing some operation
+// with the transaction is necessary.
+type TxnState int32
+
+const (
+       _ TxnState = iota
+       // Open The transaction in Open state can be used to send/ack messages.
+       Open
+       // Committing The state of the transaction will be Committing after the 
commit method is called.
+       // The transaction in Committing state can be committed again.
+       Committing
+       // Aborting The state of the transaction will be Aborting after the 
abort method is called.
+       // The transaction in Aborting state can be aborted again.
+       Aborting
+       // Committed The state of the transaction will be Committed after the 
commit method is executed success.
+       // This means that all the operations with the transaction are success.
+       Committed
+       // Aborted The state of the transaction will be Aborted after the abort 
method is executed success.
+       // This means that all the operations with the transaction are aborted.
+       Aborted
+       // Errored The state of the transaction will be Errored after the 
operation of transaction get a non-retryable error.
+       Errored

Review Comment:
   I see the error code is `ERROR` in the Java client. Should we name it with 
`Error` here? `Errored` looks confusing. We use the `ed` and `ing` suffixes to 
differ whether a step is complete or ongoing.



##########
pulsar/transaction_test.go:
##########
@@ -79,6 +81,138 @@ func TestTCClient(t *testing.T) {
        defer client.Close()
 }
 
+/**
+* Test points:
+*              1. Abort and commit txn.
+*                      1. Do nothing, just open a transaction and then commit 
it or abort it.
+*                      The operations of committing/aborting txn should 
success at the first time and fail at the second time.
+*       2. The internal API, registerSendOrAckOp and endSendOrAckOp
+*                      1. Register 4 operation but only end 3 operations, the 
transaction can not be committed or aborted.
+*                      2. Register 3 operation and end 3 operation the 
transaction can be committed and aborted.
+*                      3. Register an operation and end the operation with an 
error,
+*                      and then the state of the transaction should be 
replaced to errored.
+*              3. The internal API, registerAckTopic and registerProducerTopic
+*                      1. Register ack topic and send topic, and call http 
request to get the stats of the transaction
+*              to do verification.
+ */
+/** TestTxnImplCommitOrAbort Test abort and commit txn */
+func TestTxnImplCommitOrAbort(t *testing.T) {
+       tc, _ := createTcClient(t)
+       //1. Open a transaction and then commit it.
+       //The operations of committing txn1 should success at the first time 
and fail at the second time.
+       txn1 := createTxn(tc, t)
+       err := txn1.Commit(context.Background())
+       if err != nil {
+               t.Fatalf("Failed to commit the transaction %d:%d, %s\n", 
txn1.txnID.mostSigBits, txn1.txnID.leastSigBits,
+                       err.Error())
+       }

Review Comment:
   ```suggestion
           assert.Nil(t, err, "Failed to commit the transaction %d:%d, %s\n",
                   txn1.txnID.mostSigBits, txn1.txnID.leastSigBits, err.Error())
   ```



##########
pulsar/transaction_test.go:
##########
@@ -79,6 +81,138 @@ func TestTCClient(t *testing.T) {
        defer client.Close()
 }
 
+/**
+* Test points:
+*              1. Abort and commit txn.
+*                      1. Do nothing, just open a transaction and then commit 
it or abort it.
+*                      The operations of committing/aborting txn should 
success at the first time and fail at the second time.
+*       2. The internal API, registerSendOrAckOp and endSendOrAckOp
+*                      1. Register 4 operation but only end 3 operations, the 
transaction can not be committed or aborted.
+*                      2. Register 3 operation and end 3 operation the 
transaction can be committed and aborted.
+*                      3. Register an operation and end the operation with an 
error,
+*                      and then the state of the transaction should be 
replaced to errored.
+*              3. The internal API, registerAckTopic and registerProducerTopic
+*                      1. Register ack topic and send topic, and call http 
request to get the stats of the transaction
+*              to do verification.
+ */
+/** TestTxnImplCommitOrAbort Test abort and commit txn */
+func TestTxnImplCommitOrAbort(t *testing.T) {
+       tc, _ := createTcClient(t)
+       //1. Open a transaction and then commit it.
+       //The operations of committing txn1 should success at the first time 
and fail at the second time.
+       txn1 := createTxn(tc, t)
+       err := txn1.Commit(context.Background())
+       if err != nil {
+               t.Fatalf("Failed to commit the transaction %d:%d, %s\n", 
txn1.txnID.mostSigBits, txn1.txnID.leastSigBits,
+                       err.Error())
+       }
+       txn1.state = Open
+       txn1.opsFlow <- true
+       err = txn1.Commit(context.Background())
+       assert.Equal(t, err.(*Error).Result(), 
Result(pb.ServerError_TransactionNotFound))
+       assert.Equal(t, txn1.GetState(), Errored)
+       //2. Open a transaction and then abort it.
+       //The operations of aborting txn2 should success at the first time and 
fail at the second time.
+       id2, err := tc.newTransaction(time.Hour)
+       if err != nil {
+               t.Fatalf("Failed to new a transaction %s", err.Error())
+       }
+       txn2 := newTransaction(*id2, tc, time.Hour)
+       err = txn2.Abort(context.Background())
+       if err != nil {
+               t.Fatalf("Failed to abort the transaction %d:%d, %s\n", 
id2.mostSigBits, id2.leastSigBits, err.Error())
+       }

Review Comment:
   ```suggestion
           assert.Nil(t, err, "Failed to abort the transaction %d:%d, %s\n",
                   id2.mostSigBits, id2.leastSigBits, err.Error())
   ```



##########
pulsar/transaction_test.go:
##########
@@ -79,6 +81,138 @@ func TestTCClient(t *testing.T) {
        defer client.Close()
 }
 
+/**
+* Test points:
+*              1. Abort and commit txn.
+*                      1. Do nothing, just open a transaction and then commit 
it or abort it.
+*                      The operations of committing/aborting txn should 
success at the first time and fail at the second time.
+*       2. The internal API, registerSendOrAckOp and endSendOrAckOp
+*                      1. Register 4 operation but only end 3 operations, the 
transaction can not be committed or aborted.
+*                      2. Register 3 operation and end 3 operation the 
transaction can be committed and aborted.
+*                      3. Register an operation and end the operation with an 
error,
+*                      and then the state of the transaction should be 
replaced to errored.
+*              3. The internal API, registerAckTopic and registerProducerTopic
+*                      1. Register ack topic and send topic, and call http 
request to get the stats of the transaction
+*              to do verification.
+ */
+/** TestTxnImplCommitOrAbort Test abort and commit txn */
+func TestTxnImplCommitOrAbort(t *testing.T) {
+       tc, _ := createTcClient(t)
+       //1. Open a transaction and then commit it.
+       //The operations of committing txn1 should success at the first time 
and fail at the second time.
+       txn1 := createTxn(tc, t)
+       err := txn1.Commit(context.Background())
+       if err != nil {
+               t.Fatalf("Failed to commit the transaction %d:%d, %s\n", 
txn1.txnID.mostSigBits, txn1.txnID.leastSigBits,
+                       err.Error())
+       }
+       txn1.state = Open
+       txn1.opsFlow <- true
+       err = txn1.Commit(context.Background())
+       assert.Equal(t, err.(*Error).Result(), 
Result(pb.ServerError_TransactionNotFound))
+       assert.Equal(t, txn1.GetState(), Errored)
+       //2. Open a transaction and then abort it.
+       //The operations of aborting txn2 should success at the first time and 
fail at the second time.
+       id2, err := tc.newTransaction(time.Hour)
+       if err != nil {
+               t.Fatalf("Failed to new a transaction %s", err.Error())
+       }
+       txn2 := newTransaction(*id2, tc, time.Hour)
+       err = txn2.Abort(context.Background())
+       if err != nil {
+               t.Fatalf("Failed to abort the transaction %d:%d, %s\n", 
id2.mostSigBits, id2.leastSigBits, err.Error())
+       }
+       txn2.state = Open
+       txn2.opsFlow <- true
+       err = txn2.Abort(context.Background())
+       assert.Equal(t, err.(*Error).Result(), 
Result(pb.ServerError_TransactionNotFound))
+       assert.Equal(t, txn1.GetState(), Errored)
+       err = txn2.registerSendOrAckOp()
+       assert.Equal(t, err.(*Error).Result(), InvalidStatus)
+       err = txn1.registerSendOrAckOp()
+       assert.Equal(t, err.(*Error).Result(), InvalidStatus)
+}
+
+/** TestRegisterOpAndEndOp Test the internal API including the 
registerSendOrAckOp and endSendOrAckOp. */
+func TestRegisterOpAndEndOp(t *testing.T) {
+       tc, _ := createTcClient(t)
+       //1. Register 4 operation but only end 3 operations, the transaction 
can not be committed or aborted.
+       res := registerOpAndEndOp(t, tc, 4, 3, nil, true)
+       assert.Equal(t, res.(*Error).Result(), TimeoutError)
+       res = registerOpAndEndOp(t, tc, 4, 3, nil, false)
+       assert.Equal(t, res.(*Error).Result(), TimeoutError)
+
+       //2. Register 3 operation and end 3 operation the transaction can be 
committed and aborted.
+       res = registerOpAndEndOp(t, tc, 3, 3, nil, true)
+       assert.Nil(t, res)
+       res = registerOpAndEndOp(t, tc, 3, 3, nil, false)
+       assert.Nil(t, res)
+       //3. Register an operation and end the operation with an error,
+       // and then the state of the transaction should be replaced to errored.
+       res = registerOpAndEndOp(t, tc, 4, 4, errors.New(""), true)
+       assert.Equal(t, res.(*Error).Result(), InvalidStatus)
+       res = registerOpAndEndOp(t, tc, 4, 4, errors.New(""), false)
+       assert.Equal(t, res.(*Error).Result(), InvalidStatus)
+}
+
+/** TestRegisterTopic Test the internal API, registerAckTopic and 
registerProducerTopic */
+func TestRegisterTopic(t *testing.T) {
+       //1. Prepare: create PulsarClient and init transaction coordinator 
client.
+       topic := newTopicName()
+       sub := "my-sub"
+       tc, client := createTcClient(t)
+       //2. Prepare: create Topic and Subscription.
+       _, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: sub,
+       })
+       assert.NoError(t, err)
+       txn := createTxn(tc, t)
+       //3. Create a topic and register topic and subscription.
+       err = txn.registerAckTopic(topic, sub)
+       if err != nil {
+               t.Fatalf("Failed to register ack topic %s", err.Error())
+       }
+       err = txn.registerProducerTopic(topic)
+       if err != nil {
+               t.Fatalf("Failed to register ack topic %s", err.Error())
+       }

Review Comment:
   Please also replace the check with the `assert.Nil` call.



##########
pulsar/transaction_test.go:
##########
@@ -79,6 +81,138 @@ func TestTCClient(t *testing.T) {
        defer client.Close()
 }
 
+/**
+* Test points:
+*              1. Abort and commit txn.
+*                      1. Do nothing, just open a transaction and then commit 
it or abort it.
+*                      The operations of committing/aborting txn should 
success at the first time and fail at the second time.
+*       2. The internal API, registerSendOrAckOp and endSendOrAckOp
+*                      1. Register 4 operation but only end 3 operations, the 
transaction can not be committed or aborted.
+*                      2. Register 3 operation and end 3 operation the 
transaction can be committed and aborted.
+*                      3. Register an operation and end the operation with an 
error,
+*                      and then the state of the transaction should be 
replaced to errored.
+*              3. The internal API, registerAckTopic and registerProducerTopic
+*                      1. Register ack topic and send topic, and call http 
request to get the stats of the transaction
+*              to do verification.
+ */
+/** TestTxnImplCommitOrAbort Test abort and commit txn */
+func TestTxnImplCommitOrAbort(t *testing.T) {
+       tc, _ := createTcClient(t)
+       //1. Open a transaction and then commit it.
+       //The operations of committing txn1 should success at the first time 
and fail at the second time.
+       txn1 := createTxn(tc, t)
+       err := txn1.Commit(context.Background())
+       if err != nil {
+               t.Fatalf("Failed to commit the transaction %d:%d, %s\n", 
txn1.txnID.mostSigBits, txn1.txnID.leastSigBits,
+                       err.Error())
+       }
+       txn1.state = Open
+       txn1.opsFlow <- true
+       err = txn1.Commit(context.Background())
+       assert.Equal(t, err.(*Error).Result(), 
Result(pb.ServerError_TransactionNotFound))
+       assert.Equal(t, txn1.GetState(), Errored)
+       //2. Open a transaction and then abort it.
+       //The operations of aborting txn2 should success at the first time and 
fail at the second time.
+       id2, err := tc.newTransaction(time.Hour)
+       if err != nil {
+               t.Fatalf("Failed to new a transaction %s", err.Error())
+       }

Review Comment:
   ```suggestion
        assert.Nil(t, err, "Failed to new a transaction %s\n", err.Error())
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to