This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new b4ce1b79918 [fix][flaky-test] Fix ClassCastException: BrokerService 
cannot be cast to class PulsarResources (#16821)
b4ce1b79918 is described below

commit b4ce1b799180f4a2ef8cdb29608cea3fb6d59b0c
Author: fengyubiao <[email protected]>
AuthorDate: Thu Jul 28 09:45:11 2022 +0800

    [fix][flaky-test] Fix ClassCastException: BrokerService cannot be cast to 
class PulsarResources (#16821)
    
    (cherry picked from commit 4ee346693df8fc8314f94d53b00283f1c6079dc1)
---
 .../pulsar/broker/PulsarServiceMockSupport.java    | 75 ++++++++++++++++++++++
 .../broker/service/MessageCumulativeAckTest.java   |  9 ++-
 .../PersistentDispatcherFailoverConsumerTest.java  | 36 ++++++-----
 .../pulsar/broker/service/PersistentTopicTest.java | 27 ++++----
 .../broker/service/ServerCnxAuthorizationTest.java | 16 +++--
 .../metadata/impl/AbstractMetadataStore.java       |  3 +-
 6 files changed, 132 insertions(+), 34 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java
new file mode 100644
index 00000000000..e871919d882
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import static org.mockito.Mockito.mock;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class PulsarServiceMockSupport {
+
+    /**
+     * see: https://github.com/apache/pulsar/pull/16821.
+     * While executing 
"doReturn(pulsarResources).when(pulsar).getPulsarResources()", Meta Store 
Thread also accesses
+     * variable PulsarService.getPulsarResources() asynchronously in logic: 
"notification by zk-watcher".
+     * So execute mock-cmd in meta-thread (The executor of MetaStore is a 
single thread pool executor, so all things
+     * will be thread safety).
+     * Note: If the MetaStore's executor is no longer single-threaded, should 
mock as single-threaded if you need to
+     * execute this method.
+     */
+    public static void mockPulsarServiceProps(final PulsarService 
pulsarService, Runnable mockTask)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        final CompletableFuture<Void> mockGetPulsarResourceFuture = new 
CompletableFuture<>();
+        MetadataStoreExtended metadataStoreExtended = 
pulsarService.getLocalMetadataStore();
+        if (metadataStoreExtended instanceof AbstractMetadataStore){
+            AbstractMetadataStore abstractMetadataStore = 
(AbstractMetadataStore) metadataStoreExtended;
+            abstractMetadataStore.execute(() -> {
+                mockTask.run();
+                mockGetPulsarResourceFuture.complete(null);
+            }, mock(CompletableFuture.class));
+            try {
+                mockGetPulsarResourceFuture.get(1, TimeUnit.SECONDS);
+            } catch (TimeoutException timeoutException){
+                mockTask.run();
+            }
+        } else {
+            mockTask.run();
+        }
+    }
+
+    @Test
+    public void testMockMetaStore() throws Exception{
+        AtomicInteger integer = new AtomicInteger();
+        PulsarService pulsarService = Mockito.mock(PulsarService.class);
+        
Mockito.when(pulsarService.getLocalMetadataStore()).thenReturn(Mockito.mock(ZKMetadataStore.class));
+        mockPulsarServiceProps(pulsarService, () -> integer.incrementAndGet());
+        Assert.assertEquals(integer.get(), 1);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
index d45054fab79..846cc1df58a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -43,6 +43,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.PulsarServiceMockSupport;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -92,7 +93,9 @@ public class MessageCumulativeAckTest {
         doReturn(store).when(pulsar).getConfigurationMetadataStore();
 
         PulsarResources pulsarResources = new PulsarResources(store, store);
-        doReturn(pulsarResources).when(pulsar).getPulsarResources();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(pulsarResources).when(pulsar).getPulsarResources();
+        });
 
         serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
         doReturn(true).when(serverCnx).isActive();
@@ -105,7 +108,9 @@ public class MessageCumulativeAckTest {
 
         eventLoopGroup = new NioEventLoopGroup();
         brokerService = spyWithClassAndConstructorArgs(BrokerService.class, 
pulsar, eventLoopGroup);
-        doReturn(brokerService).when(pulsar).getBrokerService();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(brokerService).when(pulsar).getBrokerService();
+        });
 
         String topicName = 
TopicName.get("MessageCumulativeAckTest").toString();
         PersistentTopic persistentTopic = new PersistentTopic(topicName, 
mock(ManagedLedger.class), brokerService);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 40f58c9a67f..32dffb06316 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -22,7 +22,6 @@ import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructor
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.matches;
 import static org.mockito.ArgumentMatchers.same;
-import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -30,7 +29,6 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.withSettings;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
@@ -47,7 +45,6 @@ import java.util.ArrayList;
 import java.util.function.Supplier;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -69,6 +66,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.PulsarServiceMockSupport;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.PulsarResources;
@@ -85,13 +83,9 @@ import 
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
-import org.apache.pulsar.metadata.impl.ZKMetadataStore;
-import org.apache.pulsar.zookeeper.ZooKeeperCache;
-import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.ZooKeeper;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -128,24 +122,32 @@ public class PersistentDispatcherFailoverConsumerTest {
         ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
         svcConfig.setBrokerShutdownTimeoutMs(0L);
         pulsar = spyWithClassAndConstructorArgs(PulsarService.class, 
svcConfig);
-        doReturn(svcConfig).when(pulsar).getConfiguration();
+        store = MetadataStoreFactory.create("memory://local", 
MetadataStoreConfig.builder().build());
+        doReturn(store).when(pulsar).getLocalMetadataStore();
+        doReturn(store).when(pulsar).getConfigurationMetadataStore();
+
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(svcConfig).when(pulsar).getConfiguration();
+        });
 
         mlFactoryMock = mock(ManagedLedgerFactory.class);
-        doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+        });
 
         doReturn(TransactionTestBase.createMockBookKeeper(executor))
                 .when(pulsar).getBookKeeperClient();
         eventLoopGroup = new NioEventLoopGroup();
 
-        store = MetadataStoreFactory.create("memory://local", 
MetadataStoreConfig.builder().build());
-        doReturn(store).when(pulsar).getLocalMetadataStore();
-        doReturn(store).when(pulsar).getConfigurationMetadataStore();
-
         PulsarResources pulsarResources = new PulsarResources(store, store);
-        doReturn(pulsarResources).when(pulsar).getPulsarResources();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(pulsarResources).when(pulsar).getPulsarResources();
+        });
 
         brokerService = spyWithClassAndConstructorArgs(BrokerService.class, 
pulsar, eventLoopGroup);
-        doReturn(brokerService).when(pulsar).getBrokerService();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(brokerService).when(pulsar).getBrokerService();
+        });
 
         consumerChanges = new LinkedBlockingQueue<>();
         this.channelCtx = mock(ChannelHandlerContext.class);
@@ -191,7 +193,9 @@ public class PersistentDispatcherFailoverConsumerTest {
                 .when(serverCnxWithOldVersion).getCommandSender();
 
         NamespaceService nsSvc = mock(NamespaceService.class);
-        doReturn(nsSvc).when(pulsar).getNamespaceService();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(nsSvc).when(pulsar).getNamespaceService();
+        });
         
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
         doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
         
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any(TopicName.class));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 85f6e422ad0..ce207cb6beb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -95,6 +95,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.PulsarServiceMockSupport;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.NamespaceResources;
@@ -193,27 +194,29 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
             deleteLedgerCallback.deleteLedgerComplete(null);
             return null;
         }).when(mlFactoryMock).asyncDelete(any(), any(), any());
-
+        // Mock metaStore.
         ZooKeeper mockZk = createMockZooKeeper();
         doReturn(createMockBookKeeper(executor))
             .when(pulsar).getBookKeeperClient();
-
         doReturn(executor).when(pulsar).getOrderedExecutor();
-
         store = new ZKMetadataStore(mockZk);
+        doReturn(store).when(pulsar).getLocalMetadataStore();
+        doReturn(store).when(pulsar).getConfigurationMetadataStore();
+        // Mock pulsarResources.
         PulsarResources pulsarResources = 
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
         NamespaceResources nsr = 
spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
         TopicResources tsr = 
spyWithClassAndConstructorArgs(TopicResources.class, store);
         doReturn(nsr).when(pulsarResources).getNamespaceResources();
         doReturn(tsr).when(pulsarResources).getTopicResources();
-        doReturn(pulsarResources).when(pulsar).getPulsarResources();
-
-        doReturn(store).when(pulsar).getLocalMetadataStore();
-        doReturn(store).when(pulsar).getConfigurationMetadataStore();
-
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(pulsarResources).when(pulsar).getPulsarResources();
+        });
+        // Mock brokerService.
         brokerService = spyWithClassAndConstructorArgs(BrokerService.class, 
pulsar, eventLoopGroup);
-        doReturn(brokerService).when(pulsar).getBrokerService();
-
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(brokerService).when(pulsar).getBrokerService();
+        });
+        // Mock serviceCnx.
         serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
         doReturn(true).when(serverCnx).isActive();
         doReturn(true).when(serverCnx).isWritable();
@@ -229,7 +232,9 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         NamespaceService nsSvc = mock(NamespaceService.class);
         NamespaceBundle bundle = mock(NamespaceBundle.class);
         
doReturn(CompletableFuture.completedFuture(bundle)).when(nsSvc).getBundleAsync(any());
-        doReturn(nsSvc).when(pulsar).getNamespaceService();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(nsSvc).when(pulsar).getNamespaceService();
+        });
         doReturn(true).when(nsSvc).isServiceUnitOwned(any());
         doReturn(true).when(nsSvc).isServiceUnitActive(any());
         
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
index 0d4580d044c..fd86caea707 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
@@ -53,6 +53,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.PulsarServiceMockSupport;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
@@ -116,7 +117,10 @@ public class ServerCnxAuthorizationTest {
         doReturn(new 
DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
 
         doReturn(svcConfig).when(pulsar).getConfiguration();
-        
doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            
doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();
+        });
+
 
         ManagedLedgerFactory mlFactoryMock = mock(ManagedLedgerFactory.class);
         doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
@@ -132,7 +136,9 @@ public class ServerCnxAuthorizationTest {
         doReturn(store).when(pulsar).getConfigurationMetadataStore();
 
         pulsarResources = 
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
-        doReturn(pulsarResources).when(pulsar).getPulsarResources();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(pulsarResources).when(pulsar).getPulsarResources();
+        });
         NamespaceResources namespaceResources =
                 spyWithClassAndConstructorArgs(NamespaceResources.class, 
store, store, 30);
         
doReturn(namespaceResources).when(pulsarResources).getNamespaceResources();
@@ -146,8 +152,10 @@ public class ServerCnxAuthorizationTest {
         brokerService = spyWithClassAndConstructorArgs(BrokerService.class, 
pulsar, eventLoopGroup);
         BrokerInterceptor interceptor = mock(BrokerInterceptor.class);
         doReturn(interceptor).when(brokerService).getInterceptor();
-        doReturn(brokerService).when(pulsar).getBrokerService();
-        doReturn(executor).when(pulsar).getOrderedExecutor();
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(brokerService).when(pulsar).getBrokerService();
+            doReturn(executor).when(pulsar).getOrderedExecutor();
+        });
     }
 
     @Test
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index bd871605e25..d60b2a8e918 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -290,7 +290,8 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     /**
      * Run the task in the executor thread and fail the future if the executor 
is shutting down
      */
-    protected void execute(Runnable task, CompletableFuture<?> future) {
+    @VisibleForTesting
+    public void execute(Runnable task, CompletableFuture<?> future) {
         try {
             executor.execute(task);
         } catch (Throwable t) {

Reply via email to