This is an automated email from the ASF dual-hosted git repository.
mhanson pushed a commit to branch release/1.11.0
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/release/1.11.0 by this push:
new 96bd8be GEODE-7400: Prevent RejectedExecutionException in
FederatingManager (#4270)
96bd8be is described below
commit 96bd8be37a8d32c7193ae0d613a1cac80cd3b496
Author: Aaron Lindsey <[email protected]>
AuthorDate: Mon Nov 11 09:36:24 2019 -0800
GEODE-7400: Prevent RejectedExecutionException in FederatingManager (#4270)
Commit f0c96db73263bb1b3cb04558f2a720d70f43421f changed the
FederatingManager class so that it reuses the same ExecutorService
between restarts. After that change, if we start the manager after
previously starting and stopping it, we get a RejectedExecutionException
because it tries to invoke a task on the same ExecutorService, which has
already been shut down.
This commit changes the FederatingManager so that it invokes a supplier
to get a new ExecutorService each time it is started, which prevents the
RejectedExecutionException.
Co-authored-by: Aaron Lindsey <[email protected]>
Co-authored-by: Kirk Lund <[email protected]>
---
...BeanFederationErrorHandlingDistributedTest.java | 5 +-
...deratingManagerConcurrencyIntegrationTest.java} | 7 +-
.../internal/FederatingManagerIntegrationTest.java | 112 +++++++++------------
.../management/internal/FederatingManager.java | 24 ++++-
.../internal/FederatingManagerFactory.java | 3 +-
.../internal/SystemManagementService.java | 7 +-
.../management/internal/FederatingManagerTest.java | 14 +++
7 files changed, 93 insertions(+), 79 deletions(-)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java
index 06be411..41a179a 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import javax.management.ObjectName;
@@ -204,9 +205,9 @@ public class MBeanFederationErrorHandlingDistributedTest
implements Serializable
public FederatingManager create(ManagementResourceRepo repo,
InternalDistributedSystem system,
SystemManagementService service, InternalCache cache,
StatisticsFactory statisticsFactory,
StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory,
MemberMessenger messenger,
- ExecutorService executorService) {
+ Supplier<ExecutorService> executorServiceSupplier) {
return new FederatingManager(repo, system, service, cache,
statisticsFactory,
- statisticsClock, spy(proxyFactory), messenger, executorService);
+ statisticsClock, spy(proxyFactory), messenger,
executorServiceSupplier);
}
}
}
diff --git
a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java
b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerConcurrencyIntegrationTest.java
similarity index 95%
copy from
geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java
copy to
geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerConcurrencyIntegrationTest.java
index 355e3ec..4be6b4a 100644
---
a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java
+++
b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerConcurrencyIntegrationTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import org.junit.After;
import org.junit.Before;
@@ -42,7 +43,7 @@ import org.apache.geode.management.ManagementService;
import org.apache.geode.test.junit.categories.JMXTest;
@Category(JMXTest.class)
-public class FederatingManagerIntegrationTest {
+public class FederatingManagerConcurrencyIntegrationTest {
private InternalCache cache;
private FederatingManager federatingManager;
@@ -102,9 +103,9 @@ public class FederatingManagerIntegrationTest {
public FederatingManager create(ManagementResourceRepo repo,
InternalDistributedSystem system,
SystemManagementService service, InternalCache cache,
StatisticsFactory statisticsFactory,
StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory,
MemberMessenger messenger,
- ExecutorService executorService) {
+ Supplier<ExecutorService> executorServiceSupplier) {
return new FederatingManager(repo, system, service, cache,
statisticsFactory,
- statisticsClock, proxyFactory, mock(MemberMessenger.class),
executorService);
+ statisticsClock, proxyFactory, mock(MemberMessenger.class),
executorServiceSupplier);
}
}
}
diff --git
a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java
b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java
index 355e3ec..db6b2dd 100644
---
a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java
+++
b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java
@@ -14,97 +14,79 @@
*/
package org.apache.geode.management.internal;
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.internal.net.SocketCreator.getLocalHost;
-import static
org.apache.geode.management.internal.SystemManagementService.FEDERATING_MANAGER_FACTORY_PROPERTY;
-import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.mockito.quality.Strictness.STRICT_STUBS;
-import java.net.UnknownHostException;
-import java.util.concurrent.ExecutorService;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.Executors;
-import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-import org.junit.contrib.java.lang.system.RestoreSystemProperties;
import org.junit.experimental.categories.Category;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
import org.apache.geode.StatisticsFactory;
-import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalCacheForClientAccess;
import org.apache.geode.internal.statistics.StatisticsClock;
-import org.apache.geode.management.ManagementService;
import org.apache.geode.test.junit.categories.JMXTest;
@Category(JMXTest.class)
public class FederatingManagerIntegrationTest {
-
- private InternalCache cache;
- private FederatingManager federatingManager;
-
@Rule
- public RestoreSystemProperties restoreSystemProperties = new
RestoreSystemProperties();
+ public MockitoRule mockitoRule =
MockitoJUnit.rule().strictness(STRICT_STUBS);
+
+ @Mock
+ public InternalCache cache;
+ @Mock
+ public InternalCacheForClientAccess cacheForClientAccess;
+ @Mock
+ public MBeanProxyFactory proxyFactory;
+ @Mock
+ public MemberMessenger messenger;
+ @Mock
+ public ManagementResourceRepo repo;
+ @Mock
+ public SystemManagementService service;
+ @Mock
+ public StatisticsFactory statisticsFactory;
+ @Mock
+ public StatisticsClock statisticsClock;
+ @Mock
+ public InternalDistributedSystem system;
+ @Mock
+ public DistributionManager distributionManager;
@Before
- public void setUp() {
- System.setProperty(FEDERATING_MANAGER_FACTORY_PROPERTY,
- FederatingManagerFactoryWithMockMessenger.class.getName());
-
- cache = (InternalCache) new CacheFactory()
- .set(LOCATORS, "")
- .create();
-
- SystemManagementService managementService =
- (SystemManagementService)
ManagementService.getExistingManagementService(cache);
- managementService.createManager();
- federatingManager = managementService.getFederatingManager();
- federatingManager.startManager();
- }
-
- @After
- public void tearDown() {
- cache.close();
+ public void setUp() throws IOException, ClassNotFoundException {
+ when(cache.getCacheForProcessingClientRequests())
+ .thenReturn(cacheForClientAccess);
+ when(system.getDistributionManager())
+ .thenReturn(distributionManager);
}
@Test
- public void testFederatingManagerConcurrency() throws UnknownHostException {
- InternalDistributedMember member = member();
+ public void restartDoesNotThrowIfOtherMembersExist() {
+ when(distributionManager.getOtherDistributionManagerIds())
+
.thenReturn(Collections.singleton(mock(InternalDistributedMember.class)));
- for (int i = 1; i <= 100; i++) {
- federatingManager.addMember(member);
- }
+ FederatingManager federatingManager =
+ new FederatingManager(repo, system, service, cache, statisticsFactory,
+ statisticsClock, proxyFactory, messenger,
Executors::newSingleThreadExecutor);
- await().until(() -> !cache.getAllRegions().isEmpty());
-
- assertThat(federatingManager.getAndResetLatestException()).isNull();
- }
-
- private InternalDistributedMember member() throws UnknownHostException {
- InternalDistributedMember member = mock(InternalDistributedMember.class);
- when(member.getInetAddress()).thenReturn(getLocalHost());
- when(member.getId()).thenReturn("member-1");
- return member;
- }
-
- private static class FederatingManagerFactoryWithMockMessenger
- implements FederatingManagerFactory {
-
- public FederatingManagerFactoryWithMockMessenger() {
- // must be public for instantiation by reflection
- }
+ federatingManager.startManager();
+ federatingManager.stopManager();
- @Override
- public FederatingManager create(ManagementResourceRepo repo,
InternalDistributedSystem system,
- SystemManagementService service, InternalCache cache,
StatisticsFactory statisticsFactory,
- StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory,
MemberMessenger messenger,
- ExecutorService executorService) {
- return new FederatingManager(repo, system, service, cache,
statisticsFactory,
- statisticsClock, proxyFactory, mock(MemberMessenger.class),
executorService);
- }
+ assertThatCode(federatingManager::startManager)
+ .doesNotThrowAnyException();
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
index f7eca88..ce8963d 100755
---
a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
+++
b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import javax.management.Notification;
import javax.management.ObjectName;
@@ -71,25 +72,36 @@ public class FederatingManager extends Manager {
private final SystemManagementService service;
private final AtomicReference<Exception> latestException = new
AtomicReference<>();
+ private final Supplier<ExecutorService> executorServiceSupplier;
+ private final MBeanProxyFactory proxyFactory;
+ private final MemberMessenger messenger;
+
/**
* This Executor uses a pool of thread to execute the member addition
/removal tasks, This will
* utilize the processing powers available. Going with unbounded queue
because tasks wont be
* unbounded in practical situation as number of members will be a finite
set at any given point
* of time
*/
- private final ExecutorService executorService;
- private final MBeanProxyFactory proxyFactory;
- private final MemberMessenger messenger;
+ private ExecutorService executorService;
+ @VisibleForTesting
FederatingManager(ManagementResourceRepo repo, InternalDistributedSystem
system,
SystemManagementService service, InternalCache cache, StatisticsFactory
statisticsFactory,
StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory,
MemberMessenger messenger,
ExecutorService executorService) {
+ this(repo, system, service, cache, statisticsFactory, statisticsClock,
proxyFactory, messenger,
+ () -> executorService);
+ }
+
+ FederatingManager(ManagementResourceRepo repo, InternalDistributedSystem
system,
+ SystemManagementService service, InternalCache cache, StatisticsFactory
statisticsFactory,
+ StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory,
MemberMessenger messenger,
+ Supplier<ExecutorService> executorServiceSupplier) {
super(repo, system, cache, statisticsFactory, statisticsClock);
this.service = service;
this.proxyFactory = proxyFactory;
this.messenger = messenger;
- this.executorService = executorService;
+ this.executorServiceSupplier = executorServiceSupplier;
}
/**
@@ -103,6 +115,8 @@ public class FederatingManager extends Manager {
logger.debug("Starting the Federating Manager.... ");
}
+ executorService = executorServiceSupplier.get();
+
running = true;
startManagingActivity();
messenger.broadcastManagerInfo();
@@ -211,7 +225,7 @@ public class FederatingManager extends Manager {
}
}
- private void executeTask(Runnable task) {
+ private synchronized void executeTask(Runnable task) {
try {
executorService.execute(task);
} catch (RejectedExecutionException ignored) {
diff --git
a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManagerFactory.java
b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManagerFactory.java
index ed2e62b..0a9d052 100644
---
a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManagerFactory.java
+++
b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManagerFactory.java
@@ -17,6 +17,7 @@
package org.apache.geode.management.internal;
import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -29,5 +30,5 @@ interface FederatingManagerFactory {
FederatingManager create(ManagementResourceRepo repo,
InternalDistributedSystem system,
SystemManagementService service, InternalCache cache, StatisticsFactory
statisticsFactory,
StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory,
MemberMessenger messenger,
- ExecutorService executorService);
+ Supplier<ExecutorService> executorServiceSupplier);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
b/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
index 6d323e6..cb00c70 100755
---
a/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
+++
b/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import javax.management.Notification;
import javax.management.ObjectName;
@@ -570,7 +571,7 @@ public class SystemManagementService extends
BaseManagementService {
federatingManager = federatingManagerFactory.create(repo, system, this,
cache,
statisticsFactory, statisticsClock, new
MBeanProxyFactory(jmxAdapter, this),
new MemberMessenger(jmxAdapter, system),
- LoggingExecutors.newFixedThreadPool("FederatingManager", true,
+ () -> LoggingExecutors.newFixedThreadPool("FederatingManager", true,
Runtime.getRuntime().availableProcessors()));
cache.getJmxManagerAdvisor().broadcastChange();
return true;
@@ -721,9 +722,9 @@ public class SystemManagementService extends
BaseManagementService {
public FederatingManager create(ManagementResourceRepo repo,
InternalDistributedSystem system,
SystemManagementService service, InternalCache cache,
StatisticsFactory statisticsFactory,
StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory,
MemberMessenger messenger,
- ExecutorService executorService) {
+ Supplier<ExecutorService> executorServiceSupplier) {
return new FederatingManager(repo, system, service, cache,
statisticsFactory,
- statisticsClock, proxyFactory, messenger, executorService);
+ statisticsClock, proxyFactory, messenger, executorServiceSupplier);
}
}
diff --git
a/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java
b/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java
index df91dae..38913d8 100644
---
a/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java
+++
b/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java
@@ -25,6 +25,7 @@ import static org.powermock.api.mockito.PowerMockito.when;
import java.net.InetAddress;
import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import org.junit.Before;
import org.junit.Test;
@@ -415,6 +416,19 @@ public class FederatingManagerTest {
.isNull();
}
+ @Test
+ public void startManagerGetsNewExecutorServiceFromSupplier() {
+ @SuppressWarnings("unchecked")
+ Supplier<ExecutorService> executorServiceSupplier = mock(Supplier.class);
+
when(executorServiceSupplier.get()).thenReturn(mock(ExecutorService.class));
+ FederatingManager federatingManager = new FederatingManager(repo, system,
service, cache,
+ statisticsFactory, statisticsClock, proxyFactory, messenger,
executorServiceSupplier);
+
+ federatingManager.startManager();
+
+ verify(executorServiceSupplier).get();
+ }
+
private InternalDistributedMember member() {
return member(1, 1);
}