This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ab93d5a fix: return error when the client transaction coordinator is nil to p… (#1444)
6ab93d5a is described below

commit 6ab93d5a1d8104b92a419f0abb14a3c0a9f684c4
Author: Thomas Bousquet <[email protected]>
AuthorDate: Tue Dec 2 19:49:45 2025 -0800

    fix: return error when the client transaction coordinator is nil to p… (#1444)
    
    * fix: return error when the client transaction coordinator is nil to prevent panic
    
    * test: add testcase to ensure error is actually returned
---
 pulsar/client_impl.go      |  8 ++++++++
 pulsar/transaction_test.go | 36 +++++++++++++++++++++++++-----------
 2 files changed, 33 insertions(+), 11 deletions(-)

diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 809e4d72..d940367f 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -23,6 +23,8 @@ import (
        "sync"
        "time"
 
+       "errors"
+
        "github.com/apache/pulsar-client-go/pulsar/auth"
        "github.com/apache/pulsar-client-go/pulsar/internal"
        "github.com/apache/pulsar-client-go/pulsar/log"
@@ -39,6 +41,8 @@ const (
        minConnMaxIdleTime                 = 60 * time.Second
 )
 
+var ErrClientTransactionsNotEnabled = errors.New("transactions are not enabled with the client")
+
 type client struct {
        cnxPool          internal.ConnectionPool
        rpcClient        internal.RPCClient
@@ -196,6 +200,10 @@ func newClient(options ClientOptions) (Client, error) {
 }
 
 func (c *client) NewTransaction(timeout time.Duration) (Transaction, error) {
+       if c.tcClient == nil {
+               return nil, ErrClientTransactionsNotEnabled
+       }
+
        id, err := c.tcClient.newTransaction(timeout)
        if err != nil {
                return nil, err
diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go
index 75b36ea8..9921ea03 100644
--- a/pulsar/transaction_test.go
+++ b/pulsar/transaction_test.go
@@ -35,7 +35,7 @@ func TestTxn_TCClient(t *testing.T) {
        //1. Prepare: create PulsarClient and init transaction coordinator client.
        topic := newTopicName()
        sub := "my-sub"
-       tc, client := createTcClient(t)
+       tc, client := createClientWithTC(t)
        //2. Prepare: create Topic and Subscription.
        consumer, err := client.Subscribe(ConsumerOptions{
                Topic:            topic,
@@ -100,7 +100,7 @@ func TestTxn_TCClient(t *testing.T) {
 
 // Test abort and commit txn
 func TestTxn_ImplCommitOrAbort(t *testing.T) {
-       tc, _ := createTcClient(t)
+       tc, _ := createClientWithTC(t)
        //1. Open a transaction and then commit it.
        //The operations of committing txn1 should success at the first time and fail at the second time.
        txn1 := createTxn(tc, t)
@@ -133,7 +133,7 @@ func TestTxn_ImplCommitOrAbort(t *testing.T) {
 
 // Test the internal API including the registerSendOrAckOp and endSendOrAckOp.
 func TestTxn_RegisterOpAndEndOp(t *testing.T) {
-       tc, _ := createTcClient(t)
+       tc, _ := createClientWithTC(t)
        //1. Register 4 operation but only end 3 operations, the transaction can not be committed or aborted.
        res := registerOpAndEndOp(t, tc, 4, 3, nil, true)
        assert.Equal(t, res.(*Error).Result(), TimeoutError)
@@ -158,7 +158,7 @@ func TestTxn_RegisterTopic(t *testing.T) {
        //1. Prepare: create PulsarClient and init transaction coordinator client.
        topic := newTopicName()
        sub := "my-sub"
-       tc, client := createTcClient(t)
+       tc, client := createClientWithTC(t)
        //2. Prepare: create Topic and Subscription.
        _, err := client.Subscribe(ConsumerOptions{
                Topic:            topic,
@@ -205,8 +205,22 @@ func createTxn(tc *transactionCoordinatorClient, t *testing.T) *transaction {
        return newTransaction(*id, tc, txnTimeout)
 }
 
-// createTcClient Create a transaction coordinator client to send request
-func createTcClient(t *testing.T) (*transactionCoordinatorClient, *client) {
+func TestTxn_NoTransactionCoordinator(t *testing.T) {
+       clientWithNoTC, err := NewClient(ClientOptions{
+               URL:                   webServiceURLTLS,
+               TLSTrustCertsFilePath: caCertsPath,
+               Authentication:        NewAuthenticationTLS(tlsClientCertPath, tlsClientKeyPath),
+               EnableTransaction:     false,
+       })
+       require.Nil(t, err, "Failed to create client.")
+
+       tx, err := clientWithNoTC.NewTransaction(txnTimeout)
+       require.Nil(t, tx, "Did not fail creating a new transaction, transaction should be nil")
+       require.ErrorIs(t, err, ErrClientTransactionsNotEnabled)
+}
+
+// createClientWithTC creates a new client with a transaction coordinator client to send request
+func createClientWithTC(t *testing.T) (*transactionCoordinatorClient, *client) {
        c, err := NewClient(ClientOptions{
                URL:                   webServiceURLTLS,
                TLSTrustCertsFilePath: caCertsPath,
@@ -236,7 +250,7 @@ func TestTxn_ConsumeAndProduce(t *testing.T) {
        // Step 1: Prepare - Create PulsarClient and initialize the transaction coordinator client.
        topic := newTopicName()
        sub := "my-sub"
-       _, client := createTcClient(t)
+       _, client := createClientWithTC(t)
        // Step 2: Prepare - Create Topic and Subscription.
        consumer, err := client.Subscribe(ConsumerOptions{
                Topic:            topic,
@@ -330,7 +344,7 @@ func TestTxn_AckAndSend(t *testing.T) {
        sourceTopic := newTopicName()
        sinkTopic := newTopicName()
        sub := "my-sub"
-       _, client := createTcClient(t)
+       _, client := createClientWithTC(t)
 
        // Prepare: Create source and sink topics and subscriptions.
        sourceConsumer, _ := client.Subscribe(ConsumerOptions{
@@ -399,7 +413,7 @@ func TestTxn_TransactionAbort(t *testing.T) {
        // Prepare: Create PulsarClient and initialize the transaction coordinator client.
        topic := newTopicName()
        sub := "my-sub"
-       _, client := createTcClient(t)
+       _, client := createClientWithTC(t)
 
        // Prepare: Create Topic and Subscription.
        consumer, _ := client.Subscribe(ConsumerOptions{
@@ -456,7 +470,7 @@ func TestTxn_AckChunkMessage(t *testing.T) {
        sub := "my-sub"
 
        // Prepare: Create PulsarClient and initialize the transaction coordinator client.
-       _, client := createTcClient(t)
+       _, client := createClientWithTC(t)
 
        // Create transaction and register the send operation.
        txn, err := client.NewTransaction(txnTimeout)
@@ -548,7 +562,7 @@ func TestTxn_ConnReconnect(t *testing.T) {
        defer cancel()
 
        topic := newTopicName()
-       _, cli := createTcClient(t)
+       _, cli := createClientWithTC(t)
 
        txn, err := cli.NewTransaction(5 * time.Minute)
        assert.NoError(t, err)

Reply via email to