This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 58b8e10b76b [improve][txn] Client add exception for transaction not
fully init in broker (#18276)
58b8e10b76b is described below
commit 58b8e10b76b875fa48c12fa3164d593d4ddcb743
Author: congbo <[email protected]>
AuthorDate: Fri Nov 4 19:45:55 2022 +0800
[improve][txn] Client add exception for transaction not fully init in
broker (#18276)
issues : https://github.com/apache/pulsar/issues/17876
### Motivation
If the broker has not enabled transactions, or transactions are not fully
initialized in the broker, looking up the transaction assign topic returns a
non-partitioned assign topic. Transactions currently don't support initializing
the transaction coordinator from a non-partitioned assign topic, so add an
exception that tells client users about the result instead of blocking.
### Modifications
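Fail the client's transaction coordinator startup with a
TransactionCoordinatorClientException when the assign topic is not partitioned,
instead of creating a handler for the non-partitioned assign topic.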
### Verifying this change
Delete the partitioned transaction assign topic; building a client with
transactions enabled then throws an exception.
### Does this pull request potentially affect one of the following parts:
*If the box was checked, please highlight the changes*
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] Anything that affects deployment
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc` <!-- Your PR contains doc changes. Please attach the local
preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR
description, or else your PR might not get merged. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update
later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->
### Matching PR in forked repository
PR in forked repository: <!-- ENTER URL HERE -->
<!--
After opening this PR, the build in apache/pulsar will fail and
instructions will
be provided for opening a PR in the PR author's forked repository.
apache/pulsar pull requests should be first tested in your own fork since
the
apache/pulsar CI based on GitHub Actions has constrained resources and
quota.
GitHub Actions provides separate quota for pull requests that are executed
in
a forked repository.
The tests will be run in the forked repository until all PR review comments
have
been handled, the tests pass and the PR is approved by a reviewer.
-->
PR in forked repository:
- https://github.com/congbobo184/pulsar/pull/6
---
.../impl/AutoCloseUselessClientConTXTest.java | 51 +++++++++-------------
.../client/impl/TransactionClientConnectTest.java | 12 +++++
.../TransactionCoordinatorClientImpl.java | 11 ++---
3 files changed, 36 insertions(+), 38 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConTXTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConTXTest.java
index 69221b49754..5f1c9037944 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConTXTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConTXTest.java
@@ -46,8 +46,8 @@ import org.testng.annotations.Test;
@Test(groups = "broker-impl")
public class AutoCloseUselessClientConTXTest extends
AutoCloseUselessClientConSupports {
- private static String topicName =
UUID.randomUUID().toString().replaceAll("-","");
- private static String topicFullName = "persistent://public/default/" +
topicName;
+ private static final String topicName =
UUID.randomUUID().toString().replaceAll("-","");
+ private static final String topicFullName = "persistent://public/default/"
+ topicName;
@BeforeMethod
public void before() throws PulsarAdminException, MetadataStoreException {
@@ -59,38 +59,18 @@ public class AutoCloseUselessClientConTXTest extends
AutoCloseUselessClientConSu
&& !topicList_defaultNamespace.contains(topicFullName)){
pulsarAdmin_0.topics().createNonPartitionedTopic(topicFullName);
}
- List<String> topicList_systemNamespace =
pulsarAdmin_0.topics().getList("pulsar/system");
-
- if (!pulsar.getPulsarResources()
- .getNamespaceResources()
-
.getPartitionedTopicResources().partitionedTopicExists(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN)){
- pulsar.getPulsarResources()
- .getNamespaceResources()
- .getPartitionedTopicResources()
-
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
- new PartitionedTopicMetadata(2));
- }
- if (!pulsar.getPulsarResources()
- .getNamespaceResources()
-
.getPartitionedTopicResources().partitionedTopicExists(SystemTopicNames.TRANSACTION_COORDINATOR_LOG)){
- pulsar.getPulsarResources()
- .getNamespaceResources()
- .getPartitionedTopicResources()
-
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_LOG,
- new PartitionedTopicMetadata(2));
- }
}
@Override
protected void doInitConf() throws Exception {
super.doInitConf();
- updateConfig(conf, "BROKER-INIT");
+ updateConfig(conf);
}
@Override
protected ServiceConfiguration createConfForAdditionalBroker(int
additionalBrokerIndex) {
ServiceConfiguration conf =
super.createConfForAdditionalBroker(additionalBrokerIndex);
- updateConfig(conf, "BROKER-" + additionalBrokerIndex);
+ updateConfig(conf);
return conf;
}
@@ -110,7 +90,20 @@ public class AutoCloseUselessClientConTXTest extends
AutoCloseUselessClientConSu
if
(!admin.namespaces().getNamespaces("pulsar").contains("pulsar/system")) {
admin.namespaces().createNamespace("pulsar/system");
}
- }catch (Exception e){
+
+ if (conf.isTransactionCoordinatorEnabled()) {
+ if (!pulsar.getPulsarResources()
+ .getNamespaceResources()
+ .getPartitionedTopicResources()
+
.partitionedTopicExists(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN)){
+ pulsar.getPulsarResources()
+ .getNamespaceResources()
+ .getPartitionedTopicResources()
+
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+ new PartitionedTopicMetadata(2));
+ }
+ }
+ } catch (Exception e){
log.warn("create namespace failure", e);
}
return clientBuilder.enableTransaction(true).build();
@@ -119,7 +112,7 @@ public class AutoCloseUselessClientConTXTest extends
AutoCloseUselessClientConSu
/**
* Override for make broker enable transaction.
*/
- private void updateConfig(ServiceConfiguration conf, String
advertisedAddress) {
+ private void updateConfig(ServiceConfiguration conf) {
this.conf.setTransactionCoordinatorEnabled(true);
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(true);
@@ -132,11 +125,11 @@ public class AutoCloseUselessClientConTXTest extends
AutoCloseUselessClientConSu
public void testConnectionAutoReleaseUnPartitionedTopicWithTransaction()
throws Exception {
PulsarClientImpl pulsarClient = (PulsarClientImpl)
super.getAllClients().get(0);
// Init clients
- Consumer consumer = pulsarClient.newConsumer()
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("my-subscription-x")
.subscribe();
- Producer producer = pulsarClient.newProducer()
+ Producer<byte[]> producer = pulsarClient.newProducer()
.sendTimeout(0, TimeUnit.SECONDS)
.topic(topicName)
.create();
@@ -158,6 +151,4 @@ public class AutoCloseUselessClientConTXTest extends
AutoCloseUselessClientConSu
consumer.close();
producer.close();
}
-
-
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
index 5c19eb64a68..34e2362431d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
@@ -35,9 +35,12 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
+import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
@@ -173,6 +176,15 @@ public class TransactionClientConnectTest extends
TransactionTestBase {
Assert.assertTrue(transactionMetaStoreHandler.changeToReadyState());
}
+
+ @Test(expectedExceptions = PulsarClientException.class)
+ public void testNotEnableTransactionInBroker() throws Exception {
+
getPulsarServiceList().get(0).getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+
.deletePartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN).get();
+ PulsarClient.builder().enableTransaction(true)
+
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()).build();
+ }
+
public void start() throws Exception {
// wait transaction coordinator init success
pulsarClient.newTransaction()
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index e366afb3c2e..71629d8cbbf 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -97,14 +97,9 @@ public class TransactionCoordinatorClientImpl implements
TransactionCoordinatorC
handler.start();
}
} else {
- handlers = new TransactionMetaStoreHandler[1];
- CompletableFuture<Void> connectFuture = new
CompletableFuture<>();
- connectFutureList.add(connectFuture);
- TransactionMetaStoreHandler handler = new
TransactionMetaStoreHandler(0, pulsarClient,
- getTCAssignTopicName(-1), connectFuture);
- handlers[0] = handler;
- handlerMap.put(0, handler);
- handler.start();
+ return FutureUtil.failedFuture(new
TransactionCoordinatorClientException(
+ "The broker doesn't enable the transaction
coordinator, "
+ + "or the transaction coordinator has
not initialized"));
}
STATE_UPDATER.set(TransactionCoordinatorClientImpl.this,
State.READY);