eolivelli commented on a change in pull request #11357:
URL: https://github.com/apache/pulsar/pull/11357#discussion_r708173091



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -691,7 +692,11 @@ protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authorit
         }
         // If the topic name is a partition name, no need to get partition topic metadata again
         if (topicName.isPartitioned()) {
-            internalUnloadNonPartitionedTopic(asyncResponse, authoritative);
+            if (topicName.toString().startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())) {

Review comment:
       We should have a better way to detect a system topic than a string-prefix check; IIRC we added something for this.
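
       One way to read this suggestion is to move the check behind a single named helper so that call sites do not repeat the prefix comparison. The following is only a sketch: `SystemTopicSketch`, `isSystemTopic`, and the prefix string are hypothetical and are not the existing Pulsar utility the comment alludes to.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only; not part of Pulsar.
class SystemTopicSketch {
    // Assumed prefix value; the real constant is TopicName.TRANSACTION_COORDINATOR_ASSIGN.
    private static final List<String> SYSTEM_TOPIC_PREFIXES = Arrays.asList(
            "persistent://pulsar/system/transaction_coordinator_assign");

    // Returns true when the full topic name starts with a known system-topic prefix.
    // Partition names (prefix + "-partition-N") match as well.
    static boolean isSystemTopic(String fullTopicName) {
        if (fullTopicName == null) {
            return false;
        }
        for (String prefix : SYSTEM_TOPIC_PREFIXES) {
            if (fullTopicName.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }
}
```

       With a helper like this, the branch in `internalUnloadTopic` would call one method instead of comparing strings inline, and new system topics would be registered in one place.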

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
##########
@@ -66,11 +75,17 @@ protected void cleanup() throws Exception {
     }
 
     @Test
-    public void testAddAndRemoveTransactionMetadataStore() {
+    public void testCloseLock() {

Review comment:
       Remove the empty test case?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
##########
@@ -396,6 +396,7 @@ private static void verifyCoordinatorStats(String state,
     private void initTransaction(int coordinatorSize) throws Exception {
         admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), coordinatorSize);
         admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+        pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();

Review comment:
       Please close the existing client first; otherwise this assignment will overwrite the reference and we will leak a PulsarClient.
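
       The pattern being asked for is "close the old instance before overwriting the field". A minimal sketch, using a hypothetical `SketchClient` stand-in rather than the real `PulsarClient` (the open-instance counter exists only to make the leak observable):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a client that holds connections; not PulsarClient.
class SketchClient implements AutoCloseable {
    // Counts instances that have been created but not yet closed.
    static final AtomicInteger OPEN = new AtomicInteger();
    private boolean closed;

    SketchClient() {
        OPEN.incrementAndGet();
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            OPEN.decrementAndGet();
        }
    }
}

// Hypothetical holder that mimics a test class with a client field.
class ClientHolderSketch {
    private SketchClient client = new SketchClient();

    // Close the previous instance before the field is reassigned,
    // so the old client is not left open and unreachable.
    void replaceClient() {
        if (client != null) {
            client.close();
        }
        client = new SketchClient();
    }

    void shutdown() {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}
```

       Without the `close()` call in `replaceClient`, the counter would stay one higher after every replacement, which is the leak described above.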

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -400,13 +432,14 @@ public void endTransactionForTimeout(TxnID txnID) {
     }
 
     private static boolean isRetryableException(Throwable e) {
-        return e instanceof TransactionMetadataStoreStateException
+        return (e instanceof TransactionMetadataStoreStateException
                 || e instanceof RequestTimeoutException
                 || e instanceof ManagedLedgerException
                 || e instanceof BrokerPersistenceException
                 || e instanceof LookupException
                 || e instanceof ReachMaxPendingOpsException
-                || e instanceof ConnectException;
+                || e instanceof ConnectException)
+                && !(e instanceof ManagedLedgerException.ManagedLedgerFencedException);

Review comment:
       We should add an "isRetryable" method to ManagedLedgerException; otherwise this code will be hard to maintain.

   cc @merlimat
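
       A sketch of what such a method could look like, so the caller no longer needs to list non-retryable subclasses by hand. All class names here are hypothetical stand-ins, not the real Pulsar exception types, and the default of "retryable unless overridden" is an assumption:

```java
import java.net.ConnectException;

// Hypothetical stand-in for ManagedLedgerException.
class SketchManagedLedgerException extends Exception {
    SketchManagedLedgerException(String message) {
        super(message);
    }

    // Assumed default: retryable unless a subclass says otherwise.
    boolean isRetryable() {
        return true;
    }
}

// Hypothetical stand-in for ManagedLedgerFencedException: a terminal condition.
class SketchFencedException extends SketchManagedLedgerException {
    SketchFencedException(String message) {
        super(message);
    }

    @Override
    boolean isRetryable() {
        return false;
    }
}

class RetryPolicySketch {
    // The exception type decides whether it is retryable;
    // the caller only keeps the checks for unrelated exception families.
    static boolean isRetryableException(Throwable e) {
        if (e instanceof SketchManagedLedgerException) {
            return ((SketchManagedLedgerException) e).isRetryable();
        }
        return e instanceof ConnectException;
    }
}
```

       With this shape, adding another non-retryable subclass only requires overriding `isRetryable()` in that subclass; the `&& !(e instanceof ...)` clause in the caller goes away.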

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1680,6 +1682,17 @@ public boolean isTopicNsOwnedByBroker(TopicName topicName) {
                                 : CompletableFuture.completedFuture(null)));
             }
         });
+        if (getPulsar().getConfig().isTransactionCoordinatorEnabled()
+                && serviceUnit.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)) {
+            TransactionMetadataStoreService metadataStoreService =
+                    this.getPulsar().getTransactionMetadataStoreService();
+            // if have store belongs to this bundle, remove and close the store

Review comment:
       Typo: this should read "if the store", not "if have store".
