This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 58b8e10b76b [improve][txn] Client add exception for transaction not 
fully init in broker (#18276)
58b8e10b76b is described below

commit 58b8e10b76b875fa48c12fa3164d593d4ddcb743
Author: congbo <[email protected]>
AuthorDate: Fri Nov 4 19:45:55 2022 +0800

    [improve][txn] Client add exception for transaction not fully init in 
broker (#18276)
    
    issues : https://github.com/apache/pulsar/issues/17876
    ### Motivation
    If broker not enable transaction or transaction not fully init in broker, 
get transaction assign topic will renturn the non-paritition assign topic, now 
transaction dont' support non-partition assgin topic init transaction 
coordinaotr. so add a exception for client users to know about the result not 
block
    
    ### Modifications
    
    add the exception for users
    
    ### Verifying this change
    
    delete partition transaction assign topic, client enable transaction will 
throw exception
    ### Does this pull request potentially affect one of the following parts:
    
    *If the box was checked, please highlight the changes*
    
    - [ ] Dependencies (add or upgrade a dependency)
    - [ ] The public API
    - [ ] The schema
    - [ ] The default values of configurations
    - [ ] The threading model
    - [ ] The binary protocol
    - [ ] The REST endpoints
    - [ ] The admin CLI options
    - [ ] Anything that affects deployment
    
    ### Documentation
    
    <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
    
    - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local 
preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR 
description, or else your PR might not get merged. -->
    - [ ] `doc-required` <!-- Your PR changes impact docs and you will update 
later -->
    - [x ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
    - [ ] `doc-complete` <!-- Docs have been already added -->
    
    ### Matching PR in forked repository
    
    PR in forked repository: <!-- ENTER URL HERE -->
    
    <!--
    After opening this PR, the build in apache/pulsar will fail and 
instructions will
    be provided for opening a PR in the PR author's forked repository.
    
    apache/pulsar pull requests should be first tested in your own fork since 
the
    apache/pulsar CI based on GitHub Actions has constrained resources and 
quota.
    GitHub Actions provides separate quota for pull requests that are executed 
in
    a forked repository.
    
    The tests will be run in the forked repository until all PR review comments 
have
    been handled, the tests pass and the PR is approved by a reviewer.
    -->
    
    PR in forked repository:
    
    - https://github.com/congbobo184/pulsar/pull/6
---
 .../impl/AutoCloseUselessClientConTXTest.java      | 51 +++++++++-------------
 .../client/impl/TransactionClientConnectTest.java  | 12 +++++
 .../TransactionCoordinatorClientImpl.java          | 11 ++---
 3 files changed, 36 insertions(+), 38 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConTXTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConTXTest.java
index 69221b49754..5f1c9037944 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConTXTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConTXTest.java
@@ -46,8 +46,8 @@ import org.testng.annotations.Test;
 @Test(groups = "broker-impl")
 public class AutoCloseUselessClientConTXTest extends 
AutoCloseUselessClientConSupports {
 
-    private static String topicName = 
UUID.randomUUID().toString().replaceAll("-","");
-    private static String topicFullName = "persistent://public/default/" + 
topicName;
+    private static final String topicName = 
UUID.randomUUID().toString().replaceAll("-","");
+    private static final String topicFullName = "persistent://public/default/" 
+ topicName;
 
     @BeforeMethod
     public void before() throws PulsarAdminException, MetadataStoreException {
@@ -59,38 +59,18 @@ public class AutoCloseUselessClientConTXTest extends 
AutoCloseUselessClientConSu
                 && !topicList_defaultNamespace.contains(topicFullName)){
             pulsarAdmin_0.topics().createNonPartitionedTopic(topicFullName);
         }
-        List<String> topicList_systemNamespace = 
pulsarAdmin_0.topics().getList("pulsar/system");
-
-        if (!pulsar.getPulsarResources()
-                .getNamespaceResources()
-                
.getPartitionedTopicResources().partitionedTopicExists(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN)){
-            pulsar.getPulsarResources()
-                    .getNamespaceResources()
-                    .getPartitionedTopicResources()
-                    
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
-                            new PartitionedTopicMetadata(2));
-        }
-        if (!pulsar.getPulsarResources()
-                .getNamespaceResources()
-                
.getPartitionedTopicResources().partitionedTopicExists(SystemTopicNames.TRANSACTION_COORDINATOR_LOG)){
-            pulsar.getPulsarResources()
-                    .getNamespaceResources()
-                    .getPartitionedTopicResources()
-                    
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_LOG,
-                            new PartitionedTopicMetadata(2));
-        }
     }
 
     @Override
     protected void doInitConf() throws Exception {
         super.doInitConf();
-        updateConfig(conf, "BROKER-INIT");
+        updateConfig(conf);
     }
 
     @Override
     protected ServiceConfiguration createConfForAdditionalBroker(int 
additionalBrokerIndex) {
         ServiceConfiguration conf = 
super.createConfForAdditionalBroker(additionalBrokerIndex);
-        updateConfig(conf, "BROKER-" + additionalBrokerIndex);
+        updateConfig(conf);
         return conf;
     }
 
@@ -110,7 +90,20 @@ public class AutoCloseUselessClientConTXTest extends 
AutoCloseUselessClientConSu
             if 
(!admin.namespaces().getNamespaces("pulsar").contains("pulsar/system")) {
                 admin.namespaces().createNamespace("pulsar/system");
             }
-        }catch (Exception e){
+
+            if (conf.isTransactionCoordinatorEnabled()) {
+                if (!pulsar.getPulsarResources()
+                        .getNamespaceResources()
+                        .getPartitionedTopicResources()
+                        
.partitionedTopicExists(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN)){
+                    pulsar.getPulsarResources()
+                            .getNamespaceResources()
+                            .getPartitionedTopicResources()
+                            
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                                    new PartitionedTopicMetadata(2));
+                }
+            }
+        } catch (Exception e){
             log.warn("create namespace failure", e);
         }
         return clientBuilder.enableTransaction(true).build();
@@ -119,7 +112,7 @@ public class AutoCloseUselessClientConTXTest extends 
AutoCloseUselessClientConSu
     /**
      * Override for make broker enable transaction.
      */
-    private void updateConfig(ServiceConfiguration conf, String 
advertisedAddress) {
+    private void updateConfig(ServiceConfiguration conf) {
         this.conf.setTransactionCoordinatorEnabled(true);
         this.conf.setSystemTopicEnabled(true);
         this.conf.setTopicLevelPoliciesEnabled(true);
@@ -132,11 +125,11 @@ public class AutoCloseUselessClientConTXTest extends 
AutoCloseUselessClientConSu
     public void testConnectionAutoReleaseUnPartitionedTopicWithTransaction() 
throws Exception {
         PulsarClientImpl pulsarClient = (PulsarClientImpl) 
super.getAllClients().get(0);
         // Init clients
-        Consumer consumer = pulsarClient.newConsumer()
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(topicName)
                 .subscriptionName("my-subscription-x")
                 .subscribe();
-        Producer producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer()
                 .sendTimeout(0, TimeUnit.SECONDS)
                 .topic(topicName)
                 .create();
@@ -158,6 +151,4 @@ public class AutoCloseUselessClientConTXTest extends 
AutoCloseUselessClientConSu
         consumer.close();
         producer.close();
     }
-
-
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
index 5c19eb64a68..34e2362431d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
@@ -35,9 +35,12 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import 
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
@@ -173,6 +176,15 @@ public class TransactionClientConnectTest extends 
TransactionTestBase {
         Assert.assertTrue(transactionMetaStoreHandler.changeToReadyState());
     }
 
+
+    @Test(expectedExceptions = PulsarClientException.class)
+    public void testNotEnableTransactionInBroker() throws Exception {
+        
getPulsarServiceList().get(0).getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                
.deletePartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN).get();
+        PulsarClient.builder().enableTransaction(true)
+                
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()).build();
+    }
+
     public void start() throws Exception {
         // wait transaction coordinator init success
         pulsarClient.newTransaction()
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index e366afb3c2e..71629d8cbbf 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -97,14 +97,9 @@ public class TransactionCoordinatorClientImpl implements 
TransactionCoordinatorC
                             handler.start();
                         }
                     } else {
-                        handlers = new TransactionMetaStoreHandler[1];
-                        CompletableFuture<Void> connectFuture = new 
CompletableFuture<>();
-                        connectFutureList.add(connectFuture);
-                        TransactionMetaStoreHandler handler = new 
TransactionMetaStoreHandler(0, pulsarClient,
-                                getTCAssignTopicName(-1), connectFuture);
-                        handlers[0] = handler;
-                        handlerMap.put(0, handler);
-                        handler.start();
+                        return FutureUtil.failedFuture(new 
TransactionCoordinatorClientException(
+                                "The broker doesn't enable the transaction 
coordinator, "
+                                        + "or the transaction coordinator has 
not initialized"));
                     }
 
                     STATE_UPDATER.set(TransactionCoordinatorClientImpl.this, 
State.READY);

Reply via email to