This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 660ac36c620 Revert "[fix][broker] Topic could be in fenced state 
forever if deletion fails (#19129)"
660ac36c620 is described below

commit 660ac36c620f80c301ed4cd236c7cd3398cbae90
Author: xiangying <[email protected]>
AuthorDate: Wed Feb 8 16:49:17 2023 +0800

    Revert "[fix][broker] Topic could be in fenced state forever if deletion 
fails (#19129)"
    
    This reverts commit 9f0e8e677e8672c650aa2803bbea6eab2cb00ddb.
---
 .../pulsar/broker/service/PersistentTopicTest.java | 71 +++++-----------------
 .../metadata/impl/FaultInjectionMetadataStore.java | 20 +-----
 2 files changed, 17 insertions(+), 74 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index db719b9e637..24b056d3a56 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
 
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -56,7 +57,6 @@ import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URL;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -127,21 +127,19 @@ import 
org.apache.pulsar.common.api.proto.ProducerAccessMode;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.util.Codec;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicContext;
 import org.apache.pulsar.compaction.Compactor;
-import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.zookeeper.ZooKeeper;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -159,6 +157,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     private BrokerService brokerService;
     private ManagedLedgerFactory mlFactoryMock;
     private ServerCnx serverCnx;
+    private MetadataStore store;
     private ManagedLedger ledgerMock;
     private ManagedCursor cursorMock;
 
@@ -195,15 +194,17 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
             return null;
         }).when(mlFactoryMock).asyncDelete(any(), any(), any());
         // Mock metaStore.
+        ZooKeeper mockZk = createMockZooKeeper();
         doReturn(createMockBookKeeper(executor))
             .when(pulsar).getBookKeeperClient();
         doReturn(executor).when(pulsar).getOrderedExecutor();
-        doReturn(metadataStore).when(pulsar).getLocalMetadataStore();
-        doReturn(metadataStore).when(pulsar).getConfigurationMetadataStore();
+        store = new ZKMetadataStore(mockZk);
+        doReturn(store).when(pulsar).getLocalMetadataStore();
+        doReturn(store).when(pulsar).getConfigurationMetadataStore();
         // Mock pulsarResources.
-        PulsarResources pulsarResources = 
spyWithClassAndConstructorArgs(PulsarResources.class, metadataStore, 
metadataStore);
-        NamespaceResources nsr = 
spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
-        TopicResources tsr = 
spyWithClassAndConstructorArgs(TopicResources.class, metadataStore);
+        PulsarResources pulsarResources = 
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
+        NamespaceResources nsr = 
spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
+        TopicResources tsr = 
spyWithClassAndConstructorArgs(TopicResources.class, store);
         doReturn(nsr).when(pulsarResources).getNamespaceResources();
         doReturn(tsr).when(pulsarResources).getTopicResources();
         PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
@@ -257,11 +258,6 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         if (eventLoopGroup != null) {
             eventLoopGroup.shutdownGracefully().get();
         }
-        GracefulExecutorServicesShutdown.initiate()
-                .timeout(Duration.ZERO)
-                .shutdown(executor)
-                .handle().get();
-        EventLoopUtil.shutdownGracefully(eventLoopGroup).get();
     }
 
     @Test
@@ -1602,45 +1598,6 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
             }).when(cursorMock).asyncMarkDelete(any(), any(), 
any(MarkDeleteCallback.class), any());
     }
 
-
-    @Test
-    public void testDeleteTopicDeleteOnMetadataStoreFailed() throws Exception {
-
-        
doReturn(CompletableFuture.completedFuture(null)).when(ledgerMock).asyncTruncate();
-
-        // create topic
-        
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
-                .createPartitionedTopic(TopicName.get(successTopicName), new 
PartitionedTopicMetadata(2));
-        PersistentTopic topic = (PersistentTopic) 
brokerService.getOrCreateTopic(successTopicName).get();
-
-        Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
-        isFencedField.setAccessible(true);
-        Field isClosingOrDeletingField = 
PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
-        isClosingOrDeletingField.setAccessible(true);
-
-        assertFalse((boolean) isFencedField.get(topic));
-        assertFalse((boolean) isClosingOrDeletingField.get(topic));
-
-        metadataStore.failConditional(new MetadataStoreException("injected 
error"), (op, path) -> {
-            if (op == FaultInjectionMetadataStore.OperationType.PUT
-                && 
path.equals("/admin/partitioned-topics/prop/use/ns-abc/persistent/successTopic"))
 {
-                return true;
-            }
-            return false;
-        });
-        try {
-            topic.delete().get();
-            fail();
-        } catch (ExecutionException e) {
-            final Throwable t = FutureUtil.unwrapCompletionException(e);
-            assertTrue(t.getMessage().contains("injected error"));
-        }
-        assertFalse((boolean) isFencedField.get(topic));
-        assertFalse((boolean) isClosingOrDeletingField.get(topic));
-
-    }
-
-
     @Test
     public void testFailoverSubscription() throws Exception {
         PersistentTopic topic1 = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
@@ -2356,8 +2313,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         topic.initialize();
         
assertEquals(topic.getHierarchyTopicPolicies().getReplicationClusters().get(), 
Collections.emptyList());
 
-        PulsarResources pulsarResources = 
spyWithClassAndConstructorArgs(PulsarResources.class, metadataStore, 
metadataStore);
-        NamespaceResources nsr = 
spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
+        PulsarResources pulsarResources = 
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
+        NamespaceResources nsr = 
spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
         doReturn(nsr).when(pulsarResources).getNamespaceResources();
         PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
             doReturn(pulsarResources).when(pulsar).getPulsarResources();
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index 16933288ff7..4972570654c 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -28,8 +28,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiPredicate;
 import java.util.function.Consumer;
 import lombok.Data;
-import lombok.SneakyThrows;
-import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
@@ -41,7 +39,6 @@ import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
-import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 
 /**
  * Add possibility to inject failures during tests that interact with 
MetadataStore.
@@ -151,28 +148,17 @@ public class FaultInjectionMetadataStore implements 
MetadataStoreExtended {
 
     @Override
     public <T> MetadataCache<T> getMetadataCache(Class<T> clazz, 
MetadataCacheConfig cacheConfig) {
-        return 
injectMetadataStoreInMetadataCache(store.getMetadataCache(clazz, cacheConfig));
+        return store.getMetadataCache(clazz, cacheConfig);
     }
 
     @Override
     public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, 
MetadataCacheConfig cacheConfig) {
-        return 
injectMetadataStoreInMetadataCache(store.getMetadataCache(typeRef, 
cacheConfig));
+        return store.getMetadataCache(typeRef, cacheConfig);
     }
 
     @Override
     public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, 
MetadataCacheConfig cacheConfig) {
-        return 
injectMetadataStoreInMetadataCache(store.getMetadataCache(serde, cacheConfig));
-    }
-
-    @SneakyThrows
-    private <T> MetadataCache<T> 
injectMetadataStoreInMetadataCache(MetadataCache<T> metadataCache) {
-        if (metadataCache instanceof MetadataCacheImpl) {
-            FieldUtils.writeField(metadataCache, "store", this, true);
-        } else {
-            throw new UnsupportedOperationException("Metadata cache 
implementation "
-                    + metadataCache.getClass().getName() + " not supported by 
FaultInjectionMetadataStore");
-        }
-        return metadataCache;
+        return store.getMetadataCache(serde, cacheConfig);
     }
 
     @Override

Reply via email to