This is an automated email from the ASF dual-hosted git repository.
gosullivan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 58715ac GEODE-5464: Generify DistributionManager and related classes
(#2174)
58715ac is described below
commit 58715ac2f0bb086c577d204131c129a4338b07c9
Author: Galen O'Sullivan <[email protected]>
AuthorDate: Mon Jul 30 09:38:23 2018 -0700
GEODE-5464: Generify DistributionManager and related classes (#2174)
GEODE-5464: Generify DistributionManager and related classes
* Augment various collections to use generics
* Convert some explicit iterator loops that were trivially "foreach" loops into for-each loops
* Remove unused code
* Inline some redundant local variables
* Reduce the visibility of some methods and fields (e.g. public/protected to private)
* Change some tests to use `InternalDistributedMember` instead of
`DistributedMember`.
* Make ThrottlingMemLinkedQueueWithDMStats generic and have it use atomics.
- The previous synchronization looked quite wrong.
---
.../distributed/DistributedMemberDUnitTest.java | 4 +-
.../geode/management/ManagementTestRule.java | 3 +-
.../internal/ClusterDistributionManager.java | 404 +++++++++------------
.../distributed/internal/DistributionManager.java | 38 +-
.../distributed/internal/DistributionMessage.java | 15 +-
.../internal/InternalDistributedSystem.java | 7 +-
.../internal/LonerDistributionManager.java | 96 +----
.../internal/OverflowQueueWithDMStats.java | 26 +-
.../geode/distributed/internal/ReplySender.java | 3 +-
.../geode/distributed/internal/StartupMessage.java | 18 +-
.../internal/StartupMessageReplyProcessor.java | 5 +-
.../distributed/internal/StartupOperation.java | 8 +-
.../ThrottlingMemLinkedQueueWithDMStats.java | 30 +-
.../distributed/internal/locks/DLockService.java | 67 ++--
.../internal/locks/GrantorRequestProcessor.java | 8 +-
.../internal/membership/MembershipManager.java | 8 +-
.../membership/gms/interfaces/Messenger.java | 6 +-
.../membership/gms/messenger/JGroupsMessenger.java | 12 +-
.../membership/gms/mgr/GMSMembershipManager.java | 35 +-
.../geode/internal/cache/GemFireCacheImpl.java | 2 +-
.../geode/internal/tcp/ConnectExceptions.java | 21 +-
.../geode/internal/tcp/DirectReplySender.java | 3 +-
.../management/internal/FederatingManager.java | 12 +-
.../geode/management/internal/MemberMessenger.java | 3 +-
.../internal/messages/CompactRequest.java | 2 +-
.../management/LuceneManagementDUnitTest.java | 8 +-
.../cache/wan/GatewayReceiverMBeanDUnitTest.java | 4 +-
27 files changed, 324 insertions(+), 524 deletions(-)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/distributed/DistributedMemberDUnitTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/distributed/DistributedMemberDUnitTest.java
index 94a8d63..fd4b816 100755
---
a/geode-core/src/distributedTest/java/org/apache/geode/distributed/DistributedMemberDUnitTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/distributed/DistributedMemberDUnitTest.java
@@ -224,7 +224,7 @@ public class DistributedMemberDUnitTest extends
JUnit4DistributedTestCase {
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.until(() -> dm.getOtherNormalDistributionManagerIds().size() ==
3);
- Set<DistributedMember> members =
dm.getOtherNormalDistributionManagerIds();
+ Set<InternalDistributedMember> members =
dm.getOtherNormalDistributionManagerIds();
for (Iterator iterMembers = members.iterator();
iterMembers.hasNext();) {
InternalDistributedMember member = (InternalDistributedMember)
iterMembers.next();
@@ -349,7 +349,7 @@ public class DistributedMemberDUnitTest extends
JUnit4DistributedTestCase {
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.until(() -> dm.getOtherNormalDistributionManagerIds().size() ==
3);
- Set<DistributedMember> members =
dm.getOtherNormalDistributionManagerIds();
+ Set<InternalDistributedMember> members =
dm.getOtherNormalDistributionManagerIds();
// Make sure getAllOtherMembers returns a set
// containing our three peers plus an admin member.
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/management/ManagementTestRule.java
b/geode-core/src/distributedTest/java/org/apache/geode/management/ManagementTestRule.java
index caa69b6..5d4a72e 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/management/ManagementTestRule.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/management/ManagementTestRule.java
@@ -41,6 +41,7 @@ import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.DistributionManager;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.management.internal.SystemManagementService;
import org.apache.geode.test.dunit.Invoke;
@@ -238,7 +239,7 @@ public class ManagementTestRule implements MethodRule,
Serializable {
return allMembers;
}
- public Set<DistributedMember> getAllNormalMembers() {
+ private Set<InternalDistributedMember> getAllNormalMembers() {
return getDistributionManager().getNormalDistributionManagerIds(); //
excludes LOCATOR_DM_TYPE
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
index b615b58..9972e13 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
@@ -26,7 +26,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
@@ -117,7 +116,7 @@ public class ClusterDistributionManager implements
DistributionManager {
private static final int STARTUP_TIMEOUT =
Integer.getInteger("DistributionManager.STARTUP_TIMEOUT",
15000).intValue();
- public static final boolean DEBUG_NO_ACKNOWLEDGEMENTS =
+ private static final boolean DEBUG_NO_ACKNOWLEDGEMENTS =
Boolean.getBoolean("DistributionManager.DEBUG_NO_ACKNOWLEDGEMENTS");
/**
@@ -256,7 +255,7 @@ public class ClusterDistributionManager implements
DistributionManager {
/** Is this node running an AdminDistributedSystem? */
private static volatile boolean isDedicatedAdminVM = false;
- private static ThreadLocal<Boolean> isStartupThread = new ThreadLocal();
+ private static ThreadLocal<Boolean> isStartupThread = new ThreadLocal<>();
/**
* Identifier for function execution threads and any of their children
@@ -301,7 +300,7 @@ public class ClusterDistributionManager implements
DistributionManager {
protected volatile InternalDistributedMember elder = null;
/** The id of this distribution manager */
- protected final InternalDistributedMember localAddress;
+ private final InternalDistributedMember localAddress;
/**
* The distribution manager type of this dm; set in its constructor.
@@ -318,17 +317,17 @@ public class ClusterDistributionManager implements
DistributionManager {
*
* @since GemFire 5.7
*/
- protected volatile Set<MembershipListener> allMembershipListeners =
Collections.emptySet();
+ private volatile Set<MembershipListener> allMembershipListeners =
Collections.emptySet();
/**
* A lock to hold while adding and removing all membership listeners.
*
* @since GemFire 5.7
*/
- private final Object allMembershipListenersLock = new
MembershipListenersLock();
+ private final Object allMembershipListenersLock = new Object();
/** A queue of MemberEvent instances */
- private final BlockingQueue<MemberEvent> membershipEventQueue = new
LinkedBlockingQueue();
+ private final BlockingQueue<MemberEvent> membershipEventQueue = new
LinkedBlockingQueue<>();
/** Used to invoke registered membership listeners in the background. */
private Thread memberEventThread;
@@ -338,7 +337,7 @@ public class ClusterDistributionManager implements
DistributionManager {
protected final String description;
/** Statistics about distribution */
- protected DistributionStats stats;
+ protected final DistributionStats stats;
/** Did an exception occur in one of the DM threads? */
private boolean exceptionInThreads;
@@ -389,7 +388,7 @@ public class ClusterDistributionManager implements
DistributionManager {
* 3) {@link #hostedLocatorsAll}<br>
* 4) {@link #hostedLocatorsWithSharedConfiguration}<br>
*/
- private final Object membersLock = new MembersLock();
+ private final Object membersLock = new Object();
/**
* The lock held while writing {@link #adminConsoles}.
@@ -450,7 +449,7 @@ public class ClusterDistributionManager implements
DistributionManager {
* If using a throttling queue for the serialThread, we cache the queue here
so we can see if
* delivery would block
*/
- private ThrottlingMemLinkedQueueWithDMStats serialQueue;
+ private ThrottlingMemLinkedQueueWithDMStats<Runnable> serialQueue;
/**
* Thread Monitor mechanism to monitor system threads
@@ -505,22 +504,20 @@ public class ClusterDistributionManager implements
DistributionManager {
private final Map<InternalDistributedMember, String> redundancyZones =
- Collections.synchronizedMap(new HashMap<InternalDistributedMember,
String>());
+ Collections.synchronizedMap(new HashMap<>());
private boolean enforceUniqueZone = false;
/**
* root cause of forcibly shutting down the distribution manager
*/
- volatile Throwable rootCause = null;
+ private volatile Throwable rootCause = null;
/**
* @see #closeInProgress
*/
private final Object shutdownMutex = new Object();
-
-
////////////////////// Static Methods //////////////////////
/**
@@ -570,7 +567,7 @@ public class ClusterDistributionManager implements
DistributionManager {
{
InternalDistributedMember id =
distributionManager.getDistributionManagerId();
if (!"".equals(id.getName())) {
- for (InternalDistributedMember m : (List<InternalDistributedMember>)
distributionManager
+ for (InternalDistributedMember m : distributionManager
.getViewMembers()) {
if (m.equals(id)) {
// I'm counting on the members returned by getViewMembers being
ordered such that
@@ -669,17 +666,12 @@ public class ClusterDistributionManager implements
DistributionManager {
this.elderLock = new StoppableReentrantLock(stopper);
this.transport = transport;
- this.membershipListeners = new ConcurrentHashMap();
+ this.membershipListeners = new ConcurrentHashMap<>();
this.distributedSystemId = system.getConfig().getDistributedSystemId();
- {
- long statId = OSProcess.getId();
- /*
- * deadcoded since we don't know the channel id yet. if (statId == 0 ||
statId == -1) { statId
- * = getMembershipPort(); }
- */
- this.stats = new DistributionStats(system, statId);
- DistributionStats.enableClockStats =
system.getConfig().getEnableTimeStatistics();
- }
+
+ long statId = OSProcess.getId();
+ this.stats = new DistributionStats(system, statId);
+ DistributionStats.enableClockStats =
system.getConfig().getEnableTimeStatistics();
this.exceptionInThreads = false;
@@ -728,9 +720,10 @@ public class ClusterDistributionManager implements
DistributionManager {
if (SERIAL_QUEUE_BYTE_LIMIT == 0) {
poolQueue = new
OverflowQueueWithDMStats(this.stats.getSerialQueueHelper());
} else {
- this.serialQueue = new
ThrottlingMemLinkedQueueWithDMStats(TOTAL_SERIAL_QUEUE_BYTE_LIMIT,
- TOTAL_SERIAL_QUEUE_THROTTLE, SERIAL_QUEUE_SIZE_LIMIT,
SERIAL_QUEUE_SIZE_THROTTLE,
- this.stats.getSerialQueueHelper());
+ this.serialQueue =
+ new
ThrottlingMemLinkedQueueWithDMStats<>(TOTAL_SERIAL_QUEUE_BYTE_LIMIT,
+ TOTAL_SERIAL_QUEUE_THROTTLE, SERIAL_QUEUE_SIZE_LIMIT,
SERIAL_QUEUE_SIZE_THROTTLE,
+ this.stats.getSerialQueueHelper());
poolQueue = this.serialQueue;
}
ThreadFactory tf = new ThreadFactory() {
@@ -758,9 +751,9 @@ public class ClusterDistributionManager implements
DistributionManager {
return thread;
}
};
- SerialQueuedExecutorWithDMStats executor = new
SerialQueuedExecutorWithDMStats(poolQueue,
+
+ this.serialThread = new SerialQueuedExecutorWithDMStats(poolQueue,
this.stats.getSerialProcessorHelper(), tf, threadMonitor);
- this.serialThread = executor;
}
{
BlockingQueue q = new LinkedBlockingQueue();
@@ -793,11 +786,11 @@ public class ClusterDistributionManager implements
DistributionManager {
}
{
- BlockingQueue poolQueue;
+ BlockingQueue<Runnable> poolQueue;
if (INCOMING_QUEUE_LIMIT == 0) {
- poolQueue = new
OverflowQueueWithDMStats(this.stats.getOverflowQueueHelper());
+ poolQueue = new
OverflowQueueWithDMStats<>(this.stats.getOverflowQueueHelper());
} else {
- poolQueue = new OverflowQueueWithDMStats(INCOMING_QUEUE_LIMIT,
+ poolQueue = new OverflowQueueWithDMStats<>(INCOMING_QUEUE_LIMIT,
this.stats.getOverflowQueueHelper());
}
ThreadFactory tf = new ThreadFactory() {
@@ -827,18 +820,17 @@ public class ClusterDistributionManager implements
DistributionManager {
return thread;
}
};
- ThreadPoolExecutor pool = new PooledExecutorWithDMStats(poolQueue,
MAX_THREADS,
+ this.threadPool = new PooledExecutorWithDMStats(poolQueue, MAX_THREADS,
this.stats.getNormalPoolHelper(), tf, threadMonitor);
- this.threadPool = pool;
}
{
- BlockingQueue poolQueue;
+ BlockingQueue<Runnable> poolQueue;
if (INCOMING_QUEUE_LIMIT == 0) {
- poolQueue = new
OverflowQueueWithDMStats(this.stats.getHighPriorityQueueHelper());
+ poolQueue = new
OverflowQueueWithDMStats<>(this.stats.getHighPriorityQueueHelper());
} else {
- poolQueue = new OverflowQueueWithDMStats(INCOMING_QUEUE_LIMIT,
+ poolQueue = new OverflowQueueWithDMStats<>(INCOMING_QUEUE_LIMIT,
this.stats.getHighPriorityQueueHelper());
}
ThreadFactory tf = new ThreadFactory() {
@@ -901,12 +893,12 @@ public class ClusterDistributionManager implements
DistributionManager {
return thread;
}
};
- BlockingQueue poolQueue;
+ BlockingQueue<Runnable> poolQueue;
if (MAX_WAITING_THREADS == Integer.MAX_VALUE) {
// no need for a queue since we have infinite threads
- poolQueue = new SynchronousQueue();
+ poolQueue = new SynchronousQueue<>();
} else {
- poolQueue = new
OverflowQueueWithDMStats(this.stats.getWaitingQueueHelper());
+ poolQueue = new
OverflowQueueWithDMStats<>(this.stats.getWaitingQueueHelper());
}
this.waitingPool = new PooledExecutorWithDMStats(poolQueue,
MAX_WAITING_THREADS,
this.stats.getWaitingPoolHelper(), tf, threadMonitor);
@@ -940,18 +932,18 @@ public class ClusterDistributionManager implements
DistributionManager {
return thread;
}
};
- BlockingQueue poolQueue;
- poolQueue = new
OverflowQueueWithDMStats(this.stats.getWaitingQueueHelper());
+ BlockingQueue<Runnable> poolQueue;
+ poolQueue = new
OverflowQueueWithDMStats<>(this.stats.getWaitingQueueHelper());
this.prMetaDataCleanupThreadPool = new
PooledExecutorWithDMStats(poolQueue,
MAX_PR_META_DATA_CLEANUP_THREADS,
this.stats.getWaitingPoolHelper(), tf, threadMonitor);
}
{
- BlockingQueue poolQueue;
+ BlockingQueue<Runnable> poolQueue;
if (INCOMING_QUEUE_LIMIT == 0) {
- poolQueue = new
OverflowQueueWithDMStats(this.stats.getPartitionedRegionQueueHelper());
+ poolQueue = new
OverflowQueueWithDMStats<>(this.stats.getPartitionedRegionQueueHelper());
} else {
- poolQueue = new OverflowQueueWithDMStats(INCOMING_QUEUE_LIMIT,
+ poolQueue = new OverflowQueueWithDMStats<>(INCOMING_QUEUE_LIMIT,
this.stats.getPartitionedRegionQueueHelper());
}
ThreadFactory tf = new ThreadFactory() {
@@ -983,19 +975,18 @@ public class ClusterDistributionManager implements
DistributionManager {
this.partitionedRegionPool = new
PooledExecutorWithDMStats(poolQueue, MAX_PR_THREADS,
this.stats.getPartitionedRegionPoolHelper(), tf, threadMonitor);
} else {
- SerialQueuedExecutorWithDMStats executor = new
SerialQueuedExecutorWithDMStats(poolQueue,
+ this.partitionedRegionThread = new
SerialQueuedExecutorWithDMStats(poolQueue,
this.stats.getPartitionedRegionPoolHelper(), tf, threadMonitor);
- this.partitionedRegionThread = executor;
}
}
{
- BlockingQueue poolQueue;
+ BlockingQueue<Runnable> poolQueue;
if (INCOMING_QUEUE_LIMIT == 0) {
- poolQueue = new
OverflowQueueWithDMStats(this.stats.getFunctionExecutionQueueHelper());
+ poolQueue = new
OverflowQueueWithDMStats<>(this.stats.getFunctionExecutionQueueHelper());
} else {
- poolQueue = new OverflowQueueWithDMStats(INCOMING_QUEUE_LIMIT,
+ poolQueue = new OverflowQueueWithDMStats<>(INCOMING_QUEUE_LIMIT,
this.stats.getFunctionExecutionQueueHelper());
}
ThreadFactory tf = new ThreadFactory() {
@@ -1030,9 +1021,8 @@ public class ClusterDistributionManager implements
DistributionManager {
MAX_FE_THREADS, this.stats.getFunctionExecutionPoolHelper(), tf,
true /* for fn exec */, this.threadMonitor);
} else {
- SerialQueuedExecutorWithDMStats executor = new
SerialQueuedExecutorWithDMStats(poolQueue,
+ this.functionExecutionThread = new
SerialQueuedExecutorWithDMStats(poolQueue,
this.stats.getFunctionExecutionPoolHelper(), tf, threadMonitor);
- this.functionExecutionThread = executor;
}
}
@@ -1131,11 +1121,11 @@ public class ClusterDistributionManager implements
DistributionManager {
ClusterDistributionManager.isDedicatedAdminVM = isDedicatedAdminVM;
}
- public static Boolean getIsStartupThread() {
+ private static Boolean getIsStartupThread() {
return isStartupThread.get();
}
- public static void setIsStartupThread(Boolean isStartup) {
+ private static void setIsStartupThread(Boolean isStartup) {
ClusterDistributionManager.isStartupThread.set(isStartup);
}
@@ -1280,25 +1270,6 @@ public class ClusterDistributionManager implements
DistributionManager {
return membershipManager.getView().getMembers();
}
- /* implementation of DM.getOldestMember */
- @Override
- public DistributedMember getOldestMember(Collection c) throws
NoSuchElementException {
- List<InternalDistributedMember> view = getViewMembers();
- for (int i = 0; i < view.size(); i++) {
- Object viewMbr = view.get(i);
- Iterator it = c.iterator();
- while (it.hasNext()) {
- Object nextMbr = it.next();
- if (viewMbr.equals(nextMbr)) {
- return (DistributedMember) nextMbr;
- }
- }
- }
- throw new NoSuchElementException(
-
LocalizedStrings.DistributionManager_NONE_OF_THE_GIVEN_MANAGERS_IS_IN_THE_CURRENT_MEMBERSHIP_VIEW
- .toLocalizedString());
- }
-
private boolean testMulticast() {
return this.membershipManager.testMulticast();
}
@@ -1369,9 +1340,7 @@ public class ClusterDistributionManager implements
DistributionManager {
if (readyForMessages)
return;
synchronized (this) {
- for (;;) {
- if (readyForMessages)
- break;
+ while (!readyForMessages) {
stopper.checkCancelInProgress(null);
boolean interrupted = Thread.interrupted();
try {
@@ -1384,7 +1353,7 @@ public class ClusterDistributionManager implements
DistributionManager {
Thread.currentThread().interrupt();
}
}
- } // for
+ }
} // synchronized
}
@@ -1421,9 +1390,7 @@ public class ClusterDistributionManager implements
DistributionManager {
}
synchronized (this.readyToSendMsgsLock) {
- for (;;) {
- if (this.readyToSendMsgs)
- break;
+ while (!this.readyToSendMsgs) {
stopper.checkCancelInProgress(null);
boolean interrupted = Thread.interrupted();
try {
@@ -1506,7 +1473,7 @@ public class ClusterDistributionManager implements
DistributionManager {
* distribution managers.
*/
@Override
- public Set getDistributionManagerIds() {
+ public Set<InternalDistributedMember> getDistributionManagerIds() {
// access to members synchronized under membersLock in order to
// ensure serialization
synchronized (this.membersLock) {
@@ -1529,10 +1496,10 @@ public class ClusterDistributionManager implements
DistributionManager {
throw new IllegalArgumentException("Cannot use empty collection of
locators");
}
if (this.hostedLocatorsAll.isEmpty()) {
- this.hostedLocatorsAll = new HashMap<InternalDistributedMember,
Collection<String>>();
+ this.hostedLocatorsAll = new HashMap<>();
}
Map<InternalDistributedMember, Collection<String>> tmp =
- new HashMap<InternalDistributedMember,
Collection<String>>(this.hostedLocatorsAll);
+ new HashMap<>(this.hostedLocatorsAll);
tmp.remove(member);
tmp.put(member, locators);
tmp = Collections.unmodifiableMap(tmp);
@@ -1571,7 +1538,7 @@ public class ClusterDistributionManager implements
DistributionManager {
}
if (this.hostedLocatorsWithSharedConfiguration.containsKey(member)) {
Map<InternalDistributedMember, Collection<String>> tmp =
- new HashMap<InternalDistributedMember, Collection<String>>(
+ new HashMap<>(
this.hostedLocatorsWithSharedConfiguration);
tmp.remove(member);
if (tmp.isEmpty()) {
@@ -1641,7 +1608,7 @@ public class ClusterDistributionManager implements
DistributionManager {
* distribution managers.
*/
@Override
- public Set getDistributionManagerIdsIncludingAdmin() {
+ public Set<InternalDistributedMember>
getDistributionManagerIdsIncludingAdmin() {
// access to members synchronized under membersLock in order to
// ensure serialization
synchronized (this.membersLock) {
@@ -1655,10 +1622,10 @@ public class ClusterDistributionManager implements
DistributionManager {
* managers not including me.
*/
@Override
- public Set getOtherDistributionManagerIds() {
+ public Set<InternalDistributedMember> getOtherDistributionManagerIds() {
// We return a modified copy of the list, so
// collect the old list and copy under the lock.
- Set result = new HashSet(getDistributionManagerIds());
+ Set<InternalDistributedMember> result = new
HashSet<>(getDistributionManagerIds());
InternalDistributedMember me = getDistributionManagerId();
result.remove(me);
@@ -1668,10 +1635,10 @@ public class ClusterDistributionManager implements
DistributionManager {
}
@Override
- public Set getOtherNormalDistributionManagerIds() {
+ public Set<InternalDistributedMember> getOtherNormalDistributionManagerIds()
{
// We return a modified copy of the list, so
// collect the old list and copy under the lock.
- Set result = new HashSet(getNormalDistributionManagerIds());
+ Set<InternalDistributedMember> result = new
HashSet<>(getNormalDistributionManagerIds());
InternalDistributedMember me = getDistributionManagerId();
result.remove(me);
@@ -1694,7 +1661,8 @@ public class ClusterDistributionManager implements
DistributionManager {
* Add a membership listener and return other DistributionManagerIds as an
atomic operation
*/
@Override
- public Set
addMembershipListenerAndGetDistributionManagerIds(MembershipListener l) {
+ public Set<InternalDistributedMember>
addMembershipListenerAndGetDistributionManagerIds(
+ MembershipListener l) {
// switched sync order to fix bug 30360
synchronized (this.membersLock) {
// Don't let the members come and go while we are adding this
@@ -1738,7 +1706,7 @@ public class ClusterDistributionManager implements
DistributionManager {
}
@Override
- public Set putOutgoing(final DistributionMessage msg) {
+ public Set<InternalDistributedMember> putOutgoing(final DistributionMessage
msg) {
try {
DistributionMessageObserver observer =
DistributionMessageObserver.getInstance();
if (observer != null) {
@@ -2176,7 +2144,8 @@ public class ClusterDistributionManager implements
DistributionManager {
*/
private void addAllMembershipListener(MembershipListener l) {
synchronized (this.allMembershipListenersLock) {
- Set newAllMembershipListeners = new HashSet(this.allMembershipListeners);
+ Set<MembershipListener> newAllMembershipListeners =
+ new HashSet<>(this.allMembershipListeners);
newAllMembershipListeners.add(l);
this.allMembershipListeners = newAllMembershipListeners;
}
@@ -2185,7 +2154,8 @@ public class ClusterDistributionManager implements
DistributionManager {
@Override
public void removeAllMembershipListener(MembershipListener l) {
synchronized (this.allMembershipListenersLock) {
- Set newAllMembershipListeners = new HashSet(this.allMembershipListeners);
+ Set<MembershipListener> newAllMembershipListeners =
+ new HashSet<>(this.allMembershipListeners);
if (!newAllMembershipListeners.remove(l)) {
// There seems to be a race condition in which
// multiple departure events can be registered
@@ -2206,10 +2176,7 @@ public class ClusterDistributionManager implements
DistributionManager {
return true;
}
InternalDistributedSystem ds = getSystem();
- if (ds != null && ds.isDisconnecting()) {
- return true;
- }
- return false;
+ return ds != null && ds.isDisconnecting();
}
public boolean isShutdownStarted() {
@@ -2274,7 +2241,7 @@ public class ClusterDistributionManager implements
DistributionManager {
}
try {
MemberEvent ev =
- (MemberEvent)
ClusterDistributionManager.this.membershipEventQueue.take();
+ ClusterDistributionManager.this.membershipEventQueue.take();
handleMemberEvent(ev);
} catch (InterruptedException e) {
if (isCloseInProgress()) {
@@ -2367,7 +2334,7 @@ public class ClusterDistributionManager implements
DistributionManager {
logger.info(LocalizedMessage.create(
LocalizedStrings.DistributionManager_NEW_ADMINISTRATION_MEMBER_DETECTED_AT_0,
theId));
synchronized (this.adminConsolesLock) {
- HashSet tmp = new HashSet(this.adminConsoles);
+ HashSet<InternalDistributedMember> tmp = new
HashSet<>(this.adminConsoles);
tmp.add(theId);
this.adminConsoles = Collections.unmodifiableSet(tmp);
}
@@ -2389,8 +2356,9 @@ public class ClusterDistributionManager implements
DistributionManager {
}
@Override
- public Set getAllOtherMembers() {
- Set result = new HashSet(getDistributionManagerIdsIncludingAdmin());
+ public Set<InternalDistributedMember> getAllOtherMembers() {
+ Set<InternalDistributedMember> result =
+ new HashSet<>(getDistributionManagerIdsIncludingAdmin());
result.remove(getDistributionManagerId());
return result;
}
@@ -2398,27 +2366,17 @@ public class ClusterDistributionManager implements
DistributionManager {
@Override
public void
retainMembersWithSameOrNewerVersion(Collection<InternalDistributedMember>
members,
Version version) {
- for (Iterator<InternalDistributedMember> it = members.iterator();
it.hasNext();) {
- InternalDistributedMember id = it.next();
- if (id.getVersionObject().compareTo(version) < 0) {
- it.remove();
- }
- }
+ members.removeIf(id -> id.getVersionObject().compareTo(version) < 0);
}
@Override
public void
removeMembersWithSameOrNewerVersion(Collection<InternalDistributedMember>
members,
Version version) {
- for (Iterator<InternalDistributedMember> it = members.iterator();
it.hasNext();) {
- InternalDistributedMember id = it.next();
- if (id.getVersionObject().compareTo(version) >= 0) {
- it.remove();
- }
- }
+ members.removeIf(id -> id.getVersionObject().compareTo(version) >= 0);
}
@Override
- public Set addAllMembershipListenerAndGetAllIds(MembershipListener l) {
+ public Set<InternalDistributedMember>
addAllMembershipListenerAndGetAllIds(MembershipListener l) {
MembershipManager mgr = membershipManager;
mgr.getViewLock().writeLock().lock();
try {
@@ -2446,10 +2404,10 @@ public class ClusterDistributionManager implements
DistributionManager {
boolean ok = false;
// Be sure to add ourself to the equivalencies list!
- Set equivs = StartupMessage.getMyAddresses(this);
+ Set<InetAddress> equivs = StartupMessage.getMyAddresses(this);
if (equivs == null || equivs.size() == 0) {
// no network interface
- equivs = new HashSet();
+ equivs = new HashSet<>();
try {
equivs.add(SocketCreator.getLocalHost());
} catch (UnknownHostException e) {
@@ -2482,7 +2440,7 @@ public class ClusterDistributionManager implements
DistributionManager {
}
// we need to send this to everyone else; even admin vm
- Set allOthers = new HashSet(getViewMembers());
+ Set<InternalDistributedMember> allOthers = new HashSet<>(getViewMembers());
allOthers.remove(getDistributionManagerId());
if (allOthers.isEmpty()) {
@@ -2502,11 +2460,6 @@ public class ClusterDistributionManager implements
DistributionManager {
throw new IncompatibleSystemException(rejectionMessage);
}
- boolean isAdminDM = getId().getVmKind() ==
ClusterDistributionManager.ADMIN_ONLY_DM_TYPE
- || getId().getVmKind() == ClusterDistributionManager.LOCATOR_DM_TYPE
- || ClusterDistributionManager.isDedicatedAdminVM()
- || Boolean.getBoolean(InternalLocator.FORCE_LOCATOR_DM_TYPE);
-
boolean receivedAny = this.receivedStartupResponse;
if (!ok) { // someone didn't reply
@@ -2576,7 +2529,7 @@ public class ClusterDistributionManager implements
DistributionManager {
* <p>
* Must be synchronized using {@link #unfinishedStartupsLock}
*/
- private Set unfinishedStartups = null;
+ private Set<InternalDistributedMember> unfinishedStartups = null;
/**
* Synchronization for {@link #unfinishedStartups}
@@ -2584,10 +2537,10 @@ public class ClusterDistributionManager implements
DistributionManager {
private final Object unfinishedStartupsLock = new Object();
@Override
- public void setUnfinishedStartups(Collection s) {
+ public void setUnfinishedStartups(Collection<InternalDistributedMember> s) {
synchronized (unfinishedStartupsLock) {
Assert.assertTrue(unfinishedStartups == null, "Set unfinished startups
twice");
- unfinishedStartups = new HashSet(s);
+ unfinishedStartups = new HashSet<>(s);
// OK, I don't _quite_ trust the list to be current, so let's
// prune it here.
@@ -2801,12 +2754,12 @@ public class ClusterDistributionManager implements
DistributionManager {
logger.debug("DistributionManager: removing member <{}>; crashed {};
reason = {}", theId,
crashed, reason);
}
- Map<InternalDistributedMember, InternalDistributedMember> tmp = new
HashMap(this.members);
+ Map<InternalDistributedMember, InternalDistributedMember> tmp = new
HashMap<>(this.members);
if (tmp.remove(theId) != null) {
// Note we don't modify in place. This allows reader to get snapshots
// without locking.
if (tmp.isEmpty()) {
- tmp = Collections.EMPTY_MAP;
+ tmp = Collections.emptyMap();
} else {
tmp = Collections.unmodifiableMap(tmp);
}
@@ -2819,10 +2772,10 @@ public class ClusterDistributionManager implements
DistributionManager {
// an explicit remove followed by an implicit one caused
// by a JavaGroup view change
}
- Set tmp2 = new HashSet(this.membersAndAdmin);
+ Set<InternalDistributedMember> tmp2 = new
HashSet<>(this.membersAndAdmin);
if (tmp2.remove(theId)) {
if (tmp2.isEmpty()) {
- tmp2 = Collections.EMPTY_SET;
+ tmp2 = Collections.emptySet();
} else {
tmp2 = Collections.unmodifiableSet(tmp2);
}
@@ -2863,11 +2816,11 @@ public class ClusterDistributionManager implements
DistributionManager {
// Note we don't modify in place. This allows reader to get snapshots
// without locking.
- tmp = new HashMap(this.members);
+ tmp = new HashMap<>(this.members);
tmp.put(theId, theId);
this.members = Collections.unmodifiableMap(tmp);
- Set stmp = new HashSet(this.membersAndAdmin);
+ Set<InternalDistributedMember> stmp = new
HashSet<>(this.membersAndAdmin);
stmp.add(theId);
this.membersAndAdmin = Collections.unmodifiableSet(stmp);
} // synchronized
@@ -2898,7 +2851,7 @@ public class ClusterDistributionManager implements
DistributionManager {
*/
private void handleConsoleStartup(InternalDistributedMember theId) {
// if we have an all listener then notify it NOW.
- HashSet tmp = null;
+ HashSet<InternalDistributedMember> tmp = null;
synchronized (this.membersLock) {
// Note test is under membersLock
if (membersAndAdmin.contains(theId))
@@ -2906,13 +2859,12 @@ public class ClusterDistributionManager implements
DistributionManager {
// Note we don't modify in place. This allows reader to get snapshots
// without locking.
- tmp = new HashSet(this.membersAndAdmin);
+ tmp = new HashSet<>(this.membersAndAdmin);
tmp.add(theId);
this.membersAndAdmin = Collections.unmodifiableSet(tmp);
} // synchronized
- for (Iterator iter = allMembershipListeners.iterator(); iter.hasNext();) {
- MembershipListener listener = (MembershipListener) iter.next();
+ for (MembershipListener listener : allMembershipListeners) {
listener.memberJoined(this, theId);
}
logger.info(LocalizedMessage.create(
@@ -2958,12 +2910,12 @@ public class ClusterDistributionManager implements
DistributionManager {
if (logger.isDebugEnabled())
logger.debug("DistributionManager: removing admin member <{}>;
crashed = {}; reason = {}",
theId, crashed, reason);
- Set tmp = new HashSet(this.membersAndAdmin);
+ Set<InternalDistributedMember> tmp = new
HashSet<>(this.membersAndAdmin);
if (tmp.remove(theId)) {
// Note we don't modify in place. This allows reader to get snapshots
// without locking.
if (tmp.isEmpty()) {
- tmp = Collections.EMPTY_SET;
+ tmp = Collections.emptySet();
} else {
tmp = Collections.unmodifiableSet(tmp);
}
@@ -2980,10 +2932,10 @@ public class ClusterDistributionManager implements
DistributionManager {
synchronized (this.adminConsolesLock) {
if (this.adminConsoles.contains(theId)) {
removedConsole = true;
- Set tmp = new HashSet(this.adminConsoles);
+ Set<InternalDistributedMember> tmp = new HashSet<>(this.adminConsoles);
tmp.remove(theId);
if (tmp.isEmpty()) {
- tmp = Collections.EMPTY_SET;
+ tmp = Collections.emptySet();
} else {
tmp = Collections.unmodifiableSet(tmp);
}
@@ -2991,8 +2943,7 @@ public class ClusterDistributionManager implements
DistributionManager {
}
}
if (removedMember) {
- for (Iterator iter = allMembershipListeners.iterator(); iter.hasNext();)
{
- MembershipListener listener = (MembershipListener) iter.next();
+ for (MembershipListener listener : allMembershipListeners) {
listener.memberDeparted(this, theId, crashed);
}
}
@@ -3009,7 +2960,7 @@ public class ClusterDistributionManager implements
DistributionManager {
redundancyZones.remove(theId);
}
- public void shutdownMessageReceived(InternalDistributedMember theId, String
reason) {
+ void shutdownMessageReceived(InternalDistributedMember theId, String reason)
{
this.membershipManager.shutdownMessageReceived(theId, reason);
handleManagerDeparture(theId, false,
LocalizedStrings.ShutdownMessage_SHUTDOWN_MESSAGE_RECEIVED.toLocalizedString());
@@ -3018,8 +2969,6 @@ public class ClusterDistributionManager implements
DistributionManager {
@Override
public void handleManagerDeparture(InternalDistributedMember theId, boolean
p_crashed,
String p_reason) {
- boolean crashed = p_crashed;
- String reason = p_reason;
AlertAppender.getInstance().removeAlertListener(theId);
@@ -3037,7 +2986,7 @@ public class ClusterDistributionManager implements
DistributionManager {
int vmType = theId.getVmKind();
if (vmType == ADMIN_ONLY_DM_TYPE) {
removeUnfinishedStartup(theId, true);
- handleConsoleShutdown(theId, crashed, reason);
+ handleConsoleShutdown(theId, p_crashed, p_reason);
return;
}
@@ -3047,21 +2996,21 @@ public class ClusterDistributionManager implements
DistributionManager {
}
removeUnfinishedStartup(theId, true);
- if (removeManager(theId, crashed, reason)) {
+ if (removeManager(theId, p_crashed, p_reason)) {
if (theId.getVmKind() != ClusterDistributionManager.LOCATOR_DM_TYPE) {
this.stats.incNodes(-1);
}
StringId msg;
- if (crashed && !isCloseInProgress()) {
+ if (p_crashed && !isCloseInProgress()) {
msg =
LocalizedStrings.DistributionManager_MEMBER_AT_0_UNEXPECTEDLY_LEFT_THE_DISTRIBUTED_CACHE_1;
- addMemberEvent(new MemberCrashedEvent(theId, reason));
+ addMemberEvent(new MemberCrashedEvent(theId, p_reason));
} else {
msg =
LocalizedStrings.DistributionManager_MEMBER_AT_0_GRACEFULLY_LEFT_THE_DISTRIBUTED_CACHE_1;
- addMemberEvent(new MemberDepartedEvent(theId, reason));
+ addMemberEvent(new MemberDepartedEvent(theId, p_reason));
}
- logger.info(LocalizedMessage.create(msg, new Object[] {theId,
prettifyReason(reason)}));
+ logger.info(LocalizedMessage.create(msg, new Object[] {theId,
prettifyReason(p_reason)}));
// Remove this manager from the serialQueueExecutor.
if (this.serialQueuedExecutorPool != null) {
@@ -3108,7 +3057,7 @@ public class ClusterDistributionManager implements
DistributionManager {
ShutdownMessage m = new ShutdownMessage();
InternalDistributedMember theId = this.getDistributionManagerId();
m.setDistributionManagerId(theId);
- Set allOthers = new HashSet(getViewMembers());
+ Set<InternalDistributedMember> allOthers = new HashSet<>(getViewMembers());
allOthers.remove(getDistributionManagerId());
m.setRecipients(allOthers);
@@ -3171,11 +3120,13 @@ public class ClusterDistributionManager implements
DistributionManager {
* all received it or it was sent to {@link
DistributionMessage#ALL_RECIPIENTS}.
* @throws NotSerializableException If <code>message</code> cannot be
serialized
*/
- Set sendOutgoing(DistributionMessage message) throws
NotSerializableException {
+ Set<InternalDistributedMember> sendOutgoing(DistributionMessage message)
+ throws NotSerializableException {
long startTime = DistributionStats.getStatTime();
- Set result = sendViaMembershipManager(message.getRecipients(), message,
- ClusterDistributionManager.this, this.stats);
+ Set<InternalDistributedMember> result =
+ sendViaMembershipManager(message.getRecipients(), message,
+ ClusterDistributionManager.this, this.stats);
long endTime = 0L;
if (DistributionStats.enableClockStats) {
endTime = NanoTimer.getTime();
@@ -3203,8 +3154,9 @@ public class ClusterDistributionManager implements
DistributionManager {
* @return recipients who did not receive the message
* @throws NotSerializableException If <code>message</code> cannot be
serialized
*/
- Set sendMessage(DistributionMessage message) throws NotSerializableException
{
- Set result = null;
+ private Set<InternalDistributedMember> sendMessage(DistributionMessage
message)
+ throws NotSerializableException {
+ Set<InternalDistributedMember> result = null;
try {
// Verify we're not too far into the shutdown
stopper.checkCancelInProgress(null);
@@ -3229,13 +3181,10 @@ public class ClusterDistributionManager implements
DistributionManager {
ex);
if (message == null || message.forAll())
return null;
- result = new HashSet();
+ result = new HashSet<>();
for (int i = 0; i < message.getRecipients().length; i++)
result.add(message.getRecipients()[i]);
return result;
- /*
- * if (ex instanceof org.apache.geode.GemFireIpcResourceException) {
return; }
- */
}
return result;
}
@@ -3245,7 +3194,8 @@ public class ClusterDistributionManager implements
DistributionManager {
* all received it or it was sent to {@link
DistributionMessage#ALL_RECIPIENTS}).
* @throws NotSerializableException If content cannot be serialized
*/
- private Set sendViaMembershipManager(InternalDistributedMember[]
destinations,
+ private Set<InternalDistributedMember> sendViaMembershipManager(
+ InternalDistributedMember[] destinations,
DistributionMessage content, ClusterDistributionManager dm,
DistributionStats stats)
throws NotSerializableException {
if (membershipManager == null) {
@@ -3253,10 +3203,8 @@ public class ClusterDistributionManager implements
DistributionManager {
LocalizedStrings.DistributionChannel_ATTEMPTING_A_SEND_TO_A_DISCONNECTED_DISTRIBUTIONMANAGER));
if (destinations.length == 1 && destinations[0] ==
DistributionMessage.ALL_RECIPIENTS)
return null;
- HashSet result = new HashSet();
- for (int i = 0; i < destinations.length; i++) {
- result.add(destinations[i]);
- }
+ HashSet<InternalDistributedMember> result = new HashSet<>();
+ Collections.addAll(result, destinations);
return result;
}
return membershipManager.send(destinations, content, stats);
@@ -3266,7 +3214,7 @@ public class ClusterDistributionManager implements
DistributionManager {
/**
* Schedule a given message appropriately, depending upon its executor kind.
*/
- void scheduleIncomingMessage(DistributionMessage message) {
+ private void scheduleIncomingMessage(DistributionMessage message) {
/*
* Potential race condition between starting up and getting other
distribution manager ids -- DM
* will only be initialized up to the point at which it called startThreads
@@ -3390,8 +3338,6 @@ public class ClusterDistributionManager implements
DistributionManager {
this.elderLock.unlock();
}
this.elderStateInitialized = true;
- // if (Thread.currentThread().isInterrupted())
- // throw new RuntimeException("Interrupted");
return this.elderState;
}
@@ -3444,8 +3390,6 @@ public class ClusterDistributionManager implements
DistributionManager {
InternalDistributedMember id, InternalDistributedMember
whoSuspected,
String reason) {}
- public void viewInstalled(NetView view) {}
-
@Override
public void quorumLost(DistributionManager distributionManager,
Set<InternalDistributedMember> failures,
@@ -3571,7 +3515,7 @@ public class ClusterDistributionManager implements
DistributionManager {
}
/** returns the serialThread's queue if throttling is being used, null if
not */
- public OverflowQueueWithDMStats getSerialQueue(InternalDistributedMember
sender) {
+ public OverflowQueueWithDMStats<Runnable>
getSerialQueue(InternalDistributedMember sender) {
if (MULTI_SERIAL_EXECUTORS) {
return this.serialQueuedExecutorPool.getSerialQueue(sender);
} else {
@@ -3632,7 +3576,8 @@ public class ClusterDistributionManager implements
DistributionManager {
}
/* -----------------------------Health Monitor------------------------------
*/
- private final ConcurrentMap hmMap = new ConcurrentHashMap();
+ private final ConcurrentMap<InternalDistributedMember, HealthMonitor> hmMap =
+ new ConcurrentHashMap<>();
private volatile InternalCache cache;
@@ -3646,7 +3591,7 @@ public class ClusterDistributionManager implements
DistributionManager {
*/
@Override
public HealthMonitor getHealthMonitor(InternalDistributedMember owner) {
- return (HealthMonitor) this.hmMap.get(owner);
+ return this.hmMap.get(owner);
}
/**
@@ -3707,11 +3652,10 @@ public class ClusterDistributionManager implements
DistributionManager {
@Override
public int getRoleCount(Role role) {
int count = 0;
- Set mbrs = getDistributionManagerIds();
- for (Iterator mbrIter = mbrs.iterator(); mbrIter.hasNext();) {
- Set roles = ((InternalDistributedMember) mbrIter.next()).getRoles();
- for (Iterator rolesIter = roles.iterator(); rolesIter.hasNext();) {
- Role mbrRole = (Role) rolesIter.next();
+ Set<InternalDistributedMember> mbrs = getDistributionManagerIds();
+ for (InternalDistributedMember mbr : mbrs) {
+ Set<Role> roles = (mbr).getRoles();
+ for (Role mbrRole : roles) {
if (mbrRole.equals(role)) {
count++;
break;
@@ -3724,12 +3668,11 @@ public class ClusterDistributionManager implements
DistributionManager {
/** Returns true if at least one member is filling the specified role */
@Override
public boolean isRolePresent(Role role) {
- Set mbrs = getDistributionManagerIds();
- for (Iterator mbrIter = mbrs.iterator(); mbrIter.hasNext();) {
- Set roles = ((InternalDistributedMember) mbrIter.next()).getRoles();
- for (Iterator rolesIter = roles.iterator(); rolesIter.hasNext();) {
- Role mbrRole = (Role) rolesIter.next();
- if (mbrRole.equals(role)) {
+ Set<InternalDistributedMember> mbrs = getDistributionManagerIds();
+ for (InternalDistributedMember mbr : mbrs) {
+ Set<Role> roles = mbr.getRoles();
+ for (Role mbrRole : roles) {
+ if ((mbrRole).equals(role)) {
return true;
}
}
@@ -3739,15 +3682,11 @@ public class ClusterDistributionManager implements
DistributionManager {
/** Returns a set of all roles currently in the distributed system. */
@Override
- public Set getAllRoles() {
- Set allRoles = new HashSet();
- Set mbrs = getDistributionManagerIds();
- for (Iterator mbrIter = mbrs.iterator(); mbrIter.hasNext();) {
- Set roles = ((InternalDistributedMember) mbrIter.next()).getRoles();
- for (Iterator rolesIter = roles.iterator(); rolesIter.hasNext();) {
- Role mbrRole = (Role) rolesIter.next();
- allRoles.add(mbrRole);
- }
+ public Set<Role> getAllRoles() {
+ Set<Role> allRoles = new HashSet<>();
+ Set<InternalDistributedMember> mbrs = getDistributionManagerIds();
+ for (InternalDistributedMember mbr : mbrs) {
+ allRoles.addAll(mbr.getRoles());
}
return allRoles;
}
@@ -3778,22 +3717,24 @@ public class ClusterDistributionManager implements
DistributionManager {
*/
private static class SerialQueuedExecutorPool {
/** To store the serial threads */
- ConcurrentMap serialQueuedExecutorMap = new
ConcurrentHashMap(MAX_SERIAL_QUEUE_THREAD);
+ final ConcurrentMap<Integer, SerialQueuedExecutorWithDMStats>
serialQueuedExecutorMap =
+ new ConcurrentHashMap<>(MAX_SERIAL_QUEUE_THREAD);
/** To store the queue associated with thread */
- Map serialQueuedMap = new HashMap(MAX_SERIAL_QUEUE_THREAD);
+ final Map<Integer, OverflowQueueWithDMStats<Runnable>> serialQueuedMap =
+ new HashMap<>(MAX_SERIAL_QUEUE_THREAD);
/** Holds mapping between sender to the serial thread-id */
- Map senderToSerialQueueIdMap = new HashMap();
+ final Map<InternalDistributedMember, Integer> senderToSerialQueueIdMap =
new HashMap<>();
/**
* Holds info about unused thread, a thread is marked unused when the
member associated with it
* has left distribution system.
*/
- ArrayList threadMarkedForUse = new ArrayList();
+ final ArrayList<Integer> threadMarkedForUse = new ArrayList<>();
- DistributionStats stats;
- ThreadGroup threadGroup;
+ final DistributionStats stats;
+ final ThreadGroup threadGroup;
final boolean throttlingDisabled;
@@ -3824,7 +3765,7 @@ public class ClusterDistributionManager implements
DistributionManager {
synchronized (senderToSerialQueueIdMap) {
// Check if there is a executor associated with this sender.
- queueId = (Integer) senderToSerialQueueIdMap.get(sender);
+ queueId = senderToSerialQueueIdMap.get(sender);
if (!createNew || queueId != null) {
return queueId;
@@ -3833,7 +3774,7 @@ public class ClusterDistributionManager implements
DistributionManager {
// Create new.
// Check if any threads are available that are marked for use.
if (!threadMarkedForUse.isEmpty()) {
- queueId = (Integer) threadMarkedForUse.remove(0);
+ queueId = threadMarkedForUse.remove(0);
}
// If Map is full, use the threads in round-robin fashion.
if (queueId == null) {
@@ -3848,12 +3789,12 @@ public class ClusterDistributionManager implements
DistributionManager {
* Returns the queue associated with this sender. Used in FlowControl for
throttling (based on
* queue size).
*/
- public OverflowQueueWithDMStats getSerialQueue(InternalDistributedMember
sender) {
+ OverflowQueueWithDMStats<Runnable>
getSerialQueue(InternalDistributedMember sender) {
Integer queueId = getQueueId(sender, false);
if (queueId == null) {
return null;
}
- return (OverflowQueueWithDMStats) serialQueuedMap.get(queueId);
+ return serialQueuedMap.get(queueId);
}
/*
@@ -3862,7 +3803,7 @@ public class ClusterDistributionManager implements
DistributionManager {
* applied during put event, this doesn't block the extract operation on
the queue.
*
*/
- public SerialQueuedExecutorWithDMStats getThrottledSerialExecutor(
+ SerialQueuedExecutorWithDMStats getThrottledSerialExecutor(
InternalDistributedMember sender) {
SerialQueuedExecutorWithDMStats executor = getSerialExecutor(sender);
@@ -3901,11 +3842,11 @@ public class ClusterDistributionManager implements
DistributionManager {
/*
* Returns the serial queue executor for the given sender.
*/
- public SerialQueuedExecutorWithDMStats
getSerialExecutor(InternalDistributedMember sender) {
+ SerialQueuedExecutorWithDMStats
getSerialExecutor(InternalDistributedMember sender) {
SerialQueuedExecutorWithDMStats executor = null;
Integer queueId = getQueueId(sender, true);
if ((executor =
- (SerialQueuedExecutorWithDMStats)
serialQueuedExecutorMap.get(queueId)) != null) {
+ serialQueuedExecutorMap.get(queueId)) != null) {
return executor;
}
// If executor doesn't exist for this sender, create one.
@@ -3927,12 +3868,12 @@ public class ClusterDistributionManager implements
DistributionManager {
*/
private SerialQueuedExecutorWithDMStats createSerialExecutor(final Integer
id) {
- BlockingQueue poolQueue;
+ OverflowQueueWithDMStats<Runnable> poolQueue;
if (SERIAL_QUEUE_BYTE_LIMIT == 0 || this.throttlingDisabled) {
- poolQueue = new OverflowQueueWithDMStats(stats.getSerialQueueHelper());
+ poolQueue = new
OverflowQueueWithDMStats<>(stats.getSerialQueueHelper());
} else {
- poolQueue = new
ThrottlingMemLinkedQueueWithDMStats(SERIAL_QUEUE_BYTE_LIMIT,
+ poolQueue = new
ThrottlingMemLinkedQueueWithDMStats<>(SERIAL_QUEUE_BYTE_LIMIT,
SERIAL_QUEUE_THROTTLE, SERIAL_QUEUE_SIZE_LIMIT,
SERIAL_QUEUE_SIZE_THROTTLE,
this.stats.getSerialQueueHelper());
}
@@ -4006,8 +3947,7 @@ public class ClusterDistributionManager implements
DistributionManager {
long timeNanos = unit.toNanos(time);
long remainingNanos = timeNanos;
long start = System.nanoTime();
- for (Iterator iter = serialQueuedExecutorMap.values().iterator();
iter.hasNext();) {
- ExecutorService executor = (ExecutorService) iter.next();
+ for (ExecutorService executor : serialQueuedExecutorMap.values()) {
executor.awaitTermination(remainingNanos, TimeUnit.NANOSECONDS);
remainingNanos = timeNanos = (System.nanoTime() - start);
if (remainingNanos <= 0) {
@@ -4017,32 +3957,14 @@ public class ClusterDistributionManager implements
DistributionManager {
}
private void shutdown() {
- for (Iterator iter = serialQueuedExecutorMap.values().iterator();
iter.hasNext();) {
- ExecutorService executor = (ExecutorService) iter.next();
+ for (ExecutorService executor : serialQueuedExecutorMap
+ .values()) {
executor.shutdown();
}
}
}
/**
- * A simple class used for locking the list of members of the distributed
system. We give this
- * lock its own class so that it shows up nicely in stack traces.
- */
- private static class MembersLock {
- protected MembersLock() {
-
- }
- }
-
- /**
- * A simple class used for locking the list of membership listeners. We give
this lock its own
- * class so that it shows up nicely in stack traces.
- */
- private static class MembershipListenersLock {
- protected MembershipListenersLock() {}
- }
-
- /**
* This is the listener implementation for responding to events from the
Membership Manager.
*
*/
@@ -4387,7 +4309,7 @@ public class ClusterDistributionManager implements
DistributionManager {
@Override
public Set<InternalDistributedMember> getMembersInSameZone(
InternalDistributedMember targetMember) {
- Set<InternalDistributedMember> buddyMembers = new
HashSet<InternalDistributedMember>();
+ Set<InternalDistributedMember> buddyMembers = new HashSet<>();
if (!redundancyZones.isEmpty()) {
synchronized (redundancyZones) {
String targetZone = redundancyZones.get(targetMember);
@@ -4399,7 +4321,7 @@ public class ClusterDistributionManager implements
DistributionManager {
}
} else {
buddyMembers.add(targetMember);
- Set targetAddrs = getEquivalents(targetMember.getInetAddress());
+ Set<InetAddress> targetAddrs =
getEquivalents(targetMember.getInetAddress());
for (Iterator i = getDistributionManagerIds().iterator(); i.hasNext();) {
InternalDistributedMember o = (InternalDistributedMember) i.next();
if (SetUtils.intersectsWith(targetAddrs,
getEquivalents(o.getInetAddress()))) {
@@ -4451,8 +4373,8 @@ public class ClusterDistributionManager implements
DistributionManager {
* attempt to use OSProcess native code for the dumps. This goes to stdout
instead of the
* system.log files.
*/
- public void printStacks(Collection ids, boolean useNative) {
- Set requiresMessage = new HashSet();
+ public void printStacks(Collection<InternalDistributedMember> ids, boolean
useNative) {
+ Set<InternalDistributedMember> requiresMessage = new HashSet<>();
if (ids.contains(localAddress)) {
OSProcess.printStacks(0, useNative);
}
@@ -4483,10 +4405,10 @@ public class ClusterDistributionManager implements
DistributionManager {
@Override
public Set<DistributedMember> getGroupMembers(String group) {
HashSet<DistributedMember> result = null;
- for (DistributedMember m : (Set<DistributedMember>)
getDistributionManagerIdsIncludingAdmin()) {
+ for (DistributedMember m : getDistributionManagerIdsIncludingAdmin()) {
if (m.getGroups().contains(group)) {
if (result == null) {
- result = new HashSet<DistributedMember>();
+ result = new HashSet<>();
}
result.add(m);
}
@@ -4499,11 +4421,11 @@ public class ClusterDistributionManager implements
DistributionManager {
}
@Override
- public Set getNormalDistributionManagerIds() {
+ public Set<InternalDistributedMember> getNormalDistributionManagerIds() {
// access to members synchronized under membersLock in order to
// ensure serialization
synchronized (this.membersLock) {
- HashSet<InternalDistributedMember> result = new
HashSet<InternalDistributedMember>();
+ HashSet<InternalDistributedMember> result = new HashSet<>();
for (InternalDistributedMember m : this.members.keySet()) {
if (m.getVmKind() != ClusterDistributionManager.LOCATOR_DM_TYPE) {
result.add(m);
@@ -4518,7 +4440,7 @@ public class ClusterDistributionManager implements
DistributionManager {
// access to members synchronized under membersLock in order to
// ensure serialization
synchronized (this.membersLock) {
- HashSet<InternalDistributedMember> result = new
HashSet<InternalDistributedMember>();
+ HashSet<InternalDistributedMember> result = new HashSet<>();
for (InternalDistributedMember m : this.members.keySet()) {
if (m.getVmKind() == ClusterDistributionManager.LOCATOR_DM_TYPE) {
result.add(m);
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
index 75d1a61..25fdf46 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
@@ -18,7 +18,6 @@ import java.net.InetAddress;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -59,7 +58,7 @@ public interface DistributionManager extends ReplySender {
*
* @since GemFire 5.7
*/
- Set getAllOtherMembers();
+ Set<InternalDistributedMember> getAllOtherMembers();
/**
* Returns the ID in the membership view that is equal to the argument. If
the ID is not in the
@@ -89,13 +88,13 @@ public interface DistributionManager extends ReplySender {
* Returns an unmodifiable set containing the identities of all of the known
distribution
* managers. As of 7.0 this includes locators since they have a cache.
*/
- Set getDistributionManagerIds();
+ Set<InternalDistributedMember> getDistributionManagerIds();
/**
* Returns an unmodifiable set containing the identities of all of the known
"normal" distribution
* managers. This does not include locators or admin members.
*/
- Set getNormalDistributionManagerIds();
+ Set<InternalDistributedMember> getNormalDistributionManagerIds();
/**
* Returns an unmodifiable set containing the identities of all of the known
distribution managers
@@ -103,22 +102,23 @@ public interface DistributionManager extends ReplySender {
*
* @since GemFire 5.7
*/
- Set getDistributionManagerIdsIncludingAdmin();
+ Set<InternalDistributedMember> getDistributionManagerIdsIncludingAdmin();
/**
* Returns a private-memory list containing getDistributionManagerIds()
minus our id.
*/
- Set getOtherDistributionManagerIds();
+ Set<InternalDistributedMember> getOtherDistributionManagerIds();
/**
* Returns a private-memory list containing
getNormalDistributionManagerIds() minus our id.
*/
- Set getOtherNormalDistributionManagerIds();
+ Set<InternalDistributedMember> getOtherNormalDistributionManagerIds();
/**
* Add a membership listener and return other DistributionManagerIds as an
atomic operation
*/
- Set addMembershipListenerAndGetDistributionManagerIds(MembershipListener l);
+ Set<InternalDistributedMember>
addMembershipListenerAndGetDistributionManagerIds(
+ MembershipListener l);
/**
* Add a membership listener for all members and return other
DistributionManagerIds as an atomic
@@ -126,7 +126,7 @@ public interface DistributionManager extends ReplySender {
*
* @since GemFire 5.7
*/
- Set addAllMembershipListenerAndGetAllIds(MembershipListener l);
+ Set<InternalDistributedMember>
addAllMembershipListenerAndGetAllIds(MembershipListener l);
/**
* Returns the identity of this <code>DistributionManager</code>
@@ -187,7 +187,7 @@ public interface DistributionManager extends ReplySender {
*
* @return recipients who did not receive the message
*/
- Set putOutgoing(DistributionMessage msg);
+ Set<InternalDistributedMember> putOutgoing(DistributionMessage msg);
/**
* Returns the distributed system to which this distribution manager is
connected.
@@ -266,12 +266,6 @@ public interface DistributionManager extends ReplySender {
*/
Executor getFunctionExecutor();
- /**
- * gets this distribution manager's message-processing executor for ordered
(i.e. serialized)
- * message processing
- */
- // public Executor getSerialExecutor();
-
void close();
/**
@@ -281,16 +275,6 @@ public interface DistributionManager extends ReplySender {
List<InternalDistributedMember> getViewMembers();
/**
- * Returns the oldest member in the given set of distribution managers. The
current implementation
- * may use n*n/2 comparisons, so use this judiciously
- *
- * @return the oldest member of the given collection
- * @throws NoSuchElementException when none of the given members is actually
a member of the
- * distributed system.
- */
- DistributedMember getOldestMember(Collection members) throws
NoSuchElementException;
-
- /**
* @return Set of Admin VM nodes
*/
Set<InternalDistributedMember> getAdminMemberSet();
@@ -321,7 +305,7 @@ public interface DistributionManager extends ReplySender {
*/
void removeUnfinishedStartup(InternalDistributedMember m, boolean departed);
- void setUnfinishedStartups(Collection s);
+ void setUnfinishedStartups(Collection<InternalDistributedMember> s);
/**
* Return the CancelCriterion for this DM.
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
index 23b2c5e..c0de271 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
@@ -19,8 +19,6 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
@@ -29,6 +27,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.SystemFailure;
+import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.deadlock.MessageDependencyMonitor;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
@@ -246,12 +245,12 @@ public abstract class DistributionMessage implements
DataSerializableFixedID, Cl
* Sets the intended recipient of the message. If recipient set contains
{@link #ALL_RECIPIENTS}
* then the message will be sent to all distribution managers.
*/
- public void setRecipients(Collection recipients) {
+ public void setRecipients(Collection<? extends DistributedMember>
recipients) {
if (this.recipients != null) {
throw new IllegalStateException(
LocalizedStrings.DistributionMessage_RECIPIENTS_CAN_ONLY_BE_SET_ONCE.toLocalizedString());
}
- this.recipients = (InternalDistributedMember[]) recipients
+ this.recipients = recipients
.toArray(new InternalDistributedMember[recipients.size()]);
}
@@ -260,14 +259,6 @@ public abstract class DistributionMessage implements
DataSerializableFixedID, Cl
this.multicast = false;
}
- public Set getSuccessfulRecipients() {
- // note we can't use getRecipients() for plannedRecipients because it will
- // return ALL_RECIPIENTS if multicast
- InternalDistributedMember[] plannedRecipients = this.recipients;
- Set successfulRecipients = new HashSet(Arrays.asList(plannedRecipients));
- return successfulRecipients;
- }
-
/**
* Returns the intended recipient(s) of this message. If the message is
intended to be delivered to
* all distribution managers, then the array will contain ALL_RECIPIENTS. If
the recipients have
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
index ebca008..c1e7c11 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
@@ -1649,7 +1649,7 @@ public class InternalDistributedSystem extends
DistributedSystem
@Override
public Set<DistributedMember> getAllOtherMembers() {
- return dm.getAllOtherMembers();
+ return (Set) dm.getAllOtherMembers();
}
@Override
@@ -1661,7 +1661,7 @@ public class InternalDistributedSystem extends
DistributedSystem
@Override
public Set<DistributedMember> findDistributedMembers(InetAddress address) {
Set<InternalDistributedMember> allMembers =
dm.getDistributionManagerIdsIncludingAdmin();
- Set<DistributedMember> results = new HashSet<DistributedMember>(2);
+ Set<DistributedMember> results = new HashSet<>(2);
// Search through the set of all members
for (InternalDistributedMember member : allMembers) {
@@ -1679,8 +1679,7 @@ public class InternalDistributedSystem extends
DistributedSystem
@Override
public DistributedMember findDistributedMember(String name) {
- Set<DistributedMember> allMembers =
dm.getDistributionManagerIdsIncludingAdmin();
- for (DistributedMember member : allMembers) {
+ for (DistributedMember member :
dm.getDistributionManagerIdsIncludingAdmin()) {
if (member.getName().equals(name)) {
return member;
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index fc5dc08..6c9bb2c 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -14,7 +14,6 @@
*/
package org.apache.geode.distributed.internal;
-import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
@@ -24,7 +23,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
@@ -92,7 +90,7 @@ public class LonerDistributionManager implements
DistributionManager {
this.logger = logger;
this.localAddress = generateMemberId();
this.allIds = Collections.singleton(localAddress);
- this.viewMembers = new ArrayList<InternalDistributedMember>(allIds);
+ this.viewMembers = new ArrayList<>(allIds);
DistributionStats.enableClockStats =
this.system.getConfig().getEnableTimeStatistics();
Properties nonDefault = new Properties();
@@ -125,24 +123,10 @@ public class LonerDistributionManager implements
DistributionManager {
private final InternalDistributedMember localAddress;
- /*
- * static { // Make the id a little unique String host; try { host =
- * InetAddress.getLocalHost().getCanonicalHostName();
MemberAttributes.setDefaults(65535,
- * org.apache.geode.internal.OSProcess.getId(),
DistributionManager.LONER_DM_TYPE,
- * MemberAttributes.parseRoles(system.getConfig().getRoles())); id = new
- * InternalDistributedMember(host, 65535); // noise value for port number
- *
- * } catch (UnknownHostException ex) { throw new
InternalError(LocalizedStrings.
- *
LonerDistributionManager_CANNOT_RESOLVE_LOCAL_HOST_NAME_TO_AN_IP_ADDRESS.toLocalizedString());
- * }
- *
- * }
- */
-
private final Set<InternalDistributedMember> allIds;// =
Collections.singleton(id);
private final List<InternalDistributedMember> viewMembers;
private ConcurrentMap<InternalDistributedMember, InternalDistributedMember>
canonicalIds =
- new ConcurrentHashMap();
+ new ConcurrentHashMap<>();
private static final DummyDMStats stats = new DummyDMStats();
private final ExecutorService executor = Executors.newCachedThreadPool();
@@ -155,18 +139,14 @@ public class LonerDistributionManager implements
DistributionManager {
return localAddress;
}
- public Set getDistributionManagerIds() {
+ public Set<InternalDistributedMember> getDistributionManagerIds() {
return allIds;
}
- public Set getDistributionManagerIdsIncludingAdmin() {
+ public Set<InternalDistributedMember>
getDistributionManagerIdsIncludingAdmin() {
return allIds;
}
- public Serializable[] getDirectChannels(InternalDistributedMember[] ids) {
- return ids;
- }
-
public InternalDistributedMember getCanonicalId(DistributedMember dmid) {
InternalDistributedMember iid = (InternalDistributedMember) dmid;
InternalDistributedMember result = this.canonicalIds.putIfAbsent(iid, iid);
@@ -189,17 +169,17 @@ public class LonerDistributionManager implements
DistributionManager {
return null;
}
- public Set getOtherDistributionManagerIds() {
- return Collections.EMPTY_SET;
+ public Set<InternalDistributedMember> getOtherDistributionManagerIds() {
+ return Collections.emptySet();
}
@Override
- public Set getOtherNormalDistributionManagerIds() {
- return Collections.EMPTY_SET;
+ public Set<InternalDistributedMember> getOtherNormalDistributionManagerIds()
{
+ return Collections.emptySet();
}
- public Set getAllOtherMembers() {
- return Collections.EMPTY_SET;
+ public Set<InternalDistributedMember> getAllOtherMembers() {
+ return Collections.emptySet();
}
@Override // DM method
@@ -225,19 +205,16 @@ public class LonerDistributionManager implements
DistributionManager {
}
- public Set
addMembershipListenerAndGetDistributionManagerIds(MembershipListener l) {
+ public Set<InternalDistributedMember>
addMembershipListenerAndGetDistributionManagerIds(
+ MembershipListener l) {
// return getOtherDistributionManagerIds();
return allIds;
}
- public Set addAllMembershipListenerAndGetAllIds(MembershipListener l) {
+ public Set<InternalDistributedMember>
addAllMembershipListenerAndGetAllIds(MembershipListener l) {
return allIds;
}
- public int getDistributionManagerCount() {
- return 0;
- }
-
public InternalDistributedMember getId() {
return getDistributionManagerId();
}
@@ -270,17 +247,6 @@ public class LonerDistributionManager implements
DistributionManager {
return 0;
}
- public Set putOutgoingUserData(final DistributionMessage message) {
- if (message.forAll() || message.getRecipients().length == 0) {
- // do nothing
- return null;
- } else {
- throw new RuntimeException(
-
LocalizedStrings.LonerDistributionManager_LONER_TRIED_TO_SEND_MESSAGE_TO_0
- .toLocalizedString(message.getRecipientsDescription()));
- }
- }
-
public InternalDistributedSystem getSystem() {
return this.system;
}
@@ -337,41 +303,17 @@ public class LonerDistributionManager implements
DistributionManager {
return executor;
}
- public Map getChannelMap() {
- return null;
- }
-
- public Map getMemberMap() {
- return null;
- }
-
public void close() {
shutdown();
}
- public void restartCommunications() {
-
- }
-
@Override
public List<InternalDistributedMember> getViewMembers() {
return viewMembers;
}
- public DistributedMember getOldestMember(Collection members) throws
NoSuchElementException {
- if (members.size() == 1) {
- DistributedMember member = (DistributedMember) members.iterator().next();
- if (member.equals(viewMembers.get(0))) {
- return member;
- }
- }
- throw new NoSuchElementException(
-
LocalizedStrings.LonerDistributionManager_MEMBER_NOT_FOUND_IN_MEMBERSHIP_SET
- .toLocalizedString());
- }
-
- public Set getAdminMemberSet() {
- return Collections.EMPTY_SET;
+ public Set<InternalDistributedMember> getAdminMemberSet() {
+ return Collections.emptySet();
}
public static class DummyDMStats implements DMStats {
@@ -1282,7 +1224,7 @@ public class LonerDistributionManager implements
DistributionManager {
return getId().equals(p_id);
}
- public Set putOutgoing(DistributionMessage msg) {
+ public Set<InternalDistributedMember> putOutgoing(DistributionMessage msg) {
return null;
}
@@ -1292,7 +1234,7 @@ public class LonerDistributionManager implements
DistributionManager {
public void removeUnfinishedStartup(InternalDistributedMember m, boolean
departed) {}
- public void setUnfinishedStartups(Collection s) {}
+ public void setUnfinishedStartups(Collection<InternalDistributedMember> s) {}
protected static class Stopper extends CancelCriterion {
@@ -1388,7 +1330,7 @@ public class LonerDistributionManager implements
DistributionManager {
public Set<DistributedMember> getGroupMembers(String group) {
if (getDistributionManagerId().getGroups().contains(group)) {
- return Collections.singleton((DistributedMember)
getDistributionManagerId());
+ return Collections.singleton(getDistributionManagerId());
} else {
return Collections.emptySet();
}
@@ -1408,7 +1350,7 @@ public class LonerDistributionManager implements
DistributionManager {
}
@Override
- public Set getNormalDistributionManagerIds() {
+ public Set<InternalDistributedMember> getNormalDistributionManagerIds() {
return getDistributionManagerIds();
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/OverflowQueueWithDMStats.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/OverflowQueueWithDMStats.java
index 8f57d89..4958928 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/OverflowQueueWithDMStats.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/OverflowQueueWithDMStats.java
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit;
* A LinkedBlockingQueue that supports stats. Named OverflowQueue for
historical reasons.
*
*/
-public class OverflowQueueWithDMStats<E> extends LinkedBlockingQueue {
+public class OverflowQueueWithDMStats<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = -1846248853494394996L;
protected final QueueStatHelper stats;
@@ -41,7 +41,7 @@ public class OverflowQueueWithDMStats<E> extends
LinkedBlockingQueue {
}
@Override
- public boolean add(Object e) {
+ public boolean add(E e) {
preAdd(e);
if (super.add(e)) {
this.stats.add();
@@ -53,7 +53,7 @@ public class OverflowQueueWithDMStats<E> extends
LinkedBlockingQueue {
}
@Override
- public boolean offer(Object e) {
+ public boolean offer(E e) {
preAdd(e);
if (super.offer(e)) {
this.stats.add();
@@ -65,7 +65,7 @@ public class OverflowQueueWithDMStats<E> extends
LinkedBlockingQueue {
}
@Override
- public void put(Object e) throws InterruptedException {
+ public void put(E e) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
preAddInterruptibly(e);
@@ -82,7 +82,7 @@ public class OverflowQueueWithDMStats<E> extends
LinkedBlockingQueue {
}
@Override
- public boolean offer(Object e, long timeout, TimeUnit unit) throws
InterruptedException {
+ public boolean offer(E e, long timeout, TimeUnit unit) throws
InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
preAddInterruptibly(e);
@@ -103,20 +103,20 @@ public class OverflowQueueWithDMStats<E> extends
LinkedBlockingQueue {
}
@Override
- public Object take() throws InterruptedException {
+ public E take() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
- Object result = super.take();
+ E result = super.take();
postRemove(result);
this.stats.remove();
return result;
}
@Override
- public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
+ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
- Object result = super.poll(timeout, unit);
+ E result = super.poll(timeout, unit);
if (result != null) {
postRemove(result);
this.stats.remove();
@@ -136,7 +136,7 @@ public class OverflowQueueWithDMStats<E> extends
LinkedBlockingQueue {
}
@Override
- public int drainTo(Collection c) {
+ public int drainTo(Collection<? super E> c) {
int result = super.drainTo(c);
if (result > 0) {
this.stats.remove(result);
@@ -146,7 +146,7 @@ public class OverflowQueueWithDMStats<E> extends
LinkedBlockingQueue {
}
@Override
- public int drainTo(Collection c, int maxElements) {
+ public int drainTo(Collection<? super E> c, int maxElements) {
int result = super.drainTo(c, maxElements);
if (result > 0) {
this.stats.remove(result);
@@ -158,7 +158,7 @@ public class OverflowQueueWithDMStats<E> extends
LinkedBlockingQueue {
/**
* Called before the specified object is added to this queue.
*/
- protected void preAddInterruptibly(Object o) throws InterruptedException {
+ protected void preAddInterruptibly(E o) throws InterruptedException {
// do nothing in this class. sub-classes can override
}
@@ -180,7 +180,7 @@ public class OverflowQueueWithDMStats<E> extends
LinkedBlockingQueue {
* Called after the specified collection of objects have been drained (i.e.
removed) from this
* queue.
*/
- protected void postDrain(Collection c) {
+ protected void postDrain(Collection<? super E> c) {
// do nothing in this class. sub-classes can override
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplySender.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplySender.java
index 6fa9d09..a245164 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplySender.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplySender.java
@@ -16,6 +16,7 @@ package org.apache.geode.distributed.internal;
import java.util.Set;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.DirectReplyMessage;
/**
@@ -30,6 +31,6 @@ import org.apache.geode.internal.cache.DirectReplyMessage;
*/
public interface ReplySender {
- Set putOutgoing(DistributionMessage msg);
+ Set<InternalDistributedMember> putOutgoing(DistributionMessage msg);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/StartupMessage.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/StartupMessage.java
index fca3f4e..5cffb38 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/StartupMessage.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/StartupMessage.java
@@ -48,7 +48,7 @@ public class StartupMessage extends
HighPriorityDistributionMessage implements A
private int replyProcessorId;
private boolean isMcastEnabled;
private boolean isTcpDisabled;
- private Set interfaces;
+ private Set<InetAddress> interfaces;
private int distributedSystemId;
private String redundancyZone;
private boolean enforceUniqueZone;
@@ -66,13 +66,12 @@ public class StartupMessage extends
HighPriorityDistributionMessage implements A
* @return list of addresses for this host
* @since GemFire 5.7
*/
- public static Set getMyAddresses(ClusterDistributionManager dm) {
+ public static Set<InetAddress> getMyAddresses(ClusterDistributionManager dm)
{
try {
- Set addresses = SocketCreator.getMyAddresses();
- return addresses;
+ return SocketCreator.getMyAddresses();
} catch (IllegalArgumentException e) {
logger.fatal(e.getMessage(), e);
- return Collections.EMPTY_SET;
+ return Collections.emptySet();
}
}
@@ -139,11 +138,6 @@ public class StartupMessage extends
HighPriorityDistributionMessage implements A
return true;
}
- // void setHostedLocatorsWithSharedConfiguration(Collection<String>
- // hostedLocatorsWithSharedConfiguration) {
- // this.hostedLocatorsWithSharedConfiguration =
hostedLocatorsWithSharedConfiguration;
- // }
-
/**
* Sets the tcpDisabled flag for this message
*
@@ -153,7 +147,7 @@ public class StartupMessage extends
HighPriorityDistributionMessage implements A
isTcpDisabled = flag;
}
- void setInterfaces(Set interfaces) {
+ void setInterfaces(Set<InetAddress> interfaces) {
this.interfaces = interfaces;
if (interfaces == null || interfaces.size() == 0) {
throw new SystemConnectException("Unable to examine network card");
@@ -415,7 +409,7 @@ public class StartupMessage extends
HighPriorityDistributionMessage implements A
}
} // for
- this.interfaces = (Set) DataSerializer.readObject(in);
+ this.interfaces = DataSerializer.readObject(in);
this.distributedSystemId = in.readInt();
this.redundancyZone = DataSerializer.readString(in);
this.enforceUniqueZone = in.readBoolean();
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/StartupMessageReplyProcessor.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/StartupMessageReplyProcessor.java
index 6f10b49..dbf0904 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/StartupMessageReplyProcessor.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/StartupMessageReplyProcessor.java
@@ -61,12 +61,11 @@ public class StartupMessageReplyProcessor extends
ReplyProcessor21 {
/**
* Adds any unresponsive members to s
*/
- void collectUnresponsiveMembers(Set s) {
+ void collectUnresponsiveMembers(Set<InternalDistributedMember> s) {
if (stillWaiting()) {
InternalDistributedMember[] memberList = getMembers();
synchronized (memberList) {
- for (int i = 0; i < memberList.length; i++) {
- InternalDistributedMember m = memberList[i];
+ for (InternalDistributedMember m : memberList) {
if (m != null) {
s.add(m);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/StartupOperation.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/StartupOperation.java
index 2f5ce03..75c5dfb 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/StartupOperation.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/StartupOperation.java
@@ -15,6 +15,7 @@
package org.apache.geode.distributed.internal;
import java.io.IOException;
+import java.net.InetAddress;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
@@ -48,7 +49,8 @@ public class StartupOperation {
* @return whether all recipients could be contacted. The failure set can be
fetched with
* getFailureSet??
*/
- boolean sendStartupMessage(Set recipients, long timeout, Set interfaces,
String redundancyZone,
+ boolean sendStartupMessage(Set recipients, long timeout, Set<InetAddress>
interfaces,
+ String redundancyZone,
boolean enforceUniqueZone)
throws InterruptedException, ReplyException,
java.net.UnknownHostException, IOException {
if (Thread.interrupted())
@@ -89,12 +91,12 @@ public class StartupOperation {
logger.debug("Waiting {} milliseconds to receive startup responses",
timeout);
}
boolean timedOut = true;
- Set unresponsive = null;
+ Set<InternalDistributedMember> unresponsive = null;
try {
timedOut = !proc.waitForReplies(timeout);
} finally {
if (timedOut) {
- unresponsive = new HashSet();
+ unresponsive = new HashSet<>();
proc.collectUnresponsiveMembers(unresponsive);
if (!unresponsive.isEmpty()) {
for (Iterator it = unresponsive.iterator(); it.hasNext();) {
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ThrottlingMemLinkedQueueWithDMStats.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ThrottlingMemLinkedQueueWithDMStats.java
index 8eace86..a7a8c21 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ThrottlingMemLinkedQueueWithDMStats.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ThrottlingMemLinkedQueueWithDMStats.java
@@ -16,7 +16,7 @@
package org.apache.geode.distributed.internal;
import java.util.Collection;
-import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -34,7 +34,7 @@ import java.util.Iterator;
*
*/
-public class ThrottlingMemLinkedQueueWithDMStats extends
OverflowQueueWithDMStats {
+public class ThrottlingMemLinkedQueueWithDMStats<E> extends
OverflowQueueWithDMStats<E> {
private static final long serialVersionUID = 5425180246954573433L;
/** The maximum size of the queue */
@@ -50,7 +50,7 @@ public class ThrottlingMemLinkedQueueWithDMStats extends
OverflowQueueWithDMStat
private final int startThrottleSize;
/** The current memory footprint of the queue */
- private volatile int memSize;
+ private final AtomicInteger memSize = new AtomicInteger();
/** Creates a new instance of ThrottlingMessageQueue */
public ThrottlingMemLinkedQueueWithDMStats(int maxMemSize, int
startThrottleMemSize, int maxSize,
@@ -62,19 +62,10 @@ public class ThrottlingMemLinkedQueueWithDMStats extends
OverflowQueueWithDMStat
this.startThrottleSize = startThrottleSize;
}
- /** Check if the sender needs to be throttled. Returns the time the sender
should sleep */
- public int getThrottleTime() {
- return calculateThrottleTime();
- }
-
- public int getMemSize() {
- return memSize;
- }
-
private int calculateThrottleTime() {
int sleep;
- int myMemSize = memSize;
+ int myMemSize = memSize.get();
if (myMemSize > startThrottleMemSize) {
sleep = (int) (((float) (myMemSize - startThrottleMemSize)
/ (float) (maxMemSize - startThrottleMemSize)) * 100);
@@ -129,7 +120,7 @@ public class ThrottlingMemLinkedQueueWithDMStats extends
OverflowQueueWithDMStat
((ThrottledMemQueueStatHelper) this.stats).throttleTime(endTime -
startTime);
startTime = endTime;
}
- } while (memSize >= maxMemSize || size() >= maxSize);
+ } while (memSize.get() >= maxMemSize || size() >= maxSize);
((ThrottledMemQueueStatHelper) this.stats).incThrottleCount();
}
@@ -137,24 +128,23 @@ public class ThrottlingMemLinkedQueueWithDMStats extends
OverflowQueueWithDMStat
if (o instanceof Sizeable) {
int mem = ((Sizeable) o).getSize();
((ThrottledMemQueueStatHelper) this.stats).addMem(mem);
- this.memSize += mem;
+ this.memSize.addAndGet(mem);
}
}
@Override
protected void postRemove(Object o) {
- if (o != null && (o instanceof Sizeable)) {
+ if (o instanceof Sizeable) {
int mem = ((Sizeable) o).getSize();
- this.memSize -= mem;
+ this.memSize.addAndGet(-mem);
((ThrottledMemQueueStatHelper) this.stats).removeMem(mem);
}
}
@Override
protected void postDrain(Collection c) {
- Iterator it = c.iterator();
- while (it.hasNext()) {
- postRemove(it.next());
+ for (Object aC : c) {
+ postRemove(aC);
}
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
index 3e0b201..84a0bc3 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
@@ -71,21 +71,21 @@ public class DLockService extends DistributedLockService {
private static final Logger logger = LogService.getLogger();
- public static final long NOT_GRANTOR_SLEEP = Long
+ private static final long NOT_GRANTOR_SLEEP = Long
.getLong(DistributionConfig.GEMFIRE_PREFIX +
"DLockService.notGrantorSleep", 100).longValue();
- public static final boolean DEBUG_NONGRANTOR_DESTROY_LOOP = Boolean
+ private static final boolean DEBUG_NONGRANTOR_DESTROY_LOOP = Boolean
.getBoolean(DistributionConfig.GEMFIRE_PREFIX +
"DLockService.debug.nonGrantorDestroyLoop");
- public static final int DEBUG_NONGRANTOR_DESTROY_LOOP_COUNT = Integer
+ private static final int DEBUG_NONGRANTOR_DESTROY_LOOP_COUNT = Integer
.getInteger(
DistributionConfig.GEMFIRE_PREFIX +
"DLockService.debug.nonGrantorDestroyLoopCount", 20)
.intValue();
- public static final boolean AUTOMATE_FREE_RESOURCES =
+ private static final boolean AUTOMATE_FREE_RESOURCES =
Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX +
"DLockService.automateFreeResources");
- public static final int INVALID_LEASE_ID = -1;
+ static final int INVALID_LEASE_ID = -1;
/** Unique name for this instance of the named locking service */
protected final String serviceName;
@@ -929,10 +929,6 @@ public class DLockService extends DistributedLockService {
}
}
- public DLockGrantor getGrantorForElderRecovery() {
- return getGrantor();
- }
-
public DLockGrantor getGrantorWithNoSync() {
return this.grantor;
}
@@ -940,7 +936,7 @@ public class DLockService extends DistributedLockService {
/**
* @param predecessor non-null if a predecessor asked us to take over for him
*/
- void becomeLockGrantor(InternalDistributedMember predecessor) {
+ private void becomeLockGrantor(InternalDistributedMember predecessor) {
Assert.assertTrue(predecessor == null);
boolean ownLockGrantorFutureResult = false;
FutureResult lockGrantorFutureResultRef = null;
@@ -974,7 +970,6 @@ public class DLockService extends DistributedLockService {
}
if (!ownLockGrantorFutureResult) {
waitForLockGrantorFutureResult(lockGrantorFutureResultRef, 0,
TimeUnit.MILLISECONDS);
- continue;
}
}
@@ -2207,7 +2202,7 @@ public class DLockService extends DistributedLockService {
public static void destroyServiceNamed(String serviceName) throws
IllegalArgumentException {
DLockService svc = null;
synchronized (services) {
- svc = (DLockService) services.get(serviceName);
+ svc = services.get(serviceName);
}
if (svc == null) {
throw new IllegalArgumentException(
@@ -2219,12 +2214,11 @@ public class DLockService extends
DistributedLockService {
/** Destroys all lock services in this VM. Used in test tearDown code. */
public static void destroyAll() {
- Collection svcs = Collections.EMPTY_SET;
+ Collection<DLockService> svcs;
synchronized (services) {
- svcs = new HashSet(services.values());
+ svcs = new HashSet<>(services.values());
}
- for (Iterator iter = svcs.iterator(); iter.hasNext();) {
- DLockService svc = (DLockService) iter.next();
+ for (DLockService svc : svcs) {
try {
svc.destroyAndRemove();
} catch (CancelException e) {
@@ -2612,19 +2606,19 @@ public class DLockService extends
DistributedLockService {
private static final DummyDLockStats DUMMY_STATS = new DummyDLockStats();
- public static final SuspendLockingToken SUSPEND_LOCKING_TOKEN = new
SuspendLockingToken();
+ static final SuspendLockingToken SUSPEND_LOCKING_TOKEN = new
SuspendLockingToken();
// -------------------------------------------------------------------------
// Static fields
// -------------------------------------------------------------------------
- /** Map of all locking services. Key:ServiceName, Value:DLockService */
- protected static final Map<String, DLockService> services = new
HashMap<String, DLockService>();
+ /** Map of all locking services. */
+ protected static final Map<String, DLockService> services = new HashMap<>();
- protected static final Object creationLock = new Object();
+ private static final Object creationLock = new Object();
/** All DLock threads belong to this group */
- static ThreadGroup threadGroup;
+ private static ThreadGroup threadGroup;
/** DLock statistics; static because multiple dlock instances can exist */
private static DistributedLockStats stats = DUMMY_STATS;
@@ -2635,7 +2629,7 @@ public class DLockService extends DistributedLockService {
public static final String LTLS = "LTLS";
public static final String DTLS = "DTLS";
- static final String[] reservedNames = new String[] {LTLS, DTLS};
+ private static final String[] reservedNames = new String[] {LTLS, DTLS};
// -------------------------------------------------------------------------
// DLS serial number (uniquely identifies local instance of DLS)
@@ -2689,10 +2683,8 @@ public class DLockService extends DistributedLockService
{
* @see
org.apache.geode.distributed.DistributedLockService#getServiceNamed(String)
*/
public static DistributedLockService getServiceNamed(String serviceName) {
- DLockService svc = null;
synchronized (services) {
- svc = (DLockService) services.get(serviceName);
- return svc;
+ return services.get(serviceName);
}
}
@@ -2750,10 +2742,7 @@ public class DLockService extends DistributedLockService
{
throw new
IllegalArgumentException(LocalizedStrings.DLockService_SERVICE_NAMED_0_IS_NOT_VALID
.toLocalizedString(serviceName));
}
- DLockService svc = null;
- synchronized (services) {
- svc = (DLockService) services.get(serviceName);
- }
+ DLockService svc = getInternalServiceNamed(serviceName);
if (svc == null) {
throw new IllegalArgumentException(
LocalizedStrings.DLockService_SERVICE_NAMED_0_NOT_CREATED.toLocalizedString(serviceName));
@@ -2769,7 +2758,7 @@ public class DLockService extends DistributedLockService {
}
DLockService svc = null;
synchronized (services) {
- svc = (DLockService) services.get(serviceName);
+ svc = services.get(serviceName);
}
if (svc == null) {
throw new IllegalArgumentException(
@@ -2854,7 +2843,9 @@ public class DLockService extends DistributedLockService {
/** Convenience method to get named DLockService */
public static DLockService getInternalServiceNamed(String serviceName) {
- return (DLockService) services.get(serviceName);
+ synchronized (services) {
+ return services.get(serviceName);
+ }
}
/** Validates service name for external creation */
@@ -2864,8 +2855,8 @@ public class DLockService extends DistributedLockService {
LocalizedStrings.DLockService_LOCK_SERVICE_NAME_MUST_NOT_BE_NULL_OR_EMPTY
.toLocalizedString());
}
- for (int i = 0; i < reservedNames.length; i++) {
- if (serviceName.startsWith(reservedNames[i])) {
+ for (String reservedName : reservedNames) {
+ if (serviceName.startsWith(reservedName)) {
throw new IllegalArgumentException(
LocalizedStrings.DLockService_SERVICE_NAMED_0_IS_RESERVED_FOR_INTERNAL_USE_ONLY
.toLocalizedString(serviceName));
@@ -2875,11 +2866,9 @@ public class DLockService extends DistributedLockService
{
/** Return a snapshot of all services */
public static Map<String, DLockService> snapshotAllServices() { // used by:
internal/admin/remote
- Map snapshot = null;
synchronized (services) {
- snapshot = new HashMap(services);
+ return new HashMap<>(services);
}
- return snapshot;
}
/**
@@ -2891,9 +2880,7 @@ public class DLockService extends DistributedLockService {
synchronized (services) {
logger.info(LogMarker.DLS_MARKER,
LocalizedMessage.create(LocalizedStrings.TESTING,
"DLockService.dumpAllServices() - " + services.size() + "
services:\n"));
- Iterator entries = services.entrySet().iterator();
- while (entries.hasNext()) {
- Map.Entry entry = (Map.Entry) entries.next();
+ for (Map.Entry entry : services.entrySet()) {
buffer.append(" " + entry.getKey() + ":\n");
DLockService svc = (DLockService) entry.getValue();
svc.dumpService();
@@ -2954,7 +2941,7 @@ public class DLockService extends DistributedLockService {
InternalDistributedSystem system = null;
synchronized (services) {
- DLockService removedService = (DLockService)
services.remove(service.getName());
+ DLockService removedService = services.remove(service.getName());
if (removedService == null) {
// another thread beat us to the removal... return
return;
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/GrantorRequestProcessor.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/GrantorRequestProcessor.java
index f6e8ce8..788593e 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/GrantorRequestProcessor.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/GrantorRequestProcessor.java
@@ -279,11 +279,11 @@ public class GrantorRequestProcessor extends
ReplyProcessor21 {
* @param sys th distributed system
* @return information describing the current grantor of this service and if
recovery is needed
*/
- public static GrantorInfo peekGrantor(DLockService service,
InternalDistributedSystem sys) {
+ static GrantorInfo peekGrantor(DLockService service,
InternalDistributedSystem sys) {
return basicOp(-1, service, -1, sys, null, PEEK_OP);
}
- public static GrantorInfo peekGrantor(String serviceName,
InternalDistributedSystem sys) {
+ static GrantorInfo peekGrantor(String serviceName, InternalDistributedSystem
sys) {
return basicOp(-1, serviceName, null, -1, sys, null, PEEK_OP);
}
@@ -296,7 +296,7 @@ public class GrantorRequestProcessor extends
ReplyProcessor21 {
* @return information describing the previous grantor, if any, and if we
need to do a grantor
* recovery
*/
- public static GrantorInfo becomeGrantor(DLockService service, int
dlsSerialNumber,
+ static GrantorInfo becomeGrantor(DLockService service, int dlsSerialNumber,
InternalDistributedMember oldTurk, InternalDistributedSystem sys) {
return basicOp(-1, service, dlsSerialNumber, sys, oldTurk, BECOME_OP);
}
@@ -307,7 +307,7 @@ public class GrantorRequestProcessor extends
ReplyProcessor21 {
* @param service the service we are no longer the grantor of.
* @param sys the distributed system
*/
- public static void clearGrantor(long grantorVersion, DLockService service,
int dlsSerialNumber,
+ static void clearGrantor(long grantorVersion, DLockService service, int
dlsSerialNumber,
InternalDistributedSystem sys, boolean withLocks) {
basicOp(grantorVersion, service, dlsSerialNumber, sys, null,
withLocks ? CLEAR_WITH_LOCKS_OP : CLEAR_OP);
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/MembershipManager.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/MembershipManager.java
index 7c227ed..a91f757 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/MembershipManager.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/MembershipManager.java
@@ -135,7 +135,8 @@ public interface MembershipManager {
* not receive the message because they departed the distributed
system.
* @throws NotSerializableException If content cannot be serialized
*/
- Set send(InternalDistributedMember[] destinations, DistributionMessage
content, DMStats stats)
+ Set<InternalDistributedMember> send(InternalDistributedMember[] destinations,
+ DistributionMessage content, DMStats stats)
throws NotSerializableException;
/**
@@ -163,7 +164,7 @@ public interface MembershipManager {
* distributed member
* @since GemFire 5.1
*/
- Map getMessageState(DistributedMember member, boolean includeMulticast);
+ Map<String, Long> getMessageState(DistributedMember member, boolean
includeMulticast);
/**
* Waits for the given communications to reach the associated state
@@ -173,7 +174,8 @@ public interface MembershipManager {
* @throws InterruptedException Thrown if the thread is interrupted
* @since GemFire 5.1
*/
- void waitForMessageState(DistributedMember member, Map state) throws
InterruptedException;
+ void waitForMessageState(DistributedMember member, Map<String, Long> state)
+ throws InterruptedException;
/**
* Wait for the given member to not be in the membership view and for all
direct-channel receivers
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/interfaces/Messenger.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/interfaces/Messenger.java
index b981d30..080f0da 100755
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/interfaces/Messenger.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/interfaces/Messenger.java
@@ -72,7 +72,8 @@ public interface Messenger extends Service {
* @param state messaging state is stored in this map
* @param includeMulticast whether to record multicast state
*/
- void getMessageState(InternalDistributedMember member, Map state, boolean
includeMulticast);
+ void getMessageState(InternalDistributedMember member, Map<String, Long>
state,
+ boolean includeMulticast);
/**
* The flip-side of getMessageState, this method takes the state it recorded
and waits for
@@ -81,7 +82,8 @@ public interface Messenger extends Service {
* @param member the member flushing operations to this member
* @param state the state of that member's outgoing messaging to this member
*/
- void waitForMessageState(InternalDistributedMember member, Map state) throws
InterruptedException;
+ void waitForMessageState(InternalDistributedMember member, Map<String, Long>
state)
+ throws InterruptedException;
/**
* Get the public key of member.
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 83cbe71..71ca91e 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -550,21 +550,21 @@ public class JGroupsMessenger implements Messenger {
}
@Override
- public void getMessageState(InternalDistributedMember target, Map state,
+ public void getMessageState(InternalDistributedMember target, Map<String,
Long> state,
boolean includeMulticast) {
if (includeMulticast) {
NAKACK2 nakack = (NAKACK2)
myChannel.getProtocolStack().findProtocol("NAKACK2");
if (nakack != null) {
long seqno = nakack.getCurrentSeqno();
- state.put("JGroups.mcastState", Long.valueOf(seqno));
+ state.put("JGroups.mcastState", seqno);
}
}
}
@Override
- public void waitForMessageState(InternalDistributedMember sender, Map state)
+ public void waitForMessageState(InternalDistributedMember sender,
Map<String, Long> state)
throws InterruptedException {
- Long seqno = (Long) state.get("JGroups.mcastState");
+ Long seqno = state.get("JGroups.mcastState");
if (seqno == null) {
return;
}
@@ -590,7 +590,7 @@ public class JGroupsMessenger implements Messenger {
"waiting for multicast messages from {}. Current seqno={} and
expected seqno={}",
sender, highSeqno, seqno);
}
- if (highSeqno >= seqno.longValue()) {
+ if (highSeqno >= seqno) {
break;
}
long now = System.currentTimeMillis();
@@ -1096,7 +1096,7 @@ public class JGroupsMessenger implements Messenger {
throws ClassNotFoundException, IOException {
GMSMember m = new GMSMember();
m.readEssentialData(in);
- DistributionMessage result = (DistributionMessage)
DataSerializer.readObject(in);
+ DistributionMessage result = DataSerializer.readObject(in);
setSender(result, m, ordinal);
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index de2fce2..1dc6573 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -14,7 +14,6 @@
*/
package org.apache.geode.distributed.internal.membership.gms.mgr;
-import java.io.IOException;
import java.io.NotSerializableException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -46,7 +45,6 @@ import org.apache.geode.InternalGemFireError;
import org.apache.geode.SystemConnectException;
import org.apache.geode.SystemFailure;
import org.apache.geode.ToDataException;
-import org.apache.geode.cache.Cache;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
@@ -265,7 +263,7 @@ public class GMSMembershipManager implements
MembershipManager, Manager {
/**
* Membership failure listeners - for testing
*/
- private List membershipTestHooks;
+ private List<MembershipTestHook> membershipTestHooks;
/**
* This is a representation of the local member (ourself)
@@ -1584,7 +1582,7 @@ public class GMSMembershipManager implements
MembershipManager, Manager {
try {
logger.info("generating XML to rebuild the cache after reconnect
completes");
StringPrintWriter pw = new StringPrintWriter();
- CacheXmlGenerator.generate((Cache) cache, pw, true, false);
+ CacheXmlGenerator.generate(cache, pw, true, false);
String cacheXML = pw.toString();
cache.getCacheConfig().setCacheXMLDescription(cacheXML);
logger.info("XML generation completed: {}", cacheXML);
@@ -1683,7 +1681,7 @@ public class GMSMembershipManager implements
MembershipManager, Manager {
* @return all recipients who did not receive the message (null if all
received it)
* @throws NotSerializableException if the message is not serializable
*/
- protected Set<InternalDistributedMember> directChannelSend(
+ Set<InternalDistributedMember> directChannelSend(
InternalDistributedMember[] destinations, DistributionMessage content,
DMStats theStats)
throws NotSerializableException {
boolean allDestinations;
@@ -1740,7 +1738,7 @@ public class GMSMembershipManager implements
MembershipManager, Manager {
return null;
// We need to return this list of failures
- List<InternalDistributedMember> members =
(List<InternalDistributedMember>) ex.getMembers();
+ List<InternalDistributedMember> members = ex.getMembers();
// SANITY CHECK: If we fail to send a message to an existing member
// of the view, we have a serious error (bug36202).
@@ -1767,14 +1765,7 @@ public class GMSMembershipManager implements
MembershipManager, Manager {
} // catch ConnectionExceptions
catch (ToDataException | CancelException e) {
throw e;
- } catch (IOException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("Membership: directChannelSend caught exception: {}",
e.getMessage(), e);
- }
- if (e instanceof NotSerializableException) {
- throw (NotSerializableException) e;
- }
- } catch (RuntimeException | Error e) {
+ } catch (NotSerializableException | RuntimeException | Error e) {
if (logger.isDebugEnabled()) {
logger.debug("Membership: directChannelSend caught exception: {}",
e.getMessage(), e);
}
@@ -2193,7 +2184,7 @@ public class GMSMembershipManager implements
MembershipManager, Manager {
/**
* for mock testing this allows insertion of a DirectChannel mock
*/
- protected void setDirectChannel(DirectChannel dc) {
+ void setDirectChannel(DirectChannel dc) {
this.directChannel = dc;
this.tcpDisabled = false;
}
@@ -2201,8 +2192,8 @@ public class GMSMembershipManager implements
MembershipManager, Manager {
/*
* non-thread-owned serial channels and high priority channels are not
included
*/
- public Map getMessageState(DistributedMember member, boolean
includeMulticast) {
- Map result = new HashMap();
+ public Map<String, Long> getMessageState(DistributedMember member, boolean
includeMulticast) {
+ Map<String, Long> result = new HashMap<>();
DirectChannel dc = directChannel;
if (dc != null) {
dc.getChannelStates(member, result);
@@ -2212,7 +2203,7 @@ public class GMSMembershipManager implements
MembershipManager, Manager {
return result;
}
- public void waitForMessageState(DistributedMember otherMember, Map state)
+ public void waitForMessageState(DistributedMember otherMember, Map<String,
Long> state)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
@@ -2305,12 +2296,12 @@ public class GMSMembershipManager implements
MembershipManager, Manager {
/**
* wait for serial executor messages from the given member to be processed
*/
- public boolean waitForSerialMessageProcessing(InternalDistributedMember idm)
+ private boolean waitForSerialMessageProcessing(InternalDistributedMember idm)
throws InterruptedException {
// run a message through the member's serial execution queue to ensure
that all of its
// current messages have been processed
boolean result = false;
- OverflowQueueWithDMStats serialQueue =
listener.getDM().getSerialQueue(idm);
+ OverflowQueueWithDMStats<Runnable> serialQueue =
listener.getDM().getSerialQueue(idm);
if (serialQueue != null) {
final boolean done[] = new boolean[1];
final FlushingMessage msg = new FlushingMessage(done);
@@ -2389,7 +2380,7 @@ public class GMSMembershipManager implements
MembershipManager, Manager {
if (this.membershipTestHooks == null) {
this.membershipTestHooks = Collections.singletonList(mth);
} else {
- List l = new ArrayList(this.membershipTestHooks);
+ List<MembershipTestHook> l = new ArrayList<>(this.membershipTestHooks);
l.add(mth);
this.membershipTestHooks = l;
}
@@ -2405,7 +2396,7 @@ public class GMSMembershipManager implements
MembershipManager, Manager {
if (this.membershipTestHooks.size() == 1) {
this.membershipTestHooks = null;
} else {
- List l = new ArrayList(this.membershipTestHooks);
+ List<MembershipTestHook> l = new
ArrayList<>(this.membershipTestHooks);
l.remove(mth);
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 1f60b23..ae2d925 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -2707,7 +2707,7 @@ public class GemFireCacheImpl implements InternalCache,
InternalClientCache, Has
@Override
public Set<DistributedMember> getMembers() {
return Collections
- .unmodifiableSet((Set<DistributedMember>)
this.dm.getOtherNormalDistributionManagerIds());
+ .unmodifiableSet((Set) this.dm.getOtherNormalDistributionManagerIds());
}
@Override
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectExceptions.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectExceptions.java
index 54f3212..7eb9e0b 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectExceptions.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectExceptions.java
@@ -15,7 +15,6 @@
package org.apache.geode.internal.tcp;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import org.apache.geode.GemFireCheckedException;
@@ -34,10 +33,10 @@ public class ConnectExceptions extends
GemFireCheckedException {
private static final long serialVersionUID = -4173688946448867706L;
/** The causes of this exception */
- private List causes;
+ private List<Throwable> causes;
/** The InternalDistributedMember's of the members we couldn't connect/send
to */
- private List members;
+ private List<InternalDistributedMember> members;
//////////////////// Constructors ////////////////////
@@ -47,8 +46,8 @@ public class ConnectExceptions extends
GemFireCheckedException {
*/
public ConnectExceptions() {
super(LocalizedStrings.ConnectException_COULD_NOT_CONNECT.toLocalizedString());
- this.causes = new ArrayList();
- this.members = new ArrayList();
+ this.causes = new ArrayList<>();
+ this.members = new ArrayList<>();
}
@@ -63,26 +62,26 @@ public class ConnectExceptions extends
GemFireCheckedException {
/**
* Returns a list of <code>InternalDistributedMember</code>s that couldn't
be connected to.
*/
- public List getMembers() {
+ public List<InternalDistributedMember> getMembers() {
return this.members;
}
/**
* Returns the causes of this exception
*/
- public List getCauses() {
+ public List<Throwable> getCauses() {
return this.causes;
}
@Override
public String getMessage() {
StringBuffer sb = new StringBuffer();
- for (Iterator iter = this.members.iterator(); iter.hasNext();) {
- sb.append(' ').append(iter.next());
+ for (InternalDistributedMember member : this.members) {
+ sb.append(' ').append(member);
}
sb.append("
").append(LocalizedStrings.ConnectException_CAUSES.toLocalizedString());
- for (Iterator iter = this.causes.iterator(); iter.hasNext();) {
- sb.append(" {").append(iter.next()).append("}");
+ for (Throwable cause : this.causes) {
+ sb.append(" {").append(cause).append("}");
}
return
LocalizedStrings.ConnectException_COULD_NOT_CONNECT_TO_0.toLocalizedString(sb);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
index ae89e53..8a2f82c 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
@@ -27,6 +27,7 @@ import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionMessage;
import
org.apache.geode.distributed.internal.LonerDistributionManager.DummyDMStats;
import org.apache.geode.distributed.internal.ReplySender;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
@@ -49,7 +50,7 @@ class DirectReplySender implements ReplySender {
this.conn = connection;
}
- public Set putOutgoing(DistributionMessage msg) {
+ public Set<InternalDistributedMember> putOutgoing(DistributionMessage msg) {
Assert.assertTrue(!this.sentReply, "Trying to reply twice to a message");
// Using an ArrayList, rather than Collections.singletonList here, because
the MsgStreamer
// mutates the list when it has exceptions.
diff --git
a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
index fde2cd0..81748ca 100755
---
a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
+++
b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
@@ -16,7 +16,6 @@ package org.apache.geode.management.internal;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -261,18 +260,11 @@ public class FederatingManager extends Manager {
public void startManagingActivity() throws Exception {
final boolean isDebugEnabled = logger.isDebugEnabled();
- Set<DistributedMember> members =
- cache.getDistributionManager().getOtherDistributionManagerIds();
-
- Iterator<DistributedMember> it = members.iterator();
- DistributedMember member;
-
final List<Callable<DistributedMember>> giiTaskList = new ArrayList<>();
-
List<Future<DistributedMember>> futureTaskList;
- while (it.hasNext()) {
- member = it.next();
+ for (DistributedMember member : cache.getDistributionManager()
+ .getOtherDistributionManagerIds()) {
giiTaskList.add(new GIITask(member));
}
diff --git
a/geode-core/src/main/java/org/apache/geode/management/internal/MemberMessenger.java
b/geode-core/src/main/java/org/apache/geode/management/internal/MemberMessenger.java
index 59b88df..0f84bf9 100644
---
a/geode-core/src/main/java/org/apache/geode/management/internal/MemberMessenger.java
+++
b/geode-core/src/main/java/org/apache/geode/management/internal/MemberMessenger.java
@@ -59,7 +59,8 @@ public class MemberMessenger {
}
public void broadcastManagerInfo() {
- Set<DistributedMember> otherMemberSet =
system.getDistributionManager().getAllOtherMembers();
+ Set<InternalDistributedMember> otherMemberSet =
+ system.getDistributionManager().getAllOtherMembers();
String levelName = jmxAdapter.getDistributedSystemMXBean().getAlertLevel();
int alertCode = LogLevel.getLogWriterLevel(levelName);
diff --git
a/geode-core/src/main/java/org/apache/geode/management/internal/messages/CompactRequest.java
b/geode-core/src/main/java/org/apache/geode/management/internal/messages/CompactRequest.java
index 5ddd258..52948eb 100644
---
a/geode-core/src/main/java/org/apache/geode/management/internal/messages/CompactRequest.java
+++
b/geode-core/src/main/java/org/apache/geode/management/internal/messages/CompactRequest.java
@@ -57,7 +57,7 @@ public class CompactRequest extends AdminRequest {
private static String notExecutedMembers;
public static Map<DistributedMember, PersistentID> send(DistributionManager
dm,
- String diskStoreName, Set<?> recipients) {
+ String diskStoreName, Set<? extends DistributedMember> recipients) {
Map<DistributedMember, PersistentID> results = Collections.emptyMap();
if (recipients != null && !recipients.isEmpty()) {
diff --git
a/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/management/LuceneManagementDUnitTest.java
b/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/management/LuceneManagementDUnitTest.java
index 0415661..06f9a34 100644
---
a/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/management/LuceneManagementDUnitTest.java
+++
b/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/management/LuceneManagementDUnitTest.java
@@ -140,7 +140,7 @@ public class LuceneManagementDUnitTest extends
ManagementTestBase {
}
private static void waitForMemberProxiesToRefresh(int refreshCount, final
InternalCache cache) {
- Set<DistributedMember> members =
+ Set<? extends DistributedMember> members =
cache.getDistributionManager().getOtherNormalDistributionManagerIds();
// Currently, the LuceneServiceMBean is not updated in the manager since
it has no getters,
// so use the MemberMBean instead.
@@ -165,7 +165,7 @@ public class LuceneManagementDUnitTest extends
ManagementTestBase {
}
private static void verifyMBeanProxies(final InternalCache cache) {
- Set<DistributedMember> members =
+ Set<? extends DistributedMember> members =
cache.getDistributionManager().getOtherNormalDistributionManagerIds();
for (DistributedMember member : members) {
getMBeanProxy(member);
@@ -204,7 +204,7 @@ public class LuceneManagementDUnitTest extends
ManagementTestBase {
private static void verifyAllMBeanProxyIndexMetrics(String regionName, int
numRegionIndexes,
int numTotalIndexes, final InternalCache cache) {
- Set<DistributedMember> members =
+ Set<? extends DistributedMember> members =
cache.getDistributionManager().getOtherNormalDistributionManagerIds();
for (DistributedMember member : members) {
LuceneServiceMXBean mbean = getMBeanProxy(member);
@@ -240,7 +240,7 @@ public class LuceneManagementDUnitTest extends
ManagementTestBase {
private void verifyMBeanIndexMetricsValues(String regionName, String
indexName, int expectedPuts,
int expectedQueries, int expectedHits, final InternalCache cache) {
// Get index metrics from all members
- Set<DistributedMember> members =
+ Set<? extends DistributedMember> members =
cache.getDistributionManager().getOtherNormalDistributionManagerIds();
int totalCommits = 0, totalUpdates = 0, totalDocuments = 0, totalQueries =
0, totalHits = 0;
for (DistributedMember member : members) {
diff --git
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/GatewayReceiverMBeanDUnitTest.java
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/GatewayReceiverMBeanDUnitTest.java
index a96004d..0d24130 100644
---
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/GatewayReceiverMBeanDUnitTest.java
+++
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/GatewayReceiverMBeanDUnitTest.java
@@ -92,7 +92,7 @@ public class GatewayReceiverMBeanDUnitTest extends
ManagementTestBase {
}
private static void verifyMBeanProxies(final InternalCache cache) {
- Set<DistributedMember> members =
+ Set<? extends DistributedMember> members =
cache.getDistributionManager().getOtherNormalDistributionManagerIds();
for (DistributedMember member : members) {
Awaitility.await().atMost(60, TimeUnit.SECONDS)
@@ -101,7 +101,7 @@ public class GatewayReceiverMBeanDUnitTest extends
ManagementTestBase {
}
private static void verifyMBeanProxiesDoesNotExist(final InternalCache
cache) {
- Set<DistributedMember> members =
+ Set<? extends DistributedMember> members =
cache.getDistributionManager().getOtherNormalDistributionManagerIds();
for (DistributedMember member : members) {
assertNull(getMBeanProxy(member));