This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 9f0e8e677e8 [fix][broker] Topic could be in fenced state forever if deletion fails (#19129)
9f0e8e677e8 is described below

commit 9f0e8e677e8672c650aa2803bbea6eab2cb00ddb
Author: Nicolò Boschi <[email protected]>
AuthorDate: Wed Jan 4 16:25:36 2023 +0100

    [fix][broker] Topic could be in fenced state forever if deletion fails (#19129)
    
    (cherry picked from commit a6516a8d19896316907c9904d7ed823e9282aef2)
---
 .../pulsar/broker/service/PersistentTopicTest.java | 71 +++++++++++++++++-----
 .../metadata/impl/FaultInjectionMetadataStore.java | 20 +++++-
 2 files changed, 74 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 24b056d3a56..db719b9e637 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.service;
 
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
-import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -57,6 +56,7 @@ import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URL;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -127,19 +127,21 @@ import org.apache.pulsar.common.api.proto.ProducerAccessMode;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicContext;
 import org.apache.pulsar.compaction.Compactor;
-import org.apache.pulsar.metadata.api.MetadataStore;
-import org.apache.pulsar.metadata.impl.ZKMetadataStore;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -157,7 +159,6 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
     private BrokerService brokerService;
     private ManagedLedgerFactory mlFactoryMock;
     private ServerCnx serverCnx;
-    private MetadataStore store;
     private ManagedLedger ledgerMock;
     private ManagedCursor cursorMock;
 
@@ -194,17 +195,15 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
             return null;
         }).when(mlFactoryMock).asyncDelete(any(), any(), any());
         // Mock metaStore.
-        ZooKeeper mockZk = createMockZooKeeper();
         doReturn(createMockBookKeeper(executor))
             .when(pulsar).getBookKeeperClient();
         doReturn(executor).when(pulsar).getOrderedExecutor();
-        store = new ZKMetadataStore(mockZk);
-        doReturn(store).when(pulsar).getLocalMetadataStore();
-        doReturn(store).when(pulsar).getConfigurationMetadataStore();
+        doReturn(metadataStore).when(pulsar).getLocalMetadataStore();
+        doReturn(metadataStore).when(pulsar).getConfigurationMetadataStore();
         // Mock pulsarResources.
-        PulsarResources pulsarResources = 
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
-        NamespaceResources nsr = 
spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
-        TopicResources tsr = 
spyWithClassAndConstructorArgs(TopicResources.class, store);
+        PulsarResources pulsarResources = 
spyWithClassAndConstructorArgs(PulsarResources.class, metadataStore, 
metadataStore);
+        NamespaceResources nsr = 
spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
+        TopicResources tsr = 
spyWithClassAndConstructorArgs(TopicResources.class, metadataStore);
         doReturn(nsr).when(pulsarResources).getNamespaceResources();
         doReturn(tsr).when(pulsarResources).getTopicResources();
         PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
@@ -258,6 +257,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         if (eventLoopGroup != null) {
             eventLoopGroup.shutdownGracefully().get();
         }
+        GracefulExecutorServicesShutdown.initiate()
+                .timeout(Duration.ZERO)
+                .shutdown(executor)
+                .handle().get();
+        EventLoopUtil.shutdownGracefully(eventLoopGroup).get();
     }
 
     @Test
@@ -1598,6 +1602,45 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
             }).when(cursorMock).asyncMarkDelete(any(), any(), 
any(MarkDeleteCallback.class), any());
     }
 
+
+    @Test
+    public void testDeleteTopicDeleteOnMetadataStoreFailed() throws Exception {
+
+        
doReturn(CompletableFuture.completedFuture(null)).when(ledgerMock).asyncTruncate();
+
+        // create topic
+        
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                .createPartitionedTopic(TopicName.get(successTopicName), new 
PartitionedTopicMetadata(2));
+        PersistentTopic topic = (PersistentTopic) 
brokerService.getOrCreateTopic(successTopicName).get();
+
+        Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
+        isFencedField.setAccessible(true);
+        Field isClosingOrDeletingField = 
PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
+        isClosingOrDeletingField.setAccessible(true);
+
+        assertFalse((boolean) isFencedField.get(topic));
+        assertFalse((boolean) isClosingOrDeletingField.get(topic));
+
+        metadataStore.failConditional(new MetadataStoreException("injected 
error"), (op, path) -> {
+            if (op == FaultInjectionMetadataStore.OperationType.PUT
+                && 
path.equals("/admin/partitioned-topics/prop/use/ns-abc/persistent/successTopic"))
 {
+                return true;
+            }
+            return false;
+        });
+        try {
+            topic.delete().get();
+            fail();
+        } catch (ExecutionException e) {
+            final Throwable t = FutureUtil.unwrapCompletionException(e);
+            assertTrue(t.getMessage().contains("injected error"));
+        }
+        assertFalse((boolean) isFencedField.get(topic));
+        assertFalse((boolean) isClosingOrDeletingField.get(topic));
+
+    }
+
+
     @Test
     public void testFailoverSubscription() throws Exception {
         PersistentTopic topic1 = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
@@ -2313,8 +2356,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         topic.initialize();
         
assertEquals(topic.getHierarchyTopicPolicies().getReplicationClusters().get(), 
Collections.emptyList());
 
-        PulsarResources pulsarResources = 
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
-        NamespaceResources nsr = 
spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
+        PulsarResources pulsarResources = 
spyWithClassAndConstructorArgs(PulsarResources.class, metadataStore, 
metadataStore);
+        NamespaceResources nsr = 
spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
         doReturn(nsr).when(pulsarResources).getNamespaceResources();
         PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
             doReturn(pulsarResources).when(pulsar).getPulsarResources();
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index 4972570654c..16933288ff7 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -28,6 +28,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiPredicate;
 import java.util.function.Consumer;
 import lombok.Data;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
@@ -39,6 +41,7 @@ import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
+import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 
 /**
  * Add possibility to inject failures during tests that interact with MetadataStore.
@@ -148,17 +151,28 @@ public class FaultInjectionMetadataStore implements MetadataStoreExtended {
 
     @Override
     public <T> MetadataCache<T> getMetadataCache(Class<T> clazz, 
MetadataCacheConfig cacheConfig) {
-        return store.getMetadataCache(clazz, cacheConfig);
+        return 
injectMetadataStoreInMetadataCache(store.getMetadataCache(clazz, cacheConfig));
     }
 
     @Override
     public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, 
MetadataCacheConfig cacheConfig) {
-        return store.getMetadataCache(typeRef, cacheConfig);
+        return 
injectMetadataStoreInMetadataCache(store.getMetadataCache(typeRef, 
cacheConfig));
     }
 
     @Override
     public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, 
MetadataCacheConfig cacheConfig) {
-        return store.getMetadataCache(serde, cacheConfig);
+        return 
injectMetadataStoreInMetadataCache(store.getMetadataCache(serde, cacheConfig));
+    }
+
+    @SneakyThrows
+    private <T> MetadataCache<T> 
injectMetadataStoreInMetadataCache(MetadataCache<T> metadataCache) {
+        if (metadataCache instanceof MetadataCacheImpl) {
+            FieldUtils.writeField(metadataCache, "store", this, true);
+        } else {
+            throw new UnsupportedOperationException("Metadata cache 
implementation "
+                    + metadataCache.getClass().getName() + " not supported by 
FaultInjectionMetadataStore");
+        }
+        return metadataCache;
     }
 
     @Override

Reply via email to