This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new b4ce1b79918 [fix][flaky-test] Fix ClassCastException: BrokerService
cannot be cast to class PulsarResources (#16821)
b4ce1b79918 is described below
commit b4ce1b799180f4a2ef8cdb29608cea3fb6d59b0c
Author: fengyubiao <[email protected]>
AuthorDate: Thu Jul 28 09:45:11 2022 +0800
[fix][flaky-test] Fix ClassCastException: BrokerService cannot be cast to
class PulsarResources (#16821)
(cherry picked from commit 4ee346693df8fc8314f94d53b00283f1c6079dc1)
---
.../pulsar/broker/PulsarServiceMockSupport.java | 75 ++++++++++++++++++++++
.../broker/service/MessageCumulativeAckTest.java | 9 ++-
.../PersistentDispatcherFailoverConsumerTest.java | 36 ++++++-----
.../pulsar/broker/service/PersistentTopicTest.java | 27 ++++----
.../broker/service/ServerCnxAuthorizationTest.java | 16 +++--
.../metadata/impl/AbstractMetadataStore.java | 3 +-
6 files changed, 132 insertions(+), 34 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java
new file mode 100644
index 00000000000..e871919d882
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import static org.mockito.Mockito.mock;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class PulsarServiceMockSupport {
+
+ /**
+ * see: https://github.com/apache/pulsar/pull/16821.
+ * While the test executes
"doReturn(pulsarResources).when(pulsar).getPulsarResources()", the metadata store
thread also accesses
+ * PulsarService.getPulsarResources() asynchronously, in the logic
"notification by zk-watcher".
+ * So execute the mock command in the metadata store thread (the executor of MetaStore is a
single-thread pool executor, so everything
+ * will be thread-safe).
+ * Note: If the MetaStore's executor is no longer single-threaded, you should
mock it as single-threaded if you need to
+ * execute this method.
+ */
+ public static void mockPulsarServiceProps(final PulsarService
pulsarService, Runnable mockTask)
+ throws ExecutionException, InterruptedException, TimeoutException {
+ final CompletableFuture<Void> mockGetPulsarResourceFuture = new
CompletableFuture<>();
+ MetadataStoreExtended metadataStoreExtended =
pulsarService.getLocalMetadataStore();
+ if (metadataStoreExtended instanceof AbstractMetadataStore){
+ AbstractMetadataStore abstractMetadataStore =
(AbstractMetadataStore) metadataStoreExtended;
+ abstractMetadataStore.execute(() -> {
+ mockTask.run();
+ mockGetPulsarResourceFuture.complete(null);
+ }, mock(CompletableFuture.class));
+ try {
+ mockGetPulsarResourceFuture.get(1, TimeUnit.SECONDS);
+ } catch (TimeoutException timeoutException){
+ mockTask.run();
+ }
+ } else {
+ mockTask.run();
+ }
+ }
+
+ @Test
+ public void testMockMetaStore() throws Exception{
+ AtomicInteger integer = new AtomicInteger();
+ PulsarService pulsarService = Mockito.mock(PulsarService.class);
+
Mockito.when(pulsarService.getLocalMetadataStore()).thenReturn(Mockito.mock(ZKMetadataStore.class));
+ mockPulsarServiceProps(pulsarService, () -> integer.incrementAndGet());
+ Assert.assertEquals(integer.get(), 1);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
index d45054fab79..846cc1df58a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -43,6 +43,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -92,7 +93,9 @@ public class MessageCumulativeAckTest {
doReturn(store).when(pulsar).getConfigurationMetadataStore();
PulsarResources pulsarResources = new PulsarResources(store, store);
- doReturn(pulsarResources).when(pulsar).getPulsarResources();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(pulsarResources).when(pulsar).getPulsarResources();
+ });
serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
doReturn(true).when(serverCnx).isActive();
@@ -105,7 +108,9 @@ public class MessageCumulativeAckTest {
eventLoopGroup = new NioEventLoopGroup();
brokerService = spyWithClassAndConstructorArgs(BrokerService.class,
pulsar, eventLoopGroup);
- doReturn(brokerService).when(pulsar).getBrokerService();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(brokerService).when(pulsar).getBrokerService();
+ });
String topicName =
TopicName.get("MessageCumulativeAckTest").toString();
PersistentTopic persistentTopic = new PersistentTopic(topicName,
mock(ManagedLedger.class), brokerService);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 40f58c9a67f..32dffb06316 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -22,7 +22,6 @@ import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructor
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.matches;
import static org.mockito.ArgumentMatchers.same;
-import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -30,7 +29,6 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.withSettings;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
@@ -47,7 +45,6 @@ import java.util.ArrayList;
import java.util.function.Supplier;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -69,6 +66,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.PulsarResources;
@@ -85,13 +83,9 @@ import
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
-import org.apache.pulsar.metadata.impl.ZKMetadataStore;
-import org.apache.pulsar.zookeeper.ZooKeeperCache;
-import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -128,24 +122,32 @@ public class PersistentDispatcherFailoverConsumerTest {
ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
svcConfig.setBrokerShutdownTimeoutMs(0L);
pulsar = spyWithClassAndConstructorArgs(PulsarService.class,
svcConfig);
- doReturn(svcConfig).when(pulsar).getConfiguration();
+ store = MetadataStoreFactory.create("memory://local",
MetadataStoreConfig.builder().build());
+ doReturn(store).when(pulsar).getLocalMetadataStore();
+ doReturn(store).when(pulsar).getConfigurationMetadataStore();
+
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(svcConfig).when(pulsar).getConfiguration();
+ });
mlFactoryMock = mock(ManagedLedgerFactory.class);
- doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+ });
doReturn(TransactionTestBase.createMockBookKeeper(executor))
.when(pulsar).getBookKeeperClient();
eventLoopGroup = new NioEventLoopGroup();
- store = MetadataStoreFactory.create("memory://local",
MetadataStoreConfig.builder().build());
- doReturn(store).when(pulsar).getLocalMetadataStore();
- doReturn(store).when(pulsar).getConfigurationMetadataStore();
-
PulsarResources pulsarResources = new PulsarResources(store, store);
- doReturn(pulsarResources).when(pulsar).getPulsarResources();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(pulsarResources).when(pulsar).getPulsarResources();
+ });
brokerService = spyWithClassAndConstructorArgs(BrokerService.class,
pulsar, eventLoopGroup);
- doReturn(brokerService).when(pulsar).getBrokerService();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(brokerService).when(pulsar).getBrokerService();
+ });
consumerChanges = new LinkedBlockingQueue<>();
this.channelCtx = mock(ChannelHandlerContext.class);
@@ -191,7 +193,9 @@ public class PersistentDispatcherFailoverConsumerTest {
.when(serverCnxWithOldVersion).getCommandSender();
NamespaceService nsSvc = mock(NamespaceService.class);
- doReturn(nsSvc).when(pulsar).getNamespaceService();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(nsSvc).when(pulsar).getNamespaceService();
+ });
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any(TopicName.class));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 85f6e422ad0..ce207cb6beb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -95,6 +95,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources;
@@ -193,27 +194,29 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
deleteLedgerCallback.deleteLedgerComplete(null);
return null;
}).when(mlFactoryMock).asyncDelete(any(), any(), any());
-
+ // Mock metaStore.
ZooKeeper mockZk = createMockZooKeeper();
doReturn(createMockBookKeeper(executor))
.when(pulsar).getBookKeeperClient();
-
doReturn(executor).when(pulsar).getOrderedExecutor();
-
store = new ZKMetadataStore(mockZk);
+ doReturn(store).when(pulsar).getLocalMetadataStore();
+ doReturn(store).when(pulsar).getConfigurationMetadataStore();
+ // Mock pulsarResources.
PulsarResources pulsarResources =
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
NamespaceResources nsr =
spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
TopicResources tsr =
spyWithClassAndConstructorArgs(TopicResources.class, store);
doReturn(nsr).when(pulsarResources).getNamespaceResources();
doReturn(tsr).when(pulsarResources).getTopicResources();
- doReturn(pulsarResources).when(pulsar).getPulsarResources();
-
- doReturn(store).when(pulsar).getLocalMetadataStore();
- doReturn(store).when(pulsar).getConfigurationMetadataStore();
-
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(pulsarResources).when(pulsar).getPulsarResources();
+ });
+ // Mock brokerService.
brokerService = spyWithClassAndConstructorArgs(BrokerService.class,
pulsar, eventLoopGroup);
- doReturn(brokerService).when(pulsar).getBrokerService();
-
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(brokerService).when(pulsar).getBrokerService();
+ });
+ // Mock serverCnx.
serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
@@ -229,7 +232,9 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
NamespaceService nsSvc = mock(NamespaceService.class);
NamespaceBundle bundle = mock(NamespaceBundle.class);
doReturn(CompletableFuture.completedFuture(bundle)).when(nsSvc).getBundleAsync(any());
- doReturn(nsSvc).when(pulsar).getNamespaceService();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(nsSvc).when(pulsar).getNamespaceService();
+ });
doReturn(true).when(nsSvc).isServiceUnitOwned(any());
doReturn(true).when(nsSvc).isServiceUnitActive(any());
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
index 0d4580d044c..fd86caea707 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
@@ -53,6 +53,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
@@ -116,7 +117,10 @@ public class ServerCnxAuthorizationTest {
doReturn(new
DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
doReturn(svcConfig).when(pulsar).getConfiguration();
-
doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+
doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();
+ });
+
ManagedLedgerFactory mlFactoryMock = mock(ManagedLedgerFactory.class);
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
@@ -132,7 +136,9 @@ public class ServerCnxAuthorizationTest {
doReturn(store).when(pulsar).getConfigurationMetadataStore();
pulsarResources =
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
- doReturn(pulsarResources).when(pulsar).getPulsarResources();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(pulsarResources).when(pulsar).getPulsarResources();
+ });
NamespaceResources namespaceResources =
spyWithClassAndConstructorArgs(NamespaceResources.class,
store, store, 30);
doReturn(namespaceResources).when(pulsarResources).getNamespaceResources();
@@ -146,8 +152,10 @@ public class ServerCnxAuthorizationTest {
brokerService = spyWithClassAndConstructorArgs(BrokerService.class,
pulsar, eventLoopGroup);
BrokerInterceptor interceptor = mock(BrokerInterceptor.class);
doReturn(interceptor).when(brokerService).getInterceptor();
- doReturn(brokerService).when(pulsar).getBrokerService();
- doReturn(executor).when(pulsar).getOrderedExecutor();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(brokerService).when(pulsar).getBrokerService();
+ doReturn(executor).when(pulsar).getOrderedExecutor();
+ });
}
@Test
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index bd871605e25..d60b2a8e918 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -290,7 +290,8 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
/**
* Run the task in the executor thread and fail the future if the executor
is shutting down
*/
- protected void execute(Runnable task, CompletableFuture<?> future) {
+ @VisibleForTesting
+ public void execute(Runnable task, CompletableFuture<?> future) {
try {
executor.execute(task);
} catch (Throwable t) {