This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 660ac36c620 Revert "[fix][broker] Topic could be in fenced state
forever if deletion fails (#19129)"
660ac36c620 is described below
commit 660ac36c620f80c301ed4cd236c7cd3398cbae90
Author: xiangying <[email protected]>
AuthorDate: Wed Feb 8 16:49:17 2023 +0800
Revert "[fix][broker] Topic could be in fenced state forever if deletion
fails (#19129)"
This reverts commit 9f0e8e677e8672c650aa2803bbea6eab2cb00ddb.
---
.../pulsar/broker/service/PersistentTopicTest.java | 71 +++++-----------------
.../metadata/impl/FaultInjectionMetadataStore.java | 20 +-----
2 files changed, 17 insertions(+), 74 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index db719b9e637..24b056d3a56 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
+import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -56,7 +57,6 @@ import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -127,21 +127,19 @@ import
org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.Codec;
-import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
-import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.zookeeper.ZooKeeper;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@@ -159,6 +157,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
private BrokerService brokerService;
private ManagedLedgerFactory mlFactoryMock;
private ServerCnx serverCnx;
+ private MetadataStore store;
private ManagedLedger ledgerMock;
private ManagedCursor cursorMock;
@@ -195,15 +194,17 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
return null;
}).when(mlFactoryMock).asyncDelete(any(), any(), any());
// Mock metaStore.
+ ZooKeeper mockZk = createMockZooKeeper();
doReturn(createMockBookKeeper(executor))
.when(pulsar).getBookKeeperClient();
doReturn(executor).when(pulsar).getOrderedExecutor();
- doReturn(metadataStore).when(pulsar).getLocalMetadataStore();
- doReturn(metadataStore).when(pulsar).getConfigurationMetadataStore();
+ store = new ZKMetadataStore(mockZk);
+ doReturn(store).when(pulsar).getLocalMetadataStore();
+ doReturn(store).when(pulsar).getConfigurationMetadataStore();
// Mock pulsarResources.
- PulsarResources pulsarResources =
spyWithClassAndConstructorArgs(PulsarResources.class, metadataStore,
metadataStore);
- NamespaceResources nsr =
spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
- TopicResources tsr =
spyWithClassAndConstructorArgs(TopicResources.class, metadataStore);
+ PulsarResources pulsarResources =
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
+ NamespaceResources nsr =
spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
+ TopicResources tsr =
spyWithClassAndConstructorArgs(TopicResources.class, store);
doReturn(nsr).when(pulsarResources).getNamespaceResources();
doReturn(tsr).when(pulsarResources).getTopicResources();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
@@ -257,11 +258,6 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully().get();
}
- GracefulExecutorServicesShutdown.initiate()
- .timeout(Duration.ZERO)
- .shutdown(executor)
- .handle().get();
- EventLoopUtil.shutdownGracefully(eventLoopGroup).get();
}
@Test
@@ -1602,45 +1598,6 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
}).when(cursorMock).asyncMarkDelete(any(), any(),
any(MarkDeleteCallback.class), any());
}
-
- @Test
- public void testDeleteTopicDeleteOnMetadataStoreFailed() throws Exception {
-
-
doReturn(CompletableFuture.completedFuture(null)).when(ledgerMock).asyncTruncate();
-
- // create topic
-
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
- .createPartitionedTopic(TopicName.get(successTopicName), new
PartitionedTopicMetadata(2));
- PersistentTopic topic = (PersistentTopic)
brokerService.getOrCreateTopic(successTopicName).get();
-
- Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
- isFencedField.setAccessible(true);
- Field isClosingOrDeletingField =
PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
- isClosingOrDeletingField.setAccessible(true);
-
- assertFalse((boolean) isFencedField.get(topic));
- assertFalse((boolean) isClosingOrDeletingField.get(topic));
-
- metadataStore.failConditional(new MetadataStoreException("injected
error"), (op, path) -> {
- if (op == FaultInjectionMetadataStore.OperationType.PUT
- &&
path.equals("/admin/partitioned-topics/prop/use/ns-abc/persistent/successTopic"))
{
- return true;
- }
- return false;
- });
- try {
- topic.delete().get();
- fail();
- } catch (ExecutionException e) {
- final Throwable t = FutureUtil.unwrapCompletionException(e);
- assertTrue(t.getMessage().contains("injected error"));
- }
- assertFalse((boolean) isFencedField.get(topic));
- assertFalse((boolean) isClosingOrDeletingField.get(topic));
-
- }
-
-
@Test
public void testFailoverSubscription() throws Exception {
PersistentTopic topic1 = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
@@ -2356,8 +2313,8 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
topic.initialize();
assertEquals(topic.getHierarchyTopicPolicies().getReplicationClusters().get(),
Collections.emptyList());
- PulsarResources pulsarResources =
spyWithClassAndConstructorArgs(PulsarResources.class, metadataStore,
metadataStore);
- NamespaceResources nsr =
spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
+ PulsarResources pulsarResources =
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
+ NamespaceResources nsr =
spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
doReturn(nsr).when(pulsarResources).getNamespaceResources();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(pulsarResources).when(pulsar).getPulsarResources();
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index 16933288ff7..4972570654c 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -28,8 +28,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import lombok.Data;
-import lombok.SneakyThrows;
-import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
@@ -41,7 +39,6 @@ import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
-import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
/**
* Add possibility to inject failures during tests that interact with
MetadataStore.
@@ -151,28 +148,17 @@ public class FaultInjectionMetadataStore implements
MetadataStoreExtended {
@Override
public <T> MetadataCache<T> getMetadataCache(Class<T> clazz,
MetadataCacheConfig cacheConfig) {
- return
injectMetadataStoreInMetadataCache(store.getMetadataCache(clazz, cacheConfig));
+ return store.getMetadataCache(clazz, cacheConfig);
}
@Override
public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef,
MetadataCacheConfig cacheConfig) {
- return
injectMetadataStoreInMetadataCache(store.getMetadataCache(typeRef,
cacheConfig));
+ return store.getMetadataCache(typeRef, cacheConfig);
}
@Override
public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde,
MetadataCacheConfig cacheConfig) {
- return
injectMetadataStoreInMetadataCache(store.getMetadataCache(serde, cacheConfig));
- }
-
- @SneakyThrows
- private <T> MetadataCache<T>
injectMetadataStoreInMetadataCache(MetadataCache<T> metadataCache) {
- if (metadataCache instanceof MetadataCacheImpl) {
- FieldUtils.writeField(metadataCache, "store", this, true);
- } else {
- throw new UnsupportedOperationException("Metadata cache
implementation "
- + metadataCache.getClass().getName() + " not supported by
FaultInjectionMetadataStore");
- }
- return metadataCache;
+ return store.getMetadataCache(serde, cacheConfig);
}
@Override