This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 28505e4 Fix bug where producer for geo-replication is not closed when
topic is unloaded (#7735)
28505e4 is described below
commit 28505e40906ddbfba8b336546e262541bcc494bb
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Thu Aug 6 18:55:12 2020 +0900
Fix bug where producer for geo-replication is not closed when topic is
unloaded (#7735)
### Motivation
When a topic is unloaded and moved to another broker, the producer for
geo-replication often remains unclosed. Because of this, geo-replication is not
possible on the broker to which the topic was moved and messages accumulate in
the replication backlog.
```
18:56:55.166 [pulsar-io-21-6] ERROR o.a.pulsar.client.impl.ProducerImpl -
[persistent://xxx/yyy/zzz] [pulsar.repl.dc2] Failed to create producer:
Producer with name 'pulsar.repl.dc2' is already connected to topic
```
When this issue occurs, the following log message is output on the broker from
which the topic was unloaded.
```
17:14:36.424 [bookkeeper-ml-workers-OrderedExecutor-18-0] INFO
o.a.p.b.s.persistent.PersistentTopic - [persistent://xxx/yyy/zzz] Un-fencing
topic...
```
Unloaded topics are usually fenced to prevent new clients from connecting.
In this case, however, the producers reconnected to the topic because it had
been unfenced, and the replicator was restarted.
I think this is due to https://github.com/apache/pulsar/pull/5271. If a
topic is fenced in order to be closed or deleted, we should not unfence it.
### Modifications
When closing or deleting the `PersistentTopic` instance, set the
`isClosingOrDeleting` flag to true. While `isClosingOrDeleting` is true, do not
unfence the topic; the flag is cleared and the topic is unfenced again only if
closing or deleting fails.
---
.../broker/service/persistent/PersistentTopic.java | 35 +++++++----
.../pulsar/broker/service/PersistentTopicTest.java | 68 +++++++++++++++++++++-
2 files changed, 89 insertions(+), 14 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 767b236..e6120f6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -184,6 +184,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
private volatile double lastUpdatedAvgPublishRateInByte = 0;
public volatile int maxUnackedMessagesOnSubscription = -1;
+ private volatile boolean isClosingOrDeleting = false;
private static class TopicStatsHelper {
public double averageMsgSize;
@@ -346,9 +347,9 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
private void decrementPendingWriteOpsAndCheck() {
long pending = pendingWriteOps.decrementAndGet();
- if (pending == 0 && isFenced) {
+ if (pending == 0 && isFenced && !isClosingOrDeleting) {
synchronized (this) {
- if (isFenced) {
+ if (isFenced && !isClosingOrDeleting) {
messageDeduplication.resetHighestSequenceIdPushed();
log.info("[{}] Un-fencing topic...", topic);
// signal to managed ledger that we are ready to resume by
creating a new ledger
@@ -844,7 +845,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
lock.writeLock().lock();
try {
- if (isFenced) {
+ if (isClosingOrDeleting) {
log.warn("[{}] Topic is already being closed or deleted",
topic);
return FutureUtil.failedFuture(new TopicFencedException("Topic
is already fenced"));
} else if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
@@ -853,7 +854,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return FutureUtil.failedFuture(new TopicBusyException("Topic
has subscriptions did not catch up"));
}
- isFenced = true; // Avoid clients reconnections while deleting
+ fenceTopicToCloseOrDelete(); // Avoid clients reconnections while
deleting
CompletableFuture<Void> closeClientFuture = new
CompletableFuture<>();
if (closeIfClientsConnected) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
@@ -864,7 +865,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
closeClientFuture.complete(null);
}).exceptionally(ex -> {
log.error("[{}] Error closing clients", topic, ex);
- isFenced = false;
+ unfenceTopicToResume();
closeClientFuture.completeExceptionally(ex);
return null;
});
@@ -885,7 +886,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
deleteSchemaFuture.whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
- isFenced = false;
+ unfenceTopicToResume();
deleteFuture.completeExceptionally(ex);
} else {
ledger.asyncDelete(new
AsyncCallbacks.DeleteLedgerCallback() {
@@ -907,7 +908,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
log.info("[{}] Topic is already
deleted {}", topic, exception.getMessage());
deleteLedgerComplete(ctx);
} else {
- isFenced = false;
+ unfenceTopicToResume();
log.error("[{}] Error deleting topic",
topic, exception);
deleteFuture.completeExceptionally(new
PersistenceException(exception));
}
@@ -916,12 +917,12 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
});
} else {
- isFenced = false;
+ unfenceTopicToResume();
deleteFuture.completeExceptionally(new TopicBusyException(
"Topic has " + USAGE_COUNT_UPDATER.get(this) + "
connected producers/consumers"));
}
}).exceptionally(ex->{
- isFenced = false;
+ unfenceTopicToResume();
deleteFuture.completeExceptionally(
new TopicBusyException("Failed to close clients before
deleting topic."));
return null;
@@ -951,8 +952,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
try {
// closing managed-ledger waits until all
producers/consumers/replicators get closed. Sometimes, broker
// forcefully wants to close managed-ledger without waiting all
resources to be closed.
- if (!isFenced || closeWithoutWaitingClientDisconnect) {
- isFenced = true;
+ if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
+ fenceTopicToCloseOrDelete();
} else {
log.warn("[{}] Topic is already being closed or deleted",
topic);
closeFuture.completeExceptionally(new
TopicFencedException("Topic is already fenced"));
@@ -998,7 +999,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}, null);
}).exceptionally(exception -> {
log.error("[{}] Error closing topic", topic, exception);
- isFenced = false;
+ unfenceTopicToResume();
closeFuture.completeExceptionally(exception);
return null;
});
@@ -2210,4 +2211,14 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
public boolean isSystemTopic() {
return false;
}
+
+ private void fenceTopicToCloseOrDelete() {
+ isClosingOrDeleting = true;
+ isFenced = true;
+ }
+
+ private void unfenceTopicToResume() {
+ isFenced = false;
+ isClosingOrDeleting = false;
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index ae0bd4d..b152d16 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -869,25 +869,79 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
}
@Test
+ public void testCloseTopic() throws Exception {
+ // create topic
+ PersistentTopic topic = (PersistentTopic)
brokerService.getOrCreateTopic(successTopicName).get();
+
+ Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
+ isFencedField.setAccessible(true);
+ Field isClosingOrDeletingField =
PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
+ isClosingOrDeletingField.setAccessible(true);
+
+ assertFalse((boolean) isFencedField.get(topic));
+ assertFalse((boolean) isClosingOrDeletingField.get(topic));
+
+ // 1. close topic
+ topic.close().get();
+
assertFalse(brokerService.getTopicReference(successTopicName).isPresent());
+ assertTrue((boolean) isFencedField.get(topic));
+ assertTrue((boolean) isClosingOrDeletingField.get(topic));
+
+ // 2. publish message to closed topic
+ ByteBuf payload = Unpooled.wrappedBuffer("content".getBytes());
+ final CountDownLatch latch = new CountDownLatch(1);
+ topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
+ assertTrue(exception instanceof
BrokerServiceException.TopicFencedException);
+ latch.countDown();
+ });
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+ assertTrue((boolean) isFencedField.get(topic));
+ assertTrue((boolean) isClosingOrDeletingField.get(topic));
+ }
+
+ @Test
public void testDeleteTopic() throws Exception {
// create topic
PersistentTopic topic = (PersistentTopic)
brokerService.getOrCreateTopic(successTopicName).get();
+ Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
+ isFencedField.setAccessible(true);
+ Field isClosingOrDeletingField =
PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
+ isClosingOrDeletingField.setAccessible(true);
+
+ assertFalse((boolean) isFencedField.get(topic));
+ assertFalse((boolean) isClosingOrDeletingField.get(topic));
+
String role = "appid1";
// 1. delete inactive topic
topic.delete().get();
assertFalse(brokerService.getTopicReference(successTopicName).isPresent());
+ assertTrue((boolean) isFencedField.get(topic));
+ assertTrue((boolean) isClosingOrDeletingField.get(topic));
- // 2. delete topic with producer
+ // 2. publish message to deleted topic
+ ByteBuf payload = Unpooled.wrappedBuffer("content".getBytes());
+ final CountDownLatch latch = new CountDownLatch(1);
+ topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
+ assertTrue(exception instanceof
BrokerServiceException.TopicFencedException);
+ latch.countDown();
+ });
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+ assertTrue((boolean) isFencedField.get(topic));
+ assertTrue((boolean) isClosingOrDeletingField.get(topic));
+
+ // 3. delete topic with producer
topic = (PersistentTopic)
brokerService.getOrCreateTopic(successTopicName).get();
Producer producer = new Producer(topic, serverCnx, 1 /* producer id
*/, "prod-name",
role, false, null, SchemaVersion.Latest, 0, false);
topic.addProducer(producer);
assertTrue(topic.delete().isCompletedExceptionally());
+ assertFalse((boolean) isFencedField.get(topic));
+ assertFalse((boolean) isClosingOrDeletingField.get(topic));
topic.removeProducer(producer);
- // 3. delete topic with subscriber
+ // 4. delete topic with subscriber
CommandSubscribe cmd =
CommandSubscribe.newBuilder().setConsumerId(1).setTopic(successTopicName)
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
@@ -897,6 +951,8 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
f1.get();
assertTrue(topic.delete().isCompletedExceptionally());
+ assertFalse((boolean) isFencedField.get(topic));
+ assertFalse((boolean) isClosingOrDeletingField.get(topic));
topic.unsubscribe(successSubName);
}
@@ -1146,6 +1202,14 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
}).when(ledgerMock).asyncOpenCursor(matches(".*success.*"),
any(InitialPosition.class), any(Map.class),
any(OpenCursorCallback.class), any());
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
+ ((CloseCallback)
invocationOnMock.getArguments()[0]).closeComplete(null);
+ return null;
+ }
+ }).when(ledgerMock).asyncClose(any(CloseCallback.class), any());
+
// call deleteLedgerComplete on ledger asyncDelete
doAnswer(new Answer<Object>() {
@Override