This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 9f0e8e677e8 [fix][broker] Topic could be in fenced state forever if
deletion fails (#19129)
9f0e8e677e8 is described below
commit 9f0e8e677e8672c650aa2803bbea6eab2cb00ddb
Author: Nicolò Boschi <[email protected]>
AuthorDate: Wed Jan 4 16:25:36 2023 +0100
[fix][broker] Topic could be in fenced state forever if deletion fails
(#19129)
(cherry picked from commit a6516a8d19896316907c9904d7ed823e9282aef2)
---
.../pulsar/broker/service/PersistentTopicTest.java | 71 +++++++++++++++++-----
.../metadata/impl/FaultInjectionMetadataStore.java | 20 +++++-
2 files changed, 74 insertions(+), 17 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 24b056d3a56..db719b9e637 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
-import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -57,6 +56,7 @@ import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -127,19 +127,21 @@ import
org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
-import org.apache.pulsar.metadata.api.MetadataStore;
-import org.apache.pulsar.metadata.impl.ZKMetadataStore;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@@ -157,7 +159,6 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
private BrokerService brokerService;
private ManagedLedgerFactory mlFactoryMock;
private ServerCnx serverCnx;
- private MetadataStore store;
private ManagedLedger ledgerMock;
private ManagedCursor cursorMock;
@@ -194,17 +195,15 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
return null;
}).when(mlFactoryMock).asyncDelete(any(), any(), any());
// Mock metaStore.
- ZooKeeper mockZk = createMockZooKeeper();
doReturn(createMockBookKeeper(executor))
.when(pulsar).getBookKeeperClient();
doReturn(executor).when(pulsar).getOrderedExecutor();
- store = new ZKMetadataStore(mockZk);
- doReturn(store).when(pulsar).getLocalMetadataStore();
- doReturn(store).when(pulsar).getConfigurationMetadataStore();
+ doReturn(metadataStore).when(pulsar).getLocalMetadataStore();
+ doReturn(metadataStore).when(pulsar).getConfigurationMetadataStore();
// Mock pulsarResources.
- PulsarResources pulsarResources =
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
- NamespaceResources nsr =
spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
- TopicResources tsr =
spyWithClassAndConstructorArgs(TopicResources.class, store);
+ PulsarResources pulsarResources =
spyWithClassAndConstructorArgs(PulsarResources.class, metadataStore,
metadataStore);
+ NamespaceResources nsr =
spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
+ TopicResources tsr =
spyWithClassAndConstructorArgs(TopicResources.class, metadataStore);
doReturn(nsr).when(pulsarResources).getNamespaceResources();
doReturn(tsr).when(pulsarResources).getTopicResources();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
@@ -258,6 +257,11 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully().get();
}
+ GracefulExecutorServicesShutdown.initiate()
+ .timeout(Duration.ZERO)
+ .shutdown(executor)
+ .handle().get();
+ EventLoopUtil.shutdownGracefully(eventLoopGroup).get();
}
@Test
@@ -1598,6 +1602,45 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
}).when(cursorMock).asyncMarkDelete(any(), any(),
any(MarkDeleteCallback.class), any());
}
+
+ @Test
+ public void testDeleteTopicDeleteOnMetadataStoreFailed() throws Exception {
+
+
doReturn(CompletableFuture.completedFuture(null)).when(ledgerMock).asyncTruncate();
+
+ // create topic
+
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+ .createPartitionedTopic(TopicName.get(successTopicName), new
PartitionedTopicMetadata(2));
+ PersistentTopic topic = (PersistentTopic)
brokerService.getOrCreateTopic(successTopicName).get();
+
+ Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
+ isFencedField.setAccessible(true);
+ Field isClosingOrDeletingField =
PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
+ isClosingOrDeletingField.setAccessible(true);
+
+ assertFalse((boolean) isFencedField.get(topic));
+ assertFalse((boolean) isClosingOrDeletingField.get(topic));
+
+ metadataStore.failConditional(new MetadataStoreException("injected
error"), (op, path) -> {
+ if (op == FaultInjectionMetadataStore.OperationType.PUT
+ &&
path.equals("/admin/partitioned-topics/prop/use/ns-abc/persistent/successTopic"))
{
+ return true;
+ }
+ return false;
+ });
+ try {
+ topic.delete().get();
+ fail();
+ } catch (ExecutionException e) {
+ final Throwable t = FutureUtil.unwrapCompletionException(e);
+ assertTrue(t.getMessage().contains("injected error"));
+ }
+ assertFalse((boolean) isFencedField.get(topic));
+ assertFalse((boolean) isClosingOrDeletingField.get(topic));
+
+ }
+
+
@Test
public void testFailoverSubscription() throws Exception {
PersistentTopic topic1 = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
@@ -2313,8 +2356,8 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
topic.initialize();
assertEquals(topic.getHierarchyTopicPolicies().getReplicationClusters().get(),
Collections.emptyList());
- PulsarResources pulsarResources =
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
- NamespaceResources nsr =
spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
+ PulsarResources pulsarResources =
spyWithClassAndConstructorArgs(PulsarResources.class, metadataStore,
metadataStore);
+ NamespaceResources nsr =
spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
doReturn(nsr).when(pulsarResources).getNamespaceResources();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(pulsarResources).when(pulsar).getPulsarResources();
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index 4972570654c..16933288ff7 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -28,6 +28,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import lombok.Data;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
@@ -39,6 +41,7 @@ import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
+import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
/**
* Add possibility to inject failures during tests that interact with
MetadataStore.
@@ -148,17 +151,28 @@ public class FaultInjectionMetadataStore implements
MetadataStoreExtended {
@Override
public <T> MetadataCache<T> getMetadataCache(Class<T> clazz,
MetadataCacheConfig cacheConfig) {
- return store.getMetadataCache(clazz, cacheConfig);
+ return
injectMetadataStoreInMetadataCache(store.getMetadataCache(clazz, cacheConfig));
}
@Override
public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef,
MetadataCacheConfig cacheConfig) {
- return store.getMetadataCache(typeRef, cacheConfig);
+ return
injectMetadataStoreInMetadataCache(store.getMetadataCache(typeRef,
cacheConfig));
}
@Override
public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde,
MetadataCacheConfig cacheConfig) {
- return store.getMetadataCache(serde, cacheConfig);
+ return
injectMetadataStoreInMetadataCache(store.getMetadataCache(serde, cacheConfig));
+ }
+
+ @SneakyThrows
+ private <T> MetadataCache<T>
injectMetadataStoreInMetadataCache(MetadataCache<T> metadataCache) {
+ if (metadataCache instanceof MetadataCacheImpl) {
+ FieldUtils.writeField(metadataCache, "store", this, true);
+ } else {
+ throw new UnsupportedOperationException("Metadata cache
implementation "
+ + metadataCache.getClass().getName() + " not supported by
FaultInjectionMetadataStore");
+ }
+ return metadataCache;
}
@Override