This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4ee346693df [fix][flaky-test] Fix ClassCastException: BrokerService
cannot be cast to class PulsarResources (#16821)
4ee346693df is described below
commit 4ee346693df8fc8314f94d53b00283f1c6079dc1
Author: fengyubiao <[email protected]>
AuthorDate: Thu Jul 28 09:45:11 2022 +0800
[fix][flaky-test] Fix ClassCastException: BrokerService cannot be cast to
class PulsarResources (#16821)
---
.../pulsar/broker/PulsarServiceMockSupport.java | 75 ++++++++++++++++++++++
.../broker/service/MessageCumulativeAckTest.java | 9 ++-
.../PersistentDispatcherFailoverConsumerTest.java | 29 ++++++---
.../pulsar/broker/service/PersistentTopicTest.java | 31 +++++----
.../broker/service/ServerCnxAuthorizationTest.java | 16 +++--
.../metadata/impl/AbstractMetadataStore.java | 3 +-
6 files changed, 135 insertions(+), 28 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java
new file mode 100644
index 00000000000..e871919d882
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import static org.mockito.Mockito.mock;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class PulsarServiceMockSupport {
+
+ /**
+ * See: https://github.com/apache/pulsar/pull/16821.
+ * While the test executes "doReturn(pulsarResources).when(pulsar).getPulsarResources()", the Meta Store thread also
+ * accesses PulsarService.getPulsarResources() asynchronously, in the logic "notification by zk-watcher".
+ * To avoid that race, execute the mock command in the meta-thread (the executor of the MetaStore is a single-thread
+ * pool executor, so everything that runs on it is thread-safe).
+ * Note: if the MetaStore's executor is no longer single-threaded, mock it as single-threaded before you
+ * execute this method.
+ */
+ public static void mockPulsarServiceProps(final PulsarService
pulsarService, Runnable mockTask)
+ throws ExecutionException, InterruptedException, TimeoutException {
+ final CompletableFuture<Void> mockGetPulsarResourceFuture = new
CompletableFuture<>();
+ MetadataStoreExtended metadataStoreExtended =
pulsarService.getLocalMetadataStore();
+ if (metadataStoreExtended instanceof AbstractMetadataStore){
+ AbstractMetadataStore abstractMetadataStore =
(AbstractMetadataStore) metadataStoreExtended;
+ abstractMetadataStore.execute(() -> {
+ mockTask.run();
+ mockGetPulsarResourceFuture.complete(null);
+ }, mock(CompletableFuture.class));
+ try {
+ mockGetPulsarResourceFuture.get(1, TimeUnit.SECONDS);
+ } catch (TimeoutException timeoutException){
+ mockTask.run();
+ }
+ } else {
+ mockTask.run();
+ }
+ }
+
+ @Test
+ public void testMockMetaStore() throws Exception{
+ AtomicInteger integer = new AtomicInteger();
+ PulsarService pulsarService = Mockito.mock(PulsarService.class);
+
Mockito.when(pulsarService.getLocalMetadataStore()).thenReturn(Mockito.mock(ZKMetadataStore.class));
+ mockPulsarServiceProps(pulsarService, () -> integer.incrementAndGet());
+ Assert.assertEquals(integer.get(), 1);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
index 86754efc0c2..05e86e73038 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -94,7 +95,9 @@ public class MessageCumulativeAckTest {
doReturn(store).when(pulsar).getConfigurationMetadataStore();
PulsarResources pulsarResources = new PulsarResources(store, store);
- doReturn(pulsarResources).when(pulsar).getPulsarResources();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(pulsarResources).when(pulsar).getPulsarResources();
+ });
serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
doReturn(true).when(serverCnx).isActive();
@@ -107,7 +110,9 @@ public class MessageCumulativeAckTest {
eventLoopGroup = new NioEventLoopGroup();
brokerService = spyWithClassAndConstructorArgs(BrokerService.class,
pulsar, eventLoopGroup);
- doReturn(brokerService).when(pulsar).getBrokerService();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(brokerService).when(pulsar).getBrokerService();
+ });
String topicName =
TopicName.get("MessageCumulativeAckTest").toString();
PersistentTopic persistentTopic = new PersistentTopic(topicName,
mock(ManagedLedger.class), brokerService);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index ec4ed02e35d..0d2e5a9ee02 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -65,6 +65,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.PulsarResources;
@@ -124,24 +125,32 @@ public class PersistentDispatcherFailoverConsumerTest {
svcConfig.setSystemTopicEnabled(false);
svcConfig.setTopicLevelPoliciesEnabled(false);
pulsar = spyWithClassAndConstructorArgs(PulsarService.class,
svcConfig);
- doReturn(svcConfig).when(pulsar).getConfiguration();
+ store = MetadataStoreFactory.create("memory:local",
MetadataStoreConfig.builder().build());
+ doReturn(store).when(pulsar).getLocalMetadataStore();
+ doReturn(store).when(pulsar).getConfigurationMetadataStore();
+
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(svcConfig).when(pulsar).getConfiguration();
+ });
mlFactoryMock = mock(ManagedLedgerFactory.class);
- doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+ });
doReturn(TransactionTestBase.createMockBookKeeper(executor))
.when(pulsar).getBookKeeperClient();
eventLoopGroup = new NioEventLoopGroup();
- store = MetadataStoreFactory.create("memory:local",
MetadataStoreConfig.builder().build());
- doReturn(store).when(pulsar).getLocalMetadataStore();
- doReturn(store).when(pulsar).getConfigurationMetadataStore();
-
PulsarResources pulsarResources = new PulsarResources(store, store);
- doReturn(pulsarResources).when(pulsar).getPulsarResources();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(pulsarResources).when(pulsar).getPulsarResources();
+ });
brokerService = spyWithClassAndConstructorArgs(BrokerService.class,
pulsar, eventLoopGroup);
- doReturn(brokerService).when(pulsar).getBrokerService();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(brokerService).when(pulsar).getBrokerService();
+ });
consumerChanges = new LinkedBlockingQueue<>();
this.channelCtx = mock(ChannelHandlerContext.class);
@@ -187,7 +196,9 @@ public class PersistentDispatcherFailoverConsumerTest {
.when(serverCnxWithOldVersion).getCommandSender();
NamespaceService nsSvc = mock(NamespaceService.class);
- doReturn(nsSvc).when(pulsar).getNamespaceService();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(nsSvc).when(pulsar).getNamespaceService();
+ });
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any(TopicName.class));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 840810ecc1b..864a4e52ee2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -99,6 +99,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources;
@@ -205,27 +206,29 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
deleteLedgerCallback.deleteLedgerComplete(null);
return null;
}).when(mlFactoryMock).asyncDelete(any(), any(), any());
-
+ // Mock metaStore.
ZooKeeper mockZk = createMockZooKeeper();
doReturn(createMockBookKeeper(executor))
.when(pulsar).getBookKeeperClient();
-
doReturn(executor).when(pulsar).getOrderedExecutor();
-
store = new ZKMetadataStore(mockZk);
+ doReturn(store).when(pulsar).getLocalMetadataStore();
+ doReturn(store).when(pulsar).getConfigurationMetadataStore();
+ // Mock pulsarResources.
PulsarResources pulsarResources =
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
NamespaceResources nsr =
spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
TopicResources tsr =
spyWithClassAndConstructorArgs(TopicResources.class, store);
doReturn(nsr).when(pulsarResources).getNamespaceResources();
doReturn(tsr).when(pulsarResources).getTopicResources();
- doReturn(pulsarResources).when(pulsar).getPulsarResources();
-
- doReturn(store).when(pulsar).getLocalMetadataStore();
- doReturn(store).when(pulsar).getConfigurationMetadataStore();
-
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(pulsarResources).when(pulsar).getPulsarResources();
+ });
+ // Mock brokerService.
brokerService = spyWithClassAndConstructorArgs(BrokerService.class,
pulsar, eventLoopGroup);
- doReturn(brokerService).when(pulsar).getBrokerService();
-
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(brokerService).when(pulsar).getBrokerService();
+ });
+ // Mock serverCnx.
serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
@@ -241,7 +244,9 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
NamespaceService nsSvc = mock(NamespaceService.class);
NamespaceBundle bundle = mock(NamespaceBundle.class);
doReturn(CompletableFuture.completedFuture(bundle)).when(nsSvc).getBundleAsync(any());
- doReturn(nsSvc).when(pulsar).getNamespaceService();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(nsSvc).when(pulsar).getNamespaceService();
+ });
doReturn(true).when(nsSvc).isServiceUnitOwned(any());
doReturn(true).when(nsSvc).isServiceUnitActive(any());
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
@@ -2318,7 +2323,9 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
PulsarResources pulsarResources =
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
NamespaceResources nsr =
spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
doReturn(nsr).when(pulsarResources).getNamespaceResources();
- doReturn(pulsarResources).when(pulsar).getPulsarResources();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(pulsarResources).when(pulsar).getPulsarResources();
+ });
CompletableFuture<Optional<Policies>> policiesFuture = new
CompletableFuture<>();
Policies policies = new Policies();
Set<String> namespaceClusters = new HashSet<>();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
index af539891711..6d108ce675d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
@@ -53,6 +53,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
@@ -116,7 +117,10 @@ public class ServerCnxAuthorizationTest {
doReturn(new
DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
doReturn(svcConfig).when(pulsar).getConfiguration();
-
doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+
doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();
+ });
+
ManagedLedgerFactory mlFactoryMock = mock(ManagedLedgerFactory.class);
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
@@ -132,7 +136,9 @@ public class ServerCnxAuthorizationTest {
doReturn(store).when(pulsar).getConfigurationMetadataStore();
pulsarResources =
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
- doReturn(pulsarResources).when(pulsar).getPulsarResources();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(pulsarResources).when(pulsar).getPulsarResources();
+ });
NamespaceResources namespaceResources =
spyWithClassAndConstructorArgs(NamespaceResources.class,
store, store, 30);
doReturn(namespaceResources).when(pulsarResources).getNamespaceResources();
@@ -146,8 +152,10 @@ public class ServerCnxAuthorizationTest {
brokerService = spyWithClassAndConstructorArgs(BrokerService.class,
pulsar, eventLoopGroup);
BrokerInterceptor interceptor = mock(BrokerInterceptor.class);
doReturn(interceptor).when(brokerService).getInterceptor();
- doReturn(brokerService).when(pulsar).getBrokerService();
- doReturn(executor).when(pulsar).getOrderedExecutor();
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(brokerService).when(pulsar).getBrokerService();
+ doReturn(executor).when(pulsar).getOrderedExecutor();
+ });
}
@Test
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index cac386405a6..a116ea9c294 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -316,7 +316,8 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
/**
* Run the task in the executor thread and fail the future if the executor
is shutting down.
*/
- protected void execute(Runnable task, CompletableFuture<?> future) {
+ @VisibleForTesting
+ public void execute(Runnable task, CompletableFuture<?> future) {
try {
executor.execute(task);
} catch (Throwable t) {