BewareMyPower commented on code in PR #1237:
URL: https://github.com/apache/pulsar-client-go/pull/1237#discussion_r1671954233
##########
pulsar/transaction_test.go:
##########
@@ -539,3 +540,56 @@ func TestAckChunkMessage(t *testing.T) {
require.Nil(t, err)
consumerShouldNotReceiveMessage(t, consumer)
}
+
+func TestTxnConnReconnect(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ topic := newTopicName()
+ _, cli := createTcClient(t)
+
+ txn, err := cli.NewTransaction(5 * time.Minute)
+ assert.NoError(t, err)
+
+ connections := cli.cnxPool.GetConnections()
+ for _, conn := range connections {
+ conn.Close()
+ }
+
+ err = txn.Commit(ctx)
+ assert.NoError(t, err)
+
+ txn, err = cli.NewTransaction(5 * time.Minute)
+ assert.NoError(t, err) // Assert that the transaction can be opened
after the connections are reconnected
+
+ // Start a goroutine to periodically close connections
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(1 * time.Second):
+ connections := cli.cnxPool.GetConnections()
+ for _, conn := range connections {
+ conn.Close()
+ }
Review Comment:
The only difference between this loop and `ConnectionPool.Close` is that the
loop does not remove the connections from the pool. Could you call
`cli.cnxPool.Close()` here instead, to avoid adding a `GetConnections()` method
just for tests?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]