BewareMyPower commented on code in PR #984:
URL: https://github.com/apache/pulsar-client-go/pull/984#discussion_r1146129062
##########
pulsar/transaction_test.go:
##########
@@ -79,6 +81,138 @@ func TestTCClient(t *testing.T) {
defer client.Close()
}
+/**
+* Test points:
+* 1. Abort and commit txn.
+* 1. Do nothing, just open a transaction and then commit
it or abort it.
+* The operations of committing/aborting txn should
success at the first time and fail at the second time.
+* 2. The internal API, registerSendOrAckOp and endSendOrAckOp
+* 1. Register 4 operation but only end 3 operations, the
transaction can not be committed or aborted.
+* 2. Register 3 operation and end 3 operation the
transaction can be committed and aborted.
+* 3. Register an operation and end the operation with an
error,
+* and then the state of the transaction should be
replaced to errored.
+* 3. The internal API, registerAckTopic and registerProducerTopic
+* 1. Register ack topic and send topic, and call http
request to get the stats of the transaction
+* to do verification.
+ */
+/** TestTxnImplCommitOrAbort Test abort and commit txn */
+func TestTxnImplCommitOrAbort(t *testing.T) {
+ tc, _ := createTcClient(t)
+ //1. Open a transaction and then commit it.
+ //The operations of committing txn1 should success at the first time
and fail at the second time.
+ txn1 := createTxn(tc, t)
+ err := txn1.Commit(context.Background())
+ if err != nil {
+ t.Fatalf("Failed to commit the transaction %d:%d, %s\n",
txn1.txnID.mostSigBits, txn1.txnID.leastSigBits,
+ err.Error())
+ }
+ txn1.state = Open
+ txn1.opsFlow <- true
+ err = txn1.Commit(context.Background())
+ assert.Equal(t, err.(*Error).Result(),
Result(pb.ServerError_TransactionNotFound))
+ assert.Equal(t, txn1.GetState(), Errored)
+ //2. Open a transaction and then abort it.
+ //The operations of aborting txn2 should success at the first time and
fail at the second time.
+ id2, err := tc.newTransaction(time.Hour)
+ if err != nil {
+ t.Fatalf("Failed to new a transaction %s", err.Error())
+ }
+ txn2 := newTransaction(*id2, tc, time.Hour)
+ err = txn2.Abort(context.Background())
+ if err != nil {
+ t.Fatalf("Failed to abort the transaction %d:%d, %s\n",
id2.mostSigBits, id2.leastSigBits, err.Error())
+ }
+ txn2.state = Open
+ txn2.opsFlow <- true
+ err = txn2.Abort(context.Background())
+ assert.Equal(t, err.(*Error).Result(),
Result(pb.ServerError_TransactionNotFound))
+ assert.Equal(t, txn1.GetState(), Errored)
+ err = txn2.registerSendOrAckOp()
+ assert.Equal(t, err.(*Error).Result(), InvalidStatus)
+ err = txn1.registerSendOrAckOp()
+ assert.Equal(t, err.(*Error).Result(), InvalidStatus)
+}
+
+/** TestRegisterOpAndEndOp Test the internal API including the
registerSendOrAckOp and endSendOrAckOp. */
+func TestRegisterOpAndEndOp(t *testing.T) {
+ tc, _ := createTcClient(t)
+ //1. Register 4 operation but only end 3 operations, the transaction
can not be committed or aborted.
+ res := registerOpAndEndOp(t, tc, 4, 3, nil, true)
+ assert.Equal(t, res.(*Error).Result(), TimeoutError)
+ res = registerOpAndEndOp(t, tc, 4, 3, nil, false)
+ assert.Equal(t, res.(*Error).Result(), TimeoutError)
+
+ //2. Register 3 operation and end 3 operation the transaction can be
committed and aborted.
+ res = registerOpAndEndOp(t, tc, 3, 3, nil, true)
+ assert.Nil(t, res)
+ res = registerOpAndEndOp(t, tc, 3, 3, nil, false)
+ assert.Nil(t, res)
+ //3. Register an operation and end the operation with an error,
+ // and then the state of the transaction should be replaced to errored.
+ res = registerOpAndEndOp(t, tc, 4, 4, errors.New(""), true)
+ assert.Equal(t, res.(*Error).Result(), InvalidStatus)
+ res = registerOpAndEndOp(t, tc, 4, 4, errors.New(""), false)
+ assert.Equal(t, res.(*Error).Result(), InvalidStatus)
+}
+
+/** TestRegisterTopic Test the internal API, registerAckTopic and
registerProducerTopic */
+func TestRegisterTopic(t *testing.T) {
+ //1. Prepare: create PulsarClient and init transaction coordinator
client.
+ topic := newTopicName()
+ sub := "my-sub"
+ tc, client := createTcClient(t)
+ //2. Prepare: create Topic and Subscription.
+ _, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ })
+ assert.NoError(t, err)
+ txn := createTxn(tc, t)
+ //3. Create a topic and register topic and subscription.
+ err = txn.registerAckTopic(topic, sub)
+ if err != nil {
+ t.Fatalf("Failed to register ack topic %s", err.Error())
+ }
+ err = txn.registerProducerTopic(topic)
+ if err != nil {
+ t.Fatalf("Failed to register ack topic %s", err.Error())
+ }
+ //4. Call http request to get the stats of the transaction to do
verification.
+ stats2, err := transactionStats(&txn.txnID)
+ assert.NoError(t, err)
+ topics := stats2["producedPartitions"].(map[string]interface{})
+ subTopics := stats2["ackedPartitions"].(map[string]interface{})
+ assert.NotNil(t, topics[topic])
+ assert.NotNil(t, subTopics[topic])
+ subs := subTopics[topic].(map[string]interface{})
+ assert.NotNil(t, subs[sub])
+}
+
+func registerOpAndEndOp(t *testing.T, tc *transactionCoordinatorClient, rp
int, ep int, err error, commit bool) error {
+ txn := createTxn(tc, t)
+ for i := 0; i < rp; i++ {
+ err := txn.registerSendOrAckOp()
+ assert.Nil(t, err)
+ }
+ for i := 0; i < ep; i++ {
+ txn.endSendOrAckOp(err)
+ }
+ if commit {
+ err = txn.Commit(context.Background())
+ } else {
+ err = txn.Abort(context.Background())
+ }
+ return err
+}
+
+func createTxn(tc *transactionCoordinatorClient, t *testing.T) *transaction {
+ id3, err := tc.newTransaction(time.Hour)
Review Comment:
Please don't use a number suffix; I think we can just use `id` here.
##########
pulsar/transaction.go:
##########
@@ -17,7 +17,53 @@
package pulsar
+import (
+ "context"
+)
+
+// TxnState The state of the transaction. Check the state of the transaction
before executing some operation
+// with the transaction is necessary.
+type TxnState int32
+
+const (
+ _ TxnState = iota
+ // Open The transaction in Open state can be used to send/ack messages.
+ Open
+ // Committing The state of the transaction will be Committing after the
commit method is called.
+ // The transaction in Committing state can be committed again.
+ Committing
+ // Aborting The state of the transaction will be Aborting after the
abort method is called.
+ // The transaction in Aborting state can be aborted again.
+ Aborting
+ // Committed The state of the transaction will be Committed after the
commit method is executed success.
+ // This means that all the operations with the transaction are success.
+ Committed
+ // Aborted The state of the transaction will be Aborted after the abort
method is executed success.
+ // This means that all the operations with the transaction are aborted.
+ Aborted
+ // Errored The state of the transaction will be Errored after the
operation of transaction get a non-retryable error.
+ Errored
Review Comment:
I see the error code is `ERROR` in the Java client. Should we name it `Error`
here? `Errored` looks confusing. We use the `ed` and `ing` suffixes to
distinguish whether a step is complete or ongoing.
##########
pulsar/transaction_test.go:
##########
@@ -79,6 +81,138 @@ func TestTCClient(t *testing.T) {
defer client.Close()
}
+/**
+* Test points:
+* 1. Abort and commit txn.
+* 1. Do nothing, just open a transaction and then commit
it or abort it.
+* The operations of committing/aborting txn should
success at the first time and fail at the second time.
+* 2. The internal API, registerSendOrAckOp and endSendOrAckOp
+* 1. Register 4 operation but only end 3 operations, the
transaction can not be committed or aborted.
+* 2. Register 3 operation and end 3 operation the
transaction can be committed and aborted.
+* 3. Register an operation and end the operation with an
error,
+* and then the state of the transaction should be
replaced to errored.
+* 3. The internal API, registerAckTopic and registerProducerTopic
+* 1. Register ack topic and send topic, and call http
request to get the stats of the transaction
+* to do verification.
+ */
+/** TestTxnImplCommitOrAbort Test abort and commit txn */
+func TestTxnImplCommitOrAbort(t *testing.T) {
+ tc, _ := createTcClient(t)
+ //1. Open a transaction and then commit it.
+ //The operations of committing txn1 should success at the first time
and fail at the second time.
+ txn1 := createTxn(tc, t)
+ err := txn1.Commit(context.Background())
+ if err != nil {
+ t.Fatalf("Failed to commit the transaction %d:%d, %s\n",
txn1.txnID.mostSigBits, txn1.txnID.leastSigBits,
+ err.Error())
+ }
Review Comment:
```suggestion
assert.Nil(t, err, "Failed to commit the transaction %d:%d, %s\n",
txn1.txnID.mostSigBits, txn1.txnID.leastSigBits, err.Error())
```
##########
pulsar/transaction_test.go:
##########
@@ -79,6 +81,138 @@ func TestTCClient(t *testing.T) {
defer client.Close()
}
+/**
+* Test points:
+* 1. Abort and commit txn.
+* 1. Do nothing, just open a transaction and then commit
it or abort it.
+* The operations of committing/aborting txn should
success at the first time and fail at the second time.
+* 2. The internal API, registerSendOrAckOp and endSendOrAckOp
+* 1. Register 4 operation but only end 3 operations, the
transaction can not be committed or aborted.
+* 2. Register 3 operation and end 3 operation the
transaction can be committed and aborted.
+* 3. Register an operation and end the operation with an
error,
+* and then the state of the transaction should be
replaced to errored.
+* 3. The internal API, registerAckTopic and registerProducerTopic
+* 1. Register ack topic and send topic, and call http
request to get the stats of the transaction
+* to do verification.
+ */
+/** TestTxnImplCommitOrAbort Test abort and commit txn */
+func TestTxnImplCommitOrAbort(t *testing.T) {
+ tc, _ := createTcClient(t)
+ //1. Open a transaction and then commit it.
+ //The operations of committing txn1 should success at the first time
and fail at the second time.
+ txn1 := createTxn(tc, t)
+ err := txn1.Commit(context.Background())
+ if err != nil {
+ t.Fatalf("Failed to commit the transaction %d:%d, %s\n",
txn1.txnID.mostSigBits, txn1.txnID.leastSigBits,
+ err.Error())
+ }
+ txn1.state = Open
+ txn1.opsFlow <- true
+ err = txn1.Commit(context.Background())
+ assert.Equal(t, err.(*Error).Result(),
Result(pb.ServerError_TransactionNotFound))
+ assert.Equal(t, txn1.GetState(), Errored)
+ //2. Open a transaction and then abort it.
+ //The operations of aborting txn2 should success at the first time and
fail at the second time.
+ id2, err := tc.newTransaction(time.Hour)
+ if err != nil {
+ t.Fatalf("Failed to new a transaction %s", err.Error())
+ }
+ txn2 := newTransaction(*id2, tc, time.Hour)
+ err = txn2.Abort(context.Background())
+ if err != nil {
+ t.Fatalf("Failed to abort the transaction %d:%d, %s\n",
id2.mostSigBits, id2.leastSigBits, err.Error())
+ }
Review Comment:
```suggestion
assert.Nil(t, err, "Failed to abort the transaction %d:%d, %s\n",
id2.mostSigBits, id2.leastSigBits, err.Error())
```
##########
pulsar/transaction_test.go:
##########
@@ -79,6 +81,138 @@ func TestTCClient(t *testing.T) {
defer client.Close()
}
+/**
+* Test points:
+* 1. Abort and commit txn.
+* 1. Do nothing, just open a transaction and then commit
it or abort it.
+* The operations of committing/aborting txn should
success at the first time and fail at the second time.
+* 2. The internal API, registerSendOrAckOp and endSendOrAckOp
+* 1. Register 4 operation but only end 3 operations, the
transaction can not be committed or aborted.
+* 2. Register 3 operation and end 3 operation the
transaction can be committed and aborted.
+* 3. Register an operation and end the operation with an
error,
+* and then the state of the transaction should be
replaced to errored.
+* 3. The internal API, registerAckTopic and registerProducerTopic
+* 1. Register ack topic and send topic, and call http
request to get the stats of the transaction
+* to do verification.
+ */
+/** TestTxnImplCommitOrAbort Test abort and commit txn */
+func TestTxnImplCommitOrAbort(t *testing.T) {
+ tc, _ := createTcClient(t)
+ //1. Open a transaction and then commit it.
+ //The operations of committing txn1 should success at the first time
and fail at the second time.
+ txn1 := createTxn(tc, t)
+ err := txn1.Commit(context.Background())
+ if err != nil {
+ t.Fatalf("Failed to commit the transaction %d:%d, %s\n",
txn1.txnID.mostSigBits, txn1.txnID.leastSigBits,
+ err.Error())
+ }
+ txn1.state = Open
+ txn1.opsFlow <- true
+ err = txn1.Commit(context.Background())
+ assert.Equal(t, err.(*Error).Result(),
Result(pb.ServerError_TransactionNotFound))
+ assert.Equal(t, txn1.GetState(), Errored)
+ //2. Open a transaction and then abort it.
+ //The operations of aborting txn2 should success at the first time and
fail at the second time.
+ id2, err := tc.newTransaction(time.Hour)
+ if err != nil {
+ t.Fatalf("Failed to new a transaction %s", err.Error())
+ }
+ txn2 := newTransaction(*id2, tc, time.Hour)
+ err = txn2.Abort(context.Background())
+ if err != nil {
+ t.Fatalf("Failed to abort the transaction %d:%d, %s\n",
id2.mostSigBits, id2.leastSigBits, err.Error())
+ }
+ txn2.state = Open
+ txn2.opsFlow <- true
+ err = txn2.Abort(context.Background())
+ assert.Equal(t, err.(*Error).Result(),
Result(pb.ServerError_TransactionNotFound))
+ assert.Equal(t, txn1.GetState(), Errored)
+ err = txn2.registerSendOrAckOp()
+ assert.Equal(t, err.(*Error).Result(), InvalidStatus)
+ err = txn1.registerSendOrAckOp()
+ assert.Equal(t, err.(*Error).Result(), InvalidStatus)
+}
+
+/** TestRegisterOpAndEndOp Test the internal API including the
registerSendOrAckOp and endSendOrAckOp. */
+func TestRegisterOpAndEndOp(t *testing.T) {
+ tc, _ := createTcClient(t)
+ //1. Register 4 operation but only end 3 operations, the transaction
can not be committed or aborted.
+ res := registerOpAndEndOp(t, tc, 4, 3, nil, true)
+ assert.Equal(t, res.(*Error).Result(), TimeoutError)
+ res = registerOpAndEndOp(t, tc, 4, 3, nil, false)
+ assert.Equal(t, res.(*Error).Result(), TimeoutError)
+
+ //2. Register 3 operation and end 3 operation the transaction can be
committed and aborted.
+ res = registerOpAndEndOp(t, tc, 3, 3, nil, true)
+ assert.Nil(t, res)
+ res = registerOpAndEndOp(t, tc, 3, 3, nil, false)
+ assert.Nil(t, res)
+ //3. Register an operation and end the operation with an error,
+ // and then the state of the transaction should be replaced to errored.
+ res = registerOpAndEndOp(t, tc, 4, 4, errors.New(""), true)
+ assert.Equal(t, res.(*Error).Result(), InvalidStatus)
+ res = registerOpAndEndOp(t, tc, 4, 4, errors.New(""), false)
+ assert.Equal(t, res.(*Error).Result(), InvalidStatus)
+}
+
+/** TestRegisterTopic Test the internal API, registerAckTopic and
registerProducerTopic */
+func TestRegisterTopic(t *testing.T) {
+ //1. Prepare: create PulsarClient and init transaction coordinator
client.
+ topic := newTopicName()
+ sub := "my-sub"
+ tc, client := createTcClient(t)
+ //2. Prepare: create Topic and Subscription.
+ _, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ })
+ assert.NoError(t, err)
+ txn := createTxn(tc, t)
+ //3. Create a topic and register topic and subscription.
+ err = txn.registerAckTopic(topic, sub)
+ if err != nil {
+ t.Fatalf("Failed to register ack topic %s", err.Error())
+ }
+ err = txn.registerProducerTopic(topic)
+ if err != nil {
+ t.Fatalf("Failed to register ack topic %s", err.Error())
+ }
Review Comment:
Please also replace the check with the `assert.Nil` call.
##########
pulsar/transaction_test.go:
##########
@@ -79,6 +81,138 @@ func TestTCClient(t *testing.T) {
defer client.Close()
}
+/**
+* Test points:
+* 1. Abort and commit txn.
+* 1. Do nothing, just open a transaction and then commit
it or abort it.
+* The operations of committing/aborting txn should
success at the first time and fail at the second time.
+* 2. The internal API, registerSendOrAckOp and endSendOrAckOp
+* 1. Register 4 operation but only end 3 operations, the
transaction can not be committed or aborted.
+* 2. Register 3 operation and end 3 operation the
transaction can be committed and aborted.
+* 3. Register an operation and end the operation with an
error,
+* and then the state of the transaction should be
replaced to errored.
+* 3. The internal API, registerAckTopic and registerProducerTopic
+* 1. Register ack topic and send topic, and call http
request to get the stats of the transaction
+* to do verification.
+ */
+/** TestTxnImplCommitOrAbort Test abort and commit txn */
+func TestTxnImplCommitOrAbort(t *testing.T) {
+ tc, _ := createTcClient(t)
+ //1. Open a transaction and then commit it.
+ //The operations of committing txn1 should success at the first time
and fail at the second time.
+ txn1 := createTxn(tc, t)
+ err := txn1.Commit(context.Background())
+ if err != nil {
+ t.Fatalf("Failed to commit the transaction %d:%d, %s\n",
txn1.txnID.mostSigBits, txn1.txnID.leastSigBits,
+ err.Error())
+ }
+ txn1.state = Open
+ txn1.opsFlow <- true
+ err = txn1.Commit(context.Background())
+ assert.Equal(t, err.(*Error).Result(),
Result(pb.ServerError_TransactionNotFound))
+ assert.Equal(t, txn1.GetState(), Errored)
+ //2. Open a transaction and then abort it.
+ //The operations of aborting txn2 should success at the first time and
fail at the second time.
+ id2, err := tc.newTransaction(time.Hour)
+ if err != nil {
+ t.Fatalf("Failed to new a transaction %s", err.Error())
+ }
Review Comment:
```suggestion
assert.Nil(t, err, "Failed to new a transaction %s\n", err.Error())
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]