This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 28505e4  Fix bug where producer for geo-replication is not closed when 
topic is unloaded (#7735)
28505e4 is described below

commit 28505e40906ddbfba8b336546e262541bcc494bb
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Thu Aug 6 18:55:12 2020 +0900

    Fix bug where producer for geo-replication is not closed when topic is 
unloaded (#7735)
    
    ### Motivation
    
    When a topic is unloaded and moved to another broker, the producer for 
geo-replication often remains open. Because of this, the broker to which the 
topic was moved cannot create its own replication producer (see the error 
below), so geo-replication does not work there and messages accumulate in the 
replication backlog.
    
    ```
    18:56:55.166 [pulsar-io-21-6] ERROR o.a.pulsar.client.impl.ProducerImpl  - 
[persistent://xxx/yyy/zzz] [pulsar.repl.dc2] Failed to create producer: 
Producer with name 'pulsar.repl.dc2' is already connected to topic
    ```
    
    When this issue occurs, the following message is logged on the broker from 
which the topic was unloaded.
    
    ```
    17:14:36.424 [bookkeeper-ml-workers-OrderedExecutor-18-0] INFO  
o.a.p.b.s.persistent.PersistentTopic - [persistent://xxx/yyy/zzz] Un-fencing 
topic...
    ```
    
    Unloaded topics are usually fenced to prevent new clients from connecting. 
In this case, however, the producers reconnected to the topic because it had 
been unfenced, and the replicator was restarted.
    
    I think this is caused by https://github.com/apache/pulsar/pull/5271. If a 
topic has been fenced so that it can be closed or deleted, we should not 
unfence it.
    
    ### Modifications
    
    When closing or deleting the `PersistentTopic` instance, set the 
`isClosingOrDeleting` flag to true. If `isClosingOrDeleting` is true, do not 
unfence the topic unless closing or deleting fails.
---
 .../broker/service/persistent/PersistentTopic.java | 35 +++++++----
 .../pulsar/broker/service/PersistentTopicTest.java | 68 +++++++++++++++++++++-
 2 files changed, 89 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 767b236..e6120f6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -184,6 +184,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     private volatile double lastUpdatedAvgPublishRateInByte = 0;
 
     public volatile int maxUnackedMessagesOnSubscription = -1;
+    private volatile boolean isClosingOrDeleting = false;
 
     private static class TopicStatsHelper {
         public double averageMsgSize;
@@ -346,9 +347,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     private void decrementPendingWriteOpsAndCheck() {
         long pending = pendingWriteOps.decrementAndGet();
-        if (pending == 0 && isFenced) {
+        if (pending == 0 && isFenced && !isClosingOrDeleting) {
             synchronized (this) {
-                if (isFenced) {
+                if (isFenced && !isClosingOrDeleting) {
                     messageDeduplication.resetHighestSequenceIdPushed();
                     log.info("[{}] Un-fencing topic...", topic);
                     // signal to managed ledger that we are ready to resume by 
creating a new ledger
@@ -844,7 +845,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
         lock.writeLock().lock();
         try {
-            if (isFenced) {
+            if (isClosingOrDeleting) {
                 log.warn("[{}] Topic is already being closed or deleted", 
topic);
                 return FutureUtil.failedFuture(new TopicFencedException("Topic 
is already fenced"));
             } else if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
@@ -853,7 +854,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 return FutureUtil.failedFuture(new TopicBusyException("Topic 
has subscriptions did not catch up"));
             }
 
-            isFenced = true; // Avoid clients reconnections while deleting
+            fenceTopicToCloseOrDelete(); // Avoid clients reconnections while 
deleting
             CompletableFuture<Void> closeClientFuture = new 
CompletableFuture<>();
             if (closeIfClientsConnected) {
                 List<CompletableFuture<Void>> futures = Lists.newArrayList();
@@ -864,7 +865,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     closeClientFuture.complete(null);
                 }).exceptionally(ex -> {
                     log.error("[{}] Error closing clients", topic, ex);
-                    isFenced = false;
+                    unfenceTopicToResume();
                     closeClientFuture.completeExceptionally(ex);
                     return null;
                 });
@@ -885,7 +886,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     deleteSchemaFuture.whenComplete((v, ex) -> {
                         if (ex != null) {
                             log.error("[{}] Error deleting topic", topic, ex);
-                            isFenced = false;
+                            unfenceTopicToResume();
                             deleteFuture.completeExceptionally(ex);
                         } else {
                             ledger.asyncDelete(new 
AsyncCallbacks.DeleteLedgerCallback() {
@@ -907,7 +908,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                         log.info("[{}] Topic is already 
deleted {}", topic, exception.getMessage());
                                         deleteLedgerComplete(ctx);
                                     } else {
-                                        isFenced = false;
+                                        unfenceTopicToResume();
                                         log.error("[{}] Error deleting topic", 
topic, exception);
                                         deleteFuture.completeExceptionally(new 
PersistenceException(exception));
                                     }
@@ -916,12 +917,12 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                         }
                     });
                 } else {
-                    isFenced = false;
+                    unfenceTopicToResume();
                     deleteFuture.completeExceptionally(new TopicBusyException(
                             "Topic has " + USAGE_COUNT_UPDATER.get(this) + " 
connected producers/consumers"));
                 }
             }).exceptionally(ex->{
-                isFenced = false;
+                unfenceTopicToResume();
                 deleteFuture.completeExceptionally(
                         new TopicBusyException("Failed to close clients before 
deleting topic."));
                 return null;
@@ -951,8 +952,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         try {
             // closing managed-ledger waits until all 
producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all 
resources to be closed.
-            if (!isFenced || closeWithoutWaitingClientDisconnect) {
-                isFenced = true;
+            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
+                fenceTopicToCloseOrDelete();
             } else {
                 log.warn("[{}] Topic is already being closed or deleted", 
topic);
                 closeFuture.completeExceptionally(new 
TopicFencedException("Topic is already fenced"));
@@ -998,7 +999,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             }, null);
         }).exceptionally(exception -> {
             log.error("[{}] Error closing topic", topic, exception);
-            isFenced = false;
+            unfenceTopicToResume();
             closeFuture.completeExceptionally(exception);
             return null;
         });
@@ -2210,4 +2211,14 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     public boolean isSystemTopic() {
         return false;
     }
+
+    private void fenceTopicToCloseOrDelete() {
+        isClosingOrDeleting = true;
+        isFenced = true;
+    }
+
+    private void unfenceTopicToResume() {
+        isFenced = false;
+        isClosingOrDeleting = false;
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index ae0bd4d..b152d16 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -869,25 +869,79 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     }
 
     @Test
+    public void testCloseTopic() throws Exception {
+        // create topic
+        PersistentTopic topic = (PersistentTopic) 
brokerService.getOrCreateTopic(successTopicName).get();
+
+        Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
+        isFencedField.setAccessible(true);
+        Field isClosingOrDeletingField = 
PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
+        isClosingOrDeletingField.setAccessible(true);
+
+        assertFalse((boolean) isFencedField.get(topic));
+        assertFalse((boolean) isClosingOrDeletingField.get(topic));
+
+        // 1. close topic
+        topic.close().get();
+        
assertFalse(brokerService.getTopicReference(successTopicName).isPresent());
+        assertTrue((boolean) isFencedField.get(topic));
+        assertTrue((boolean) isClosingOrDeletingField.get(topic));
+
+        // 2. publish message to closed topic
+        ByteBuf payload = Unpooled.wrappedBuffer("content".getBytes());
+        final CountDownLatch latch = new CountDownLatch(1);
+        topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
+            assertTrue(exception instanceof 
BrokerServiceException.TopicFencedException);
+            latch.countDown();
+        });
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+        assertTrue((boolean) isFencedField.get(topic));
+        assertTrue((boolean) isClosingOrDeletingField.get(topic));
+    }
+
+    @Test
     public void testDeleteTopic() throws Exception {
         // create topic
         PersistentTopic topic = (PersistentTopic) 
brokerService.getOrCreateTopic(successTopicName).get();
 
+        Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
+        isFencedField.setAccessible(true);
+        Field isClosingOrDeletingField = 
PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
+        isClosingOrDeletingField.setAccessible(true);
+
+        assertFalse((boolean) isFencedField.get(topic));
+        assertFalse((boolean) isClosingOrDeletingField.get(topic));
+
         String role = "appid1";
         // 1. delete inactive topic
         topic.delete().get();
         
assertFalse(brokerService.getTopicReference(successTopicName).isPresent());
+        assertTrue((boolean) isFencedField.get(topic));
+        assertTrue((boolean) isClosingOrDeletingField.get(topic));
 
-        // 2. delete topic with producer
+        // 2. publish message to deleted topic
+        ByteBuf payload = Unpooled.wrappedBuffer("content".getBytes());
+        final CountDownLatch latch = new CountDownLatch(1);
+        topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
+            assertTrue(exception instanceof 
BrokerServiceException.TopicFencedException);
+            latch.countDown();
+        });
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+        assertTrue((boolean) isFencedField.get(topic));
+        assertTrue((boolean) isClosingOrDeletingField.get(topic));
+
+        // 3. delete topic with producer
         topic = (PersistentTopic) 
brokerService.getOrCreateTopic(successTopicName).get();
         Producer producer = new Producer(topic, serverCnx, 1 /* producer id 
*/, "prod-name",
                 role, false, null, SchemaVersion.Latest, 0, false);
         topic.addProducer(producer);
 
         assertTrue(topic.delete().isCompletedExceptionally());
+        assertFalse((boolean) isFencedField.get(topic));
+        assertFalse((boolean) isClosingOrDeletingField.get(topic));
         topic.removeProducer(producer);
 
-        // 3. delete topic with subscriber
+        // 4. delete topic with subscriber
         CommandSubscribe cmd = 
CommandSubscribe.newBuilder().setConsumerId(1).setTopic(successTopicName)
                 
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
@@ -897,6 +951,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         f1.get();
 
         assertTrue(topic.delete().isCompletedExceptionally());
+        assertFalse((boolean) isFencedField.get(topic));
+        assertFalse((boolean) isClosingOrDeletingField.get(topic));
         topic.unsubscribe(successSubName);
     }
 
@@ -1146,6 +1202,14 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), 
any(InitialPosition.class), any(Map.class),
                 any(OpenCursorCallback.class), any());
 
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+                ((CloseCallback) 
invocationOnMock.getArguments()[0]).closeComplete(null);
+                return null;
+            }
+        }).when(ledgerMock).asyncClose(any(CloseCallback.class), any());
+
         // call deleteLedgerComplete on ledger asyncDelete
         doAnswer(new Answer<Object>() {
             @Override

Reply via email to