This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ee346693df [fix][flaky-test] Fix ClassCastException: BrokerService 
cannot be cast to class PulsarResources (#16821)
4ee346693df is described below

commit 4ee346693df8fc8314f94d53b00283f1c6079dc1
Author: fengyubiao <[email protected]>
AuthorDate: Thu Jul 28 09:45:11 2022 +0800

    [fix][flaky-test] Fix ClassCastException: BrokerService cannot be cast to 
class PulsarResources (#16821)
---
 .../pulsar/broker/PulsarServiceMockSupport.java    | 75 ++++++++++++++++++++++
 .../broker/service/MessageCumulativeAckTest.java   |  9 ++-
 .../PersistentDispatcherFailoverConsumerTest.java  | 29 ++++++---
 .../pulsar/broker/service/PersistentTopicTest.java | 31 +++++----
 .../broker/service/ServerCnxAuthorizationTest.java | 16 +++--
 .../metadata/impl/AbstractMetadataStore.java       |  3 +-
 6 files changed, 135 insertions(+), 28 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java
new file mode 100644
index 00000000000..e871919d882
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import static org.mockito.Mockito.mock;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class PulsarServiceMockSupport {
+
+    /**
+     * see: https://github.com/apache/pulsar/pull/16821.
+     * While the test thread executes 
"doReturn(pulsarResources).when(pulsar).getPulsarResources()", the metadata 
store thread may also call
+     * PulsarService.getPulsarResources() asynchronously (for example, when 
handling a "notification by zk-watcher").
+     * To avoid that race, run the mock task on the metadata store thread: 
its executor is single-threaded, so the
+     * stubbing cannot interleave with other work on that thread.
+     * Note: if the metadata store's executor is no longer single-threaded, 
mock it as single-threaded before
+     * calling this method.
+     */
+    public static void mockPulsarServiceProps(final PulsarService 
pulsarService, Runnable mockTask)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        final CompletableFuture<Void> mockGetPulsarResourceFuture = new 
CompletableFuture<>();
+        MetadataStoreExtended metadataStoreExtended = 
pulsarService.getLocalMetadataStore();
+        if (metadataStoreExtended instanceof AbstractMetadataStore){
+            AbstractMetadataStore abstractMetadataStore = 
(AbstractMetadataStore) metadataStoreExtended;
+            abstractMetadataStore.execute(() -> {
+                mockTask.run();
+                mockGetPulsarResourceFuture.complete(null);
+            }, mock(CompletableFuture.class));
+            try {
+                mockGetPulsarResourceFuture.get(1, TimeUnit.SECONDS);
+            } catch (TimeoutException timeoutException){
+                mockTask.run();
+            }
+        } else {
+            mockTask.run();
+        }
+    }
+
+    @Test
+    public void testMockMetaStore() throws Exception{
+        AtomicInteger integer = new AtomicInteger();
+        PulsarService pulsarService = Mockito.mock(PulsarService.class);
+        
Mockito.when(pulsarService.getLocalMetadataStore()).thenReturn(Mockito.mock(ZKMetadataStore.class));
+        mockPulsarServiceProps(pulsarService, () -> integer.incrementAndGet());
+        Assert.assertEquals(integer.get(), 1);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
index 86754efc0c2..05e86e73038 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.PulsarServiceMockSupport;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -94,7 +95,9 @@ public class MessageCumulativeAckTest {
         doReturn(store).when(pulsar).getConfigurationMetadataStore();
 
         PulsarResources pulsarResources = new PulsarResources(store, store);
-        doReturn(pulsarResources).when(pulsar).getPulsarResources();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(pulsarResources).when(pulsar).getPulsarResources();
+        });
 
         serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
         doReturn(true).when(serverCnx).isActive();
@@ -107,7 +110,9 @@ public class MessageCumulativeAckTest {
 
         eventLoopGroup = new NioEventLoopGroup();
         brokerService = spyWithClassAndConstructorArgs(BrokerService.class, 
pulsar, eventLoopGroup);
-        doReturn(brokerService).when(pulsar).getBrokerService();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(brokerService).when(pulsar).getBrokerService();
+        });
 
         String topicName = 
TopicName.get("MessageCumulativeAckTest").toString();
         PersistentTopic persistentTopic = new PersistentTopic(topicName, 
mock(ManagedLedger.class), brokerService);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index ec4ed02e35d..0d2e5a9ee02 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -65,6 +65,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.PulsarServiceMockSupport;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.PulsarResources;
@@ -124,24 +125,32 @@ public class PersistentDispatcherFailoverConsumerTest {
         svcConfig.setSystemTopicEnabled(false);
         svcConfig.setTopicLevelPoliciesEnabled(false);
         pulsar = spyWithClassAndConstructorArgs(PulsarService.class, 
svcConfig);
-        doReturn(svcConfig).when(pulsar).getConfiguration();
+        store = MetadataStoreFactory.create("memory:local", 
MetadataStoreConfig.builder().build());
+        doReturn(store).when(pulsar).getLocalMetadataStore();
+        doReturn(store).when(pulsar).getConfigurationMetadataStore();
+
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(svcConfig).when(pulsar).getConfiguration();
+        });
 
         mlFactoryMock = mock(ManagedLedgerFactory.class);
-        doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+        });
 
         doReturn(TransactionTestBase.createMockBookKeeper(executor))
                 .when(pulsar).getBookKeeperClient();
         eventLoopGroup = new NioEventLoopGroup();
 
-        store = MetadataStoreFactory.create("memory:local", 
MetadataStoreConfig.builder().build());
-        doReturn(store).when(pulsar).getLocalMetadataStore();
-        doReturn(store).when(pulsar).getConfigurationMetadataStore();
-
         PulsarResources pulsarResources = new PulsarResources(store, store);
-        doReturn(pulsarResources).when(pulsar).getPulsarResources();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(pulsarResources).when(pulsar).getPulsarResources();
+        });
 
         brokerService = spyWithClassAndConstructorArgs(BrokerService.class, 
pulsar, eventLoopGroup);
-        doReturn(brokerService).when(pulsar).getBrokerService();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(brokerService).when(pulsar).getBrokerService();
+        });
 
         consumerChanges = new LinkedBlockingQueue<>();
         this.channelCtx = mock(ChannelHandlerContext.class);
@@ -187,7 +196,9 @@ public class PersistentDispatcherFailoverConsumerTest {
                 .when(serverCnxWithOldVersion).getCommandSender();
 
         NamespaceService nsSvc = mock(NamespaceService.class);
-        doReturn(nsSvc).when(pulsar).getNamespaceService();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(nsSvc).when(pulsar).getNamespaceService();
+        });
         
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
         doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
         
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any(TopicName.class));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 840810ecc1b..864a4e52ee2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -99,6 +99,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.PulsarServiceMockSupport;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.NamespaceResources;
@@ -205,27 +206,29 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
             deleteLedgerCallback.deleteLedgerComplete(null);
             return null;
         }).when(mlFactoryMock).asyncDelete(any(), any(), any());
-
+        // Mock metaStore.
         ZooKeeper mockZk = createMockZooKeeper();
         doReturn(createMockBookKeeper(executor))
             .when(pulsar).getBookKeeperClient();
-
         doReturn(executor).when(pulsar).getOrderedExecutor();
-
         store = new ZKMetadataStore(mockZk);
+        doReturn(store).when(pulsar).getLocalMetadataStore();
+        doReturn(store).when(pulsar).getConfigurationMetadataStore();
+        // Mock pulsarResources.
         PulsarResources pulsarResources = 
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
         NamespaceResources nsr = 
spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
         TopicResources tsr = 
spyWithClassAndConstructorArgs(TopicResources.class, store);
         doReturn(nsr).when(pulsarResources).getNamespaceResources();
         doReturn(tsr).when(pulsarResources).getTopicResources();
-        doReturn(pulsarResources).when(pulsar).getPulsarResources();
-
-        doReturn(store).when(pulsar).getLocalMetadataStore();
-        doReturn(store).when(pulsar).getConfigurationMetadataStore();
-
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(pulsarResources).when(pulsar).getPulsarResources();
+        });
+        // Mock brokerService.
         brokerService = spyWithClassAndConstructorArgs(BrokerService.class, 
pulsar, eventLoopGroup);
-        doReturn(brokerService).when(pulsar).getBrokerService();
-
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(brokerService).when(pulsar).getBrokerService();
+        });
+        // Mock serverCnx.
         serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
         doReturn(true).when(serverCnx).isActive();
         doReturn(true).when(serverCnx).isWritable();
@@ -241,7 +244,9 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         NamespaceService nsSvc = mock(NamespaceService.class);
         NamespaceBundle bundle = mock(NamespaceBundle.class);
         
doReturn(CompletableFuture.completedFuture(bundle)).when(nsSvc).getBundleAsync(any());
-        doReturn(nsSvc).when(pulsar).getNamespaceService();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(nsSvc).when(pulsar).getNamespaceService();
+        });
         doReturn(true).when(nsSvc).isServiceUnitOwned(any());
         doReturn(true).when(nsSvc).isServiceUnitActive(any());
         
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
@@ -2318,7 +2323,9 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         PulsarResources pulsarResources = 
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
         NamespaceResources nsr = 
spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
         doReturn(nsr).when(pulsarResources).getNamespaceResources();
-        doReturn(pulsarResources).when(pulsar).getPulsarResources();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(pulsarResources).when(pulsar).getPulsarResources();
+        });
         CompletableFuture<Optional<Policies>> policiesFuture = new 
CompletableFuture<>();
         Policies policies = new Policies();
         Set<String> namespaceClusters = new HashSet<>();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
index af539891711..6d108ce675d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
@@ -53,6 +53,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.PulsarServiceMockSupport;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
@@ -116,7 +117,10 @@ public class ServerCnxAuthorizationTest {
         doReturn(new 
DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
 
         doReturn(svcConfig).when(pulsar).getConfiguration();
-        
doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            
doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();
+        });
+
 
         ManagedLedgerFactory mlFactoryMock = mock(ManagedLedgerFactory.class);
         doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
@@ -132,7 +136,9 @@ public class ServerCnxAuthorizationTest {
         doReturn(store).when(pulsar).getConfigurationMetadataStore();
 
         pulsarResources = 
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
-        doReturn(pulsarResources).when(pulsar).getPulsarResources();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(pulsarResources).when(pulsar).getPulsarResources();
+        });
         NamespaceResources namespaceResources =
                 spyWithClassAndConstructorArgs(NamespaceResources.class, 
store, store, 30);
         
doReturn(namespaceResources).when(pulsarResources).getNamespaceResources();
@@ -146,8 +152,10 @@ public class ServerCnxAuthorizationTest {
         brokerService = spyWithClassAndConstructorArgs(BrokerService.class, 
pulsar, eventLoopGroup);
         BrokerInterceptor interceptor = mock(BrokerInterceptor.class);
         doReturn(interceptor).when(brokerService).getInterceptor();
-        doReturn(brokerService).when(pulsar).getBrokerService();
-        doReturn(executor).when(pulsar).getOrderedExecutor();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(brokerService).when(pulsar).getBrokerService();
+            doReturn(executor).when(pulsar).getOrderedExecutor();
+        });
     }
 
     @Test
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index cac386405a6..a116ea9c294 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -316,7 +316,8 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     /**
      * Run the task in the executor thread and fail the future if the executor 
is shutting down.
      */
-    protected void execute(Runnable task, CompletableFuture<?> future) {
+    @VisibleForTesting
+    public void execute(Runnable task, CompletableFuture<?> future) {
         try {
             executor.execute(task);
         } catch (Throwable t) {

Reply via email to