This is an automated email from the ASF dual-hosted git repository.

jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 8d44f027ec1cb491d700b19ca3e30c5c474aee90
Author: Jacob Barrett <[email protected]>
AuthorDate: Sat Jan 16 08:58:28 2021 -0800

    GEODE-6588: Static analyzer cleanup.
---
 .../client/internal/ClientSideHandshakeImpl.java   |   2 +-
 .../cache/tier/sockets/CacheClientNotifier.java    | 110 +++++++++---------
 .../cache/tier/sockets/ClientHealthMonitor.java    |  31 ++---
 .../cache/tier/sockets/MessageDispatcher.java      | 127 ++++++++++-----------
 .../cache/tier/sockets/SocketMessageWriter.java    |  16 +--
 5 files changed, 137 insertions(+), 149 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java
index eb5fcf9..f0de522 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java
@@ -169,7 +169,7 @@ public class ClientSideHandshakeImpl extends Handshake implements ClientSideHand
       // if running in a loner system, use the new port number in the ID to
       // help differentiate from other clients
       DistributionManager dm = ((InternalDistributedSystem) 
system).getDistributionManager();
-      InternalDistributedMember idm = dm.getDistributionManagerId();
+      final InternalDistributedMember idm = dm.getDistributionManagerId();
       synchronized (idm) {
         if (idm.getMembershipPort() == 0 && dm instanceof 
LonerDistributionManager) {
           int port = sock.getLocalPort();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index a3a67a3..7f4fe13 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache.tier.sockets;
 import static org.apache.geode.cache.Region.SEPARATOR;
 import static 
org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_ACCESSOR_PP;
 import static 
org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTHENTICATOR;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 
 import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
@@ -71,6 +72,7 @@ import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.ClassLoadUtil;
 import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.cache.CacheClientStatus;
@@ -480,8 +482,7 @@ public class CacheClientNotifier {
         removeClientInitProxy(l_proxy);
       }
     }
-    boolean status = false;
-    return status;
+    return false;
   }
 
   /**
@@ -620,7 +621,7 @@ public class CacheClientNotifier {
   }
 
   private boolean hasClientProxies() {
-    return !this._initClientProxies.isEmpty() || 
!this._clientProxies.isEmpty();
+    return !_initClientProxies.isEmpty() || !_clientProxies.isEmpty();
   }
 
   /**
@@ -745,7 +746,8 @@ public class CacheClientNotifier {
 
     // Add interestList info.
     if (filterInfo.getInterestedClientsInv() != null) {
-      Set<Object> rawIDs = 
regionProfile.getRealClientIDs(filterInfo.getInterestedClientsInv());
+      Set<Object> rawIDs =
+          
uncheckedCast(regionProfile.getRealClientIDs(filterInfo.getInterestedClientsInv()));
       Set<ClientProxyMembershipID> ids = getProxyIDs(rawIDs);
       incMessagesNotQueuedOriginatorStat(event, ids);
       if (!ids.isEmpty()) {
@@ -757,7 +759,8 @@ public class CacheClientNotifier {
       }
     }
     if (filterInfo.getInterestedClients() != null) {
-      Set<Object> rawIDs = 
regionProfile.getRealClientIDs(filterInfo.getInterestedClients());
+      Set<Object> rawIDs =
+          
uncheckedCast(regionProfile.getRealClientIDs(filterInfo.getInterestedClients()));
       Set<ClientProxyMembershipID> ids = getProxyIDs(rawIDs);
       incMessagesNotQueuedOriginatorStat(event, ids);
       if (!ids.isEmpty()) {
@@ -801,7 +804,7 @@ public class CacheClientNotifier {
       FilterInfo filterInfo) {
     FilterProfile regionProfile = ((InternalRegion) 
event.getRegion()).getFilterProfile();
     if (event.getOperation().isEntry() && filterInfo.getCQs() != null) {
-      EntryEvent entryEvent = (EntryEvent) event;
+      EntryEvent<?, ?> entryEvent = (EntryEvent<?, ?>) event;
       for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
         Long cqID = e.getKey();
         String cqName = regionProfile.getRealCqID(cqID);
@@ -926,7 +929,7 @@ public class CacheClientNotifier {
    * processes the given collection of durable and non-durable client 
identifiers, returning a
    * collection of non-durable identifiers of clients connected to this VM
    */
-  Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) {
+  Set<ClientProxyMembershipID> getProxyIDs(Set<?> 
mixedDurableAndNonDurableIDs) {
     Set<ClientProxyMembershipID> result = ConcurrentHashMap.newKeySet();
     for (Object id : mixedDurableAndNonDurableIDs) {
       if (id instanceof String) {
@@ -961,7 +964,7 @@ public class CacheClientNotifier {
 
         CacheDistributionAdvisor advisor =
             proxy.getHARegionQueue().getRegion().getCacheDistributionAdvisor();
-        Set members = advisor.adviseCacheOp();
+        Set<InternalDistributedMember> members = advisor.adviseCacheOp();
 
         // Send client denylist message
         ClientDenylistProcessor.sendDenylistedClient(proxy.getProxyID(), dm, 
members);
@@ -991,7 +994,8 @@ public class CacheClientNotifier {
    * @param event The event containing the data to be updated
    * @return a {@code ClientUpdateMessage}
    */
-  private ClientUpdateMessageImpl initializeMessage(EnumListenerEvent 
operation, CacheEvent event)
+  private ClientUpdateMessageImpl initializeMessage(EnumListenerEvent 
operation,
+      CacheEvent<?, ?> event)
       throws Exception {
     if (!supportsOperation(operation)) {
       throw new Exception(
@@ -1004,7 +1008,7 @@ public class CacheClientNotifier {
     boolean isNetLoad = false;
     Object callbackArgument;
     byte[] delta = null;
-    VersionTag versionTag = null;
+    VersionTag<?> versionTag = null;
 
     if (event.getOperation().isEntry()) {
       EntryEventImpl entryEvent = (EntryEventImpl) event;
@@ -1111,7 +1115,8 @@ public class CacheClientNotifier {
    * @param regionDataPolicy (0==empty)
    * @since GemFire 6.1
    */
-  public void updateMapOfEmptyRegions(Map regionsWithEmptyDataPolicy, String 
regionName,
+  public void updateMapOfEmptyRegions(Map<String, Integer> 
regionsWithEmptyDataPolicy,
+      String regionName,
       int regionDataPolicy) {
     if (regionDataPolicy == 0) {
       if (!regionsWithEmptyDataPolicy.containsKey(regionName)) {
@@ -1151,7 +1156,7 @@ public class CacheClientNotifier {
    * @param membershipID The {@code ClientProxyMembershipID} of the client no 
longer interested
    *        in this {@code Region} and key
    */
-  public void registerClientInterest(String regionName, List keysOfInterest,
+  public void registerClientInterest(String regionName, List<?> keysOfInterest,
       ClientProxyMembershipID membershipID, boolean isDurable, boolean 
sendUpdatesAsInvalidates,
       boolean manageEmptyRegions, int regionDataPolicy, boolean flushState)
       throws IOException, RegionDestroyedException {
@@ -1184,7 +1189,7 @@ public class CacheClientNotifier {
    * @param membershipID The {@code ClientProxyMembershipID} of the client no 
longer interested
    *        in this {@code Region} and key
    */
-  public void unregisterClientInterest(String regionName, List keysOfInterest, 
boolean isClosing,
+  public void unregisterClientInterest(String regionName, List<?> 
keysOfInterest, boolean isClosing,
       ClientProxyMembershipID membershipID, boolean keepalive) {
     if (logger.isDebugEnabled()) {
       logger.debug("CacheClientNotifier: Client {} unregistering interest in: 
{} -> {}",
@@ -1203,7 +1208,7 @@ public class CacheClientNotifier {
    * @return the {@code CacheClientProxy} associated to the membershipID
    */
   public CacheClientProxy getClientProxy(ClientProxyMembershipID membershipID) 
{
-    return (CacheClientProxy) _clientProxies.get(membershipID);
+    return _clientProxies.get(membershipID);
   }
 
   /**
@@ -1214,7 +1219,7 @@ public class CacheClientNotifier {
       boolean proxyInInitMode) {
     CacheClientProxy proxy = getClientProxy(membershipID);
     if (proxyInInitMode && proxy == null) {
-      proxy = (CacheClientProxy) _initClientProxies.get(membershipID);
+      proxy = _initClientProxies.get(membershipID);
     }
     return proxy;
   }
@@ -1286,10 +1291,10 @@ public class CacheClientNotifier {
           getCache().getCacheServers().size());
     }
 
-    Iterator it = _clientProxies.values().iterator();
+    Iterator<CacheClientProxy> it = _clientProxies.values().iterator();
     // Close all the client proxies
     while (it.hasNext()) {
-      CacheClientProxy proxy = (CacheClientProxy) it.next();
+      CacheClientProxy proxy = it.next();
       if (proxy.getAcceptorId() != acceptorId) {
         continue;
       }
@@ -1299,9 +1304,9 @@ public class CacheClientNotifier {
           logger.debug("CacheClientNotifier: Closing {}", proxy);
         }
         proxy.terminateDispatching(true);
-      } catch (Exception ignore) {
+      } catch (Exception e) {
         if (isDebugEnabled) {
-          logger.debug("{}: Exception in closing down the CacheClientProxy", 
this, ignore);
+          logger.debug("{}: Exception in closing down the CacheClientProxy", 
this, e);
         }
       }
     }
@@ -1355,7 +1360,7 @@ public class CacheClientNotifier {
        * ClientHealthMonitor.getInstance() might return null.
        */
       if (chm != null) {
-        
chm.numOfClientsPerVersion.incrementAndGet(proxy.getVersion().ordinal());
+        chm.numberOfClientsWithConflationOff.incrementAndGet();
       }
     }
     timedOutDurableClientProxies.remove(proxy.getProxyID());
@@ -1379,8 +1384,8 @@ public class CacheClientNotifier {
    *
    * @return set of memberIds
    */
-  public Set getActiveClients() {
-    Set clients = new HashSet();
+  public Set<ClientProxyMembershipID> getActiveClients() {
+    Set<ClientProxyMembershipID> clients = new HashSet<>();
     for (CacheClientProxy proxy : getClientProxies()) {
       if (proxy.hasRegisteredInterested()) {
         ClientProxyMembershipID proxyID = proxy.getProxyID();
@@ -1395,8 +1400,8 @@ public class CacheClientNotifier {
    *
    * @return Map, with CacheClientProxy as a key and CacheClientStatus as a 
value
    */
-  public Map getAllClients() {
-    Map clients = new HashMap();
+  public Map<ClientProxyMembershipID, CacheClientStatus> getAllClients() {
+    Map<ClientProxyMembershipID, CacheClientStatus> clients = new HashMap<>();
     for (Object o : _clientProxies.values()) {
       CacheClientProxy proxy = (CacheClientProxy) o;
       ClientProxyMembershipID proxyID = proxy.getProxyID();
@@ -1448,8 +1453,8 @@ public class CacheClientNotifier {
    *
    * @return map with CacheClientProxy as key, and Integer as a value
    */
-  public Map getClientQueueSizes() {
-    Map/* <ClientProxyMembershipID,Integer> */ queueSizes = new HashMap();
+  public Map<ClientProxyMembershipID, Integer> getClientQueueSizes() {
+    Map<ClientProxyMembershipID, Integer> queueSizes = new HashMap<>();
     for (Object o : _clientProxies.values()) {
       CacheClientProxy proxy = (CacheClientProxy) o;
       queueSizes.put(proxy.getProxyID(), proxy.getQueueSize());
@@ -1488,7 +1493,7 @@ public class CacheClientNotifier {
     if (!(proxy.clientConflation == Handshake.CONFLATION_ON)) {
       ClientHealthMonitor chm = ClientHealthMonitor.getInstance();
       if (chm != null) {
-        
chm.numOfClientsPerVersion.decrementAndGet(proxy.getVersion().ordinal());
+        chm.numberOfClientsWithConflationOff.decrementAndGet();
       }
     }
   }
@@ -1555,22 +1560,22 @@ public class CacheClientNotifier {
    *
    * @param deadProxies The list of {@code CacheClientProxy} instances to close
    */
-  private void closeDeadProxies(List deadProxies, boolean stoppedNormally) {
+  private void closeDeadProxies(List<CacheClientProxy> deadProxies, boolean 
stoppedNormally) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
-    for (Object deadProxy : deadProxies) {
-      CacheClientProxy proxy = (CacheClientProxy) deadProxy;
+    for (CacheClientProxy deadProxy : deadProxies) {
       if (isDebugEnabled) {
-        logger.debug("CacheClientNotifier: Closing dead client: {}", proxy);
+        logger.debug("CacheClientNotifier: Closing dead client: {}", 
deadProxy);
       }
 
       // Close the proxy
       boolean keepProxy = false;
       try {
-        keepProxy = proxy.close(false, stoppedNormally);
+        keepProxy = deadProxy.close(false, stoppedNormally);
       } catch (CancelException e) {
         throw e;
       } catch (Exception e) {
-        logger.warn("CacheClientNotifier: Caught exception attempting to close 
client: {}", proxy,
+        logger.warn("CacheClientNotifier: Caught exception attempting to close 
client: {}",
+            deadProxy,
             e);
       }
 
@@ -1579,15 +1584,16 @@ public class CacheClientNotifier {
       if (keepProxy) {
         logger.info(
             "CacheClientNotifier: Keeping proxy for durable client named {} 
for {} seconds {}.",
-            proxy.getDurableId(), proxy.getDurableTimeout(), proxy);
+            deadProxy.getDurableId(), deadProxy.getDurableTimeout(), 
deadProxy);
       } else {
-        closeAllClientCqs(proxy);
+        closeAllClientCqs(deadProxy);
         if (isDebugEnabled) {
-          logger.debug("CacheClientNotifier: Not keeping proxy for non-durable 
client: {}", proxy);
+          logger.debug("CacheClientNotifier: Not keeping proxy for non-durable 
client: {}",
+              deadProxy);
         }
-        removeClientProxy(proxy);
+        removeClientProxy(deadProxy);
       }
-      proxy.notifyRemoval();
+      deadProxy.notifyRemoval();
     } // for
   }
 
@@ -1624,7 +1630,7 @@ public class CacheClientNotifier {
    *
    * @since GemFire 5.8Beta
    */
-  public Set getInterestRegistrationListeners() {
+  public Set<InterestRegistrationListener> getInterestRegistrationListeners() {
     return readableInterestRegistrationListeners;
   }
 
@@ -1740,7 +1746,7 @@ public class CacheClientNotifier {
     statistics = new CacheClientNotifierStats(factory);
 
     try {
-      logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
+      logFrequency = 
Long.parseLong(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
       if (logFrequency <= 0) {
         logFrequency = DEFAULT_LOG_FREQUENCY;
       }
@@ -1899,18 +1905,17 @@ public class CacheClientNotifier {
    * reconnects. To make sure you get the updated ClientProxyMembershipID use 
this map to lookup the
    * CacheClientProxy and then call getProxyID on it.
    */
-  private final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ 
_clientProxies =
-      new ConcurrentHashMap();
+  private final ConcurrentMap<ClientProxyMembershipID, CacheClientProxy> 
_clientProxies =
+      new ConcurrentHashMap<>();
 
   /**
    * The map of {@code CacheClientProxy} instances which are getting 
initialized. Maps
    * ClientProxyMembershipID to CacheClientProxy.
    */
-  private final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ 
_initClientProxies =
-      new ConcurrentHashMap();
+  private final ConcurrentMap<ClientProxyMembershipID, CacheClientProxy> 
_initClientProxies =
+      new ConcurrentHashMap<>();
 
-  private final Set<ClientProxyMembershipID> timedOutDurableClientProxies =
-      new HashSet<>();
+  private final Set<ClientProxyMembershipID> timedOutDurableClientProxies = 
new HashSet<>();
 
   /**
    * The GemFire {@code InternalCache}. Note that since this is a singleton 
class you should
@@ -1966,13 +1971,14 @@ public class CacheClientNotifier {
    * The {@code InterestRegistrationListener} instances registered in this VM. 
This is used
    * when modifying the set of listeners.
    */
-  private final Set writableInterestRegistrationListeners = new 
CopyOnWriteArraySet();
+  private final Set<InterestRegistrationListener> 
writableInterestRegistrationListeners =
+      new CopyOnWriteArraySet<>();
 
   /**
    * The {@code InterestRegistrationListener} instances registered in this VM. 
This is used to
    * provide a read-only {@code Set} of listeners.
    */
-  private final Set readableInterestRegistrationListeners =
+  private final Set<InterestRegistrationListener> 
readableInterestRegistrationListeners =
       Collections.unmodifiableSet(writableInterestRegistrationListeners);
 
   /**
@@ -2032,7 +2038,7 @@ public class CacheClientNotifier {
   /**
    * @return the haContainer
    */
-  public Map getHaContainer() {
+  public Map<?, ?> getHaContainer() {
     return haContainer;
   }
 
@@ -2050,7 +2056,7 @@ public class CacheClientNotifier {
                   : overflowAttributes.getOverflowDirectory(),
               overflowAttributes.isDiskStore())));
     } else {
-      haContainer = new HAContainerMap(new ConcurrentHashMap());
+      haContainer = new HAContainerMap(new ConcurrentHashMap<>());
     }
     assert haContainer != null;
 
@@ -2059,7 +2065,7 @@ public class CacheClientNotifier {
     }
   }
 
-  private final Set denyListedClients = new CopyOnWriteArraySet();
+  private final Set<ClientProxyMembershipID> denyListedClients = new 
CopyOnWriteArraySet<>();
 
   void addToDenylistedClient(ClientProxyMembershipID proxyID) {
     denyListedClients.add(proxyID);
@@ -2069,7 +2075,7 @@ public class CacheClientNotifier {
         TimeUnit.SECONDS);
   }
 
-  Set getDenylistedClient() {
+  Set<ClientProxyMembershipID> getDenylistedClient() {
     return denyListedClients;
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
index 70a1897..0d709f8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -26,7 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -47,7 +47,6 @@ import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.tier.Acceptor;
 import org.apache.geode.internal.cache.tier.ServerSideHandshake;
 import org.apache.geode.internal.lang.JavaWorkarounds;
-import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.logging.internal.executors.LoggingThread;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
@@ -66,13 +65,13 @@ public class ClientHealthMonitor {
   /**
    * The map of known clients and last time seen.
    */
-  private ConcurrentMap<ClientProxyMembershipID, AtomicLong> clientHeartbeats =
+  private final ConcurrentMap<ClientProxyMembershipID, AtomicLong> 
clientHeartbeats =
       new ConcurrentHashMap<>();
 
   /**
    * The map of known clients and maximum time between pings.
    */
-  private ConcurrentMap<ClientProxyMembershipID, Integer> 
clientMaximumTimeBetweenPings =
+  private final ConcurrentMap<ClientProxyMembershipID, Integer> 
clientMaximumTimeBetweenPings =
       new ConcurrentHashMap<>();
 
   /**
@@ -84,7 +83,7 @@ public class ClientHealthMonitor {
     return maximumTimeBetweenPings;
   }
 
-  private volatile int maximumTimeBetweenPings;
+  private final int maximumTimeBetweenPings;
 
   /**
    * A thread that validates client connections
@@ -128,22 +127,18 @@ public class ClientHealthMonitor {
       new HashMap<>();
 
   /**
-   * Gives, version-wise, the number of clients connected to the cache servers 
in this cache, which
+   * Gives the number of clients connected to the cache servers in this cache, 
which
    * are capable of processing received deltas.
    *
-   * NOTE: It does not necessarily give the actual number of clients per 
version connected to the
-   * cache servers in this cache.
-   *
    * @see CacheClientNotifier#addClientProxy(CacheClientProxy)
    */
-  AtomicIntegerArray numOfClientsPerVersion =
-      new AtomicIntegerArray(KnownVersion.HIGHEST_VERSION + 1);
+  AtomicInteger numberOfClientsWithConflationOff = new AtomicInteger(0);
 
   public long getMonitorInterval() {
     return monitorInterval;
   }
 
-  private long monitorInterval;
+  private final long monitorInterval;
 
   /**
    * Factory method to construct or return the singleton 
<code>ClientHealthMonitor</code> instance.
@@ -380,7 +375,7 @@ public class ClientHealthMonitor {
    *        ConnectionProxies may be from same client member or different. If 
it is null this would
    *        mean to fetch the Connections of all the ConnectionProxy objects.
    */
-  public Map<String, Object[]> getConnectedClients(Set filterProxies) {
+  public Map<String, Object[]> 
getConnectedClients(Set<ClientProxyMembershipID> filterProxies) {
     Map<String, Object[]> map = new HashMap<>(); // KEY=proxyID, 
VALUE=connectionCount (Integer)
     synchronized (proxyIdConnections) {
       for (Map.Entry<ClientProxyMembershipID, ServerConnectionCollection> 
entry : proxyIdConnections
@@ -665,16 +660,8 @@ public class ClientHealthMonitor {
     return cleanupTable;
   }
 
-  private int getNumberOfClientsAtOrAboveVersion(KnownVersion version) {
-    int number = 0;
-    for (int i = version.ordinal(); i < numOfClientsPerVersion.length(); i++) {
-      number += numOfClientsPerVersion.get(i);
-    }
-    return number;
-  }
-
   public boolean hasDeltaClients() {
-    return getNumberOfClientsAtOrAboveVersion(KnownVersion.OLDEST) > 0;
+    return numberOfClientsWithConflationOff.get() > 0;
   }
 
   private int getMaximumTimeBetweenPings(ClientProxyMembershipID proxyID) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
index c1c9cf3..7157e55 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
@@ -14,11 +14,12 @@
  */
 package org.apache.geode.internal.cache.tier.sockets;
 
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
+
 import java.io.IOException;
 import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -123,7 +124,7 @@ public class MessageDispatcher extends LoggingThread {
       StatisticsClock statisticsClock) throws CacheException {
     super(name);
 
-    this._proxy = proxy;
+    _proxy = proxy;
 
     // Create the event conflator
     // this._eventConflator = new BridgeEventConflator
@@ -138,24 +139,22 @@ public class MessageDispatcher extends LoggingThread {
       boolean createDurableQueue = proxy.proxyID.isDurable();
       boolean canHandleDelta =
           
InternalDistributedSystem.getAnyInstance().getConfig().getDeltaPropagation()
-              && !(this._proxy.clientConflation == Handshake.CONFLATION_ON);
+              && !(_proxy.clientConflation == Handshake.CONFLATION_ON);
       if ((createDurableQueue || canHandleDelta) && logger.isDebugEnabled()) {
         logger.debug("Creating a {} subscription queue for {}",
             createDurableQueue ? "durable" : "non-durable",
             proxy.getProxyID());
       }
-      this._messageQueue = 
HARegionQueue.getHARegionQueueInstance(getProxy().getHARegionName(),
+      _messageQueue = 
HARegionQueue.getHARegionQueueInstance(getProxy().getHARegionName(),
           getCache(), harq, HARegionQueue.BLOCKING_HA_QUEUE, 
createDurableQueue,
           proxy._cacheClientNotifier.getHaContainer(), proxy.getProxyID(),
-          this._proxy.clientConflation, this._proxy.isPrimary(), 
canHandleDelta, statisticsClock);
+          _proxy.clientConflation, _proxy.isPrimary(), canHandleDelta, 
statisticsClock);
       // Check if interests were registered during HARegion GII.
-      if (this._proxy.hasRegisteredInterested()) {
-        this._messageQueue.setHasRegisteredInterest(true);
+      if (_proxy.hasRegisteredInterested()) {
+        _messageQueue.setHasRegisteredInterest(true);
       }
-    } catch (CancelException e) {
+    } catch (CancelException | RegionExistsException e) {
       throw e;
-    } catch (RegionExistsException ree) {
-      throw ree;
     } catch (Exception e) {
       getCache().getCancelCriterion().checkCancelInProgress(e);
       throw new CacheException(
@@ -167,7 +166,7 @@ public class MessageDispatcher extends LoggingThread {
   }
 
   private CacheClientProxy getProxy() {
-    return this._proxy;
+    return _proxy;
   }
 
   private InternalCache getCache() {
@@ -190,7 +189,7 @@ public class MessageDispatcher extends LoggingThread {
     if (logger.isDebugEnabled()) {
       logger.debug("{}: notified dispatcher to stop", this);
     }
-    this._isStopped = true;
+    _isStopped = true;
     // this.interrupt(); // don't interrupt here. Let close(boolean) do this.
   }
 
@@ -221,13 +220,12 @@ public class MessageDispatcher extends LoggingThread {
     }
 
     // Stay alive until the queue is empty or a number of peeks is reached.
-    List events = null;
     try {
       for (int numberOfPeeks =
           0; numberOfPeeks < CacheClientProxy.MAXIMUM_SHUTDOWN_PEEKS; 
++numberOfPeeks) {
         boolean interrupted = Thread.interrupted();
         try {
-          events = this._messageQueue.peek(1, -1);
+          List<?> events = _messageQueue.peek(1, -1);
           if (events == null || events.size() == 0) {
             break;
           }
@@ -259,7 +257,7 @@ public class MessageDispatcher extends LoggingThread {
    * @return whether the dispatcher is stopped
    */
   protected boolean isStopped() {
-    return this._isStopped;
+    return _isStopped;
   }
 
   /**
@@ -269,7 +267,7 @@ public class MessageDispatcher extends LoggingThread {
    * @return the size of the queue
    */
   protected int getQueueSize() {
-    return this._messageQueue == null ? 0 : this._messageQueue.size();
+    return _messageQueue == null ? 0 : _messageQueue.size();
   }
 
   /**
@@ -279,8 +277,8 @@ public class MessageDispatcher extends LoggingThread {
    * @return the size of the queue
    */
   protected int getQueueSizeStat() {
-    if (this._messageQueue != null) {
-      HARegionQueueStats stats = this._messageQueue.getStatistics();
+    if (_messageQueue != null) {
+      HARegionQueueStats stats = _messageQueue.getStatistics();
       return ((int) (stats.getEventsEnqued() - stats.getEventsRemoved()
           - stats.getEventsConflated() - stats.getMarkerEventsConflated()
           - stats.getEventsExpired() - stats.getEventsRemovedByQrm() - 
stats.getEventsTaken()
@@ -291,7 +289,7 @@ public class MessageDispatcher extends LoggingThread {
 
   protected void drainClientCqEvents(ClientProxyMembershipID clientId,
       InternalCqQuery cqToClose) {
-    this._messageQueue.closeClientCq(clientId, cqToClose);
+    _messageQueue.closeClientCq(clientId, cqToClose);
   }
 
   @Override
@@ -328,7 +326,7 @@ public class MessageDispatcher extends LoggingThread {
   @VisibleForTesting
   protected void runDispatcher() {
     boolean exceptionOccurred = false;
-    this._isStopped = false;
+    _isStopped = false;
 
     if (logger.isDebugEnabled()) {
       logger.debug("{}: Beginning to process events", this);
@@ -337,7 +335,7 @@ public class MessageDispatcher extends LoggingThread {
     ClientMessage clientMessage = null;
     while (!isStopped()) {
       // SystemFailure.checkFailure(); DM's stopper does this
-      if (this._proxy._cache.getCancelCriterion().isCancelInProgress()) {
+      if (_proxy._cache.getCancelCriterion().isCancelInProgress()) {
         break;
       }
       try {
@@ -348,12 +346,12 @@ public class MessageDispatcher extends LoggingThread {
           // reconnecting.
           synchronized (_pausedLock) {
             try {
-              logger.info("available ids = " + this._messageQueue.size() + " , 
isEmptyAckList ="
-                  + this._messageQueue.isEmptyAckList() + ", peekInitialized = 
"
-                  + this._messageQueue.isPeekInitialized());
-              while (!this._messageQueue.isEmptyAckList()
-                  && this._messageQueue.isPeekInitialized()) {
-                this._messageQueue.remove();
+              logger.info("available ids = " + _messageQueue.size() + " , 
isEmptyAckList ="
+                  + _messageQueue.isEmptyAckList() + ", peekInitialized = "
+                  + _messageQueue.isPeekInitialized());
+              while (!_messageQueue.isEmptyAckList()
+                  && _messageQueue.isPeekInitialized()) {
+                _messageQueue.remove();
               }
             } catch (InterruptedException ex) {
               logger.warn("{}: sleep interrupted.", this);
@@ -362,11 +360,11 @@ public class MessageDispatcher extends LoggingThread {
           waitForResumption();
         }
         try {
-          clientMessage = (ClientMessage) this._messageQueue.peek();
+          clientMessage = (ClientMessage) _messageQueue.peek();
         } catch (RegionDestroyedException skipped) {
           break;
         }
-        getStatistics().setQueueSize(this._messageQueue.size());
+        getStatistics().setQueueSize(_messageQueue.size());
         if (isStopped()) {
           break;
         }
@@ -377,13 +375,13 @@ public class MessageDispatcher extends LoggingThread {
           boolean isDispatched = dispatchMessage(clientMessage);
           getStatistics().endMessage(start);
           if (isDispatched) {
-            this._messageQueue.remove();
+            _messageQueue.remove();
             if (clientMessage instanceof ClientMarkerMessageImpl) {
               getProxy().setMarkerEnqueued(false);
             }
           }
         } else {
-          this._messageQueue.remove();
+          _messageQueue.remove();
         }
         clientMessage = null;
       } catch (MessageTooLargeException e) {
@@ -391,7 +389,7 @@ public class MessageDispatcher extends LoggingThread {
       } catch (IOException e) {
         // Added the synchronization below to ensure that exception handling
         // does not occur while stopping the dispatcher and vice versa.
-        synchronized (this._stopDispatchingLock) {
+        synchronized (_stopDispatchingLock) {
           // An IOException occurred while sending a message to the
           // client. If the processor is not already stopped, assume
           // the client is dead and stop processing.
@@ -467,13 +465,13 @@ public class MessageDispatcher extends LoggingThread {
     }
 
     // Processing gets here if isStopped=true. What is this code below doing?
-    List list = null;
     if (!exceptionOccurred) {
+      List<ClientMessage> list = null;
       try {
         // Clear the interrupt status if any,
         Thread.interrupted();
-        int size = this._messageQueue.size();
-        list = this._messageQueue.peek(size);
+        int size = _messageQueue.size();
+        list = uncheckedCast(_messageQueue.peek(size));
         if (logger.isDebugEnabled()) {
           logger.debug(
               "{}: After flagging the dispatcher to stop , the residual List 
of messages to be dispatched={} size={}",
@@ -481,31 +479,29 @@ public class MessageDispatcher extends LoggingThread {
         }
         if (list.size() > 0) {
           long start = getStatistics().startTime();
-          Iterator itr = list.iterator();
-          while (itr.hasNext()) {
-            dispatchMessage((ClientMessage) itr.next());
+          for (final ClientMessage o : list) {
+            dispatchMessage(o);
             getStatistics().endMessage(start);
             // @todo asif: shouldn't we call itr.remove() since the current msg
             // has been sent? That way list will be more accurate
             // if we have an exception.
           }
-          this._messageQueue.remove();
+          _messageQueue.remove();
         }
       } catch (CancelException e) {
         if (logger.isDebugEnabled()) {
           logger.debug("CacheClientNotifier stopped due to cancellation");
         }
-      } catch (Exception ignore) {
+      } catch (Exception e) {
         // if (logger.isInfoEnabled()) {
         String extraMsg = null;
 
-        if ("Broken pipe".equals(ignore.getMessage())) {
+        if ("Broken pipe".equals(e.getMessage())) {
           extraMsg = "Problem caused by broken pipe on socket.";
-        } else if (ignore instanceof RegionDestroyedException) {
-          extraMsg =
-              "Problem caused by message queue being closed.";
+        } else if (e instanceof RegionDestroyedException) {
+          extraMsg = "Problem caused by message queue being closed.";
         }
-        final Object[] msgArgs = new Object[] {((!isStopped()) ? 
this.toString() + ": " : ""),
+        final Object[] msgArgs = new Object[] {((!isStopped()) ? toString() + 
": " : ""),
             ((list == null) ? 0 : list.size())};
         if (extraMsg != null) {
           // Dont print exception details, but add on extraMsg
@@ -519,7 +515,7 @@ public class MessageDispatcher extends LoggingThread {
           logger.info(String.format(
               "%s Possibility of not being able to send some or all of the 
messages to clients. Total messages currently present in the list %s.",
               msgArgs),
-              ignore);
+              e);
         }
       }
 
@@ -547,7 +543,7 @@ public class MessageDispatcher extends LoggingThread {
         }
       }
     } else {
-      this._isStopped = true;
+      _isStopped = true;
     }
 
     // Stop the ServerConnections. This will force the client to
@@ -579,11 +575,8 @@ public class MessageDispatcher extends LoggingThread {
     if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE)) {
       logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, "Dispatching {}", 
clientMessage);
     }
-    Message message = null;
-
-    // byte[] latestValue =
-    // this._eventConflator.getLatestValue(clientMessage);
 
+    final Message message;
     if (clientMessage instanceof ClientUpdateMessage) {
       byte[] latestValue = (byte[]) ((ClientUpdateMessage) 
clientMessage).getValue();
       if (logger.isTraceEnabled()) {
@@ -608,7 +601,7 @@ public class MessageDispatcher extends LoggingThread {
       message = clientMessage.getMessage(getProxy(), true /* notify */);
     }
 
-    if (!this._proxy.isPaused()) {
+    if (!_proxy.isPaused()) {
       sendMessage(message);
 
       if (logger.isTraceEnabled()) {
@@ -621,7 +614,7 @@ public class MessageDispatcher extends LoggingThread {
       }
     }
     if (isDispatched) {
-      this._messageQueue.getStatistics().incEventsDispatched();
+      _messageQueue.getStatistics().incEventsDispatched();
     }
     return isDispatched;
   }
@@ -630,13 +623,13 @@ public class MessageDispatcher extends LoggingThread {
     if (message == null) {
       return;
     }
-    this.socketWriteLock.lock();
+    socketWriteLock.lock();
     try {
       message.setComms(getSocket(), getCommBuffer(), getStatistics());
       message.send();
       getProxy().resetPingCounter();
     } finally {
-      this.socketWriteLock.unlock();
+      socketWriteLock.unlock();
     }
     if (logger.isTraceEnabled()) {
       logger.trace("{}: Sent {}", this, message);
@@ -650,9 +643,9 @@ public class MessageDispatcher extends LoggingThread {
    */
   protected void enqueueMessage(Conflatable clientMessage) {
     try {
-      this._messageQueue.put(clientMessage);
-      if (this._proxy.isPaused() && this._proxy.isDurable()) {
-        
this._proxy._cacheClientNotifier.statistics.incEventEnqueuedWhileClientAwayCount();
+      _messageQueue.put(clientMessage);
+      if (_proxy.isPaused() && _proxy.isDurable()) {
+        
_proxy._cacheClientNotifier.statistics.incEventEnqueuedWhileClientAwayCount();
         if (logger.isDebugEnabled()) {
           logger.debug("{}: Queued message while Durable Client is away {}", 
this, clientMessage);
         }
@@ -661,7 +654,7 @@ public class MessageDispatcher extends LoggingThread {
       throw e;
     } catch (Exception e) {
       if (!isStopped()) {
-        this._proxy._statistics.incMessagesFailedQueued();
+        _proxy._statistics.incMessagesFailedQueued();
         logger.fatal(
             String.format("%s: Exception occurred while attempting to add 
message to queue",
                 this),
@@ -677,7 +670,7 @@ public class MessageDispatcher extends LoggingThread {
         logger.debug("{}: Queueing marker message. <{}>. The queue contains {} 
entries.", this,
             message, getQueueSize());
       }
-      this._messageQueue.put(message);
+      _messageQueue.put(message);
       if (logger.isDebugEnabled()) {
         logger.debug("{}: Queued marker message. The queue contains {} 
entries.", this,
             getQueueSize());
@@ -711,7 +704,7 @@ public class MessageDispatcher extends LoggingThread {
       logger.warn("Message too large to send to client: {}, {}", 
clientMessage, e.getMessage());
 
     } catch (IOException e) {
-      synchronized (this._stopDispatchingLock) {
+      synchronized (_stopDispatchingLock) {
         // Pause or unregister proxy
         if (!isStopped() && !getProxy().isPaused()) {
           logger.fatal(String.format("%s : An unexpected Exception occurred", 
this),
@@ -727,13 +720,13 @@ public class MessageDispatcher extends LoggingThread {
   }
 
   protected void waitForResumption() throws InterruptedException {
-    synchronized (this._pausedLock) {
+    synchronized (_pausedLock) {
       logger.info("{} : Pausing processing", this);
       if (!getProxy().isPaused()) {
         return;
       }
       while (getProxy().isPaused()) {
-        this._pausedLock.wait();
+        _pausedLock.wait();
       }
       // Fix for #48571
       _messageQueue.clearPeekedIDs();
@@ -744,7 +737,7 @@ public class MessageDispatcher extends LoggingThread {
     logger.info("{} : Resuming processing", this);
 
     // Notify thread to resume
-    this._pausedLock.notifyAll();
+    _pausedLock.notifyAll();
   }
 
   protected Object deserialize(byte[] serializedBytes) {
@@ -759,13 +752,13 @@ public class MessageDispatcher extends LoggingThread {
   }
 
   protected void initializeTransients() {
-    while (!this._messageQueue.isEmptyAckList() && 
this._messageQueue.isPeekInitialized()) {
+    while (!_messageQueue.isEmptyAckList() && 
_messageQueue.isPeekInitialized()) {
       try {
-        this._messageQueue.remove();
+        _messageQueue.remove();
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
     }
-    this._messageQueue.initializeTransients();
+    _messageQueue.initializeTransients();
   }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/SocketMessageWriter.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/SocketMessageWriter.java
index 89bdc8d..0a4ef53 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/SocketMessageWriter.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/SocketMessageWriter.java
@@ -19,6 +19,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.Instantiator;
@@ -48,10 +50,10 @@ public class SocketMessageWriter {
 
     // get all the instantiators.
     Instantiator[] instantiators = InternalInstantiator.getInstantiators();
-    HashMap instantiatorMap = new HashMap();
+    Map<Integer, List<String>> instantiatorMap = new HashMap<>();
     if (instantiators != null && instantiators.length > 0) {
       for (Instantiator instantiator : instantiators) {
-        ArrayList instantiatorAttributes = new ArrayList();
+        List<String> instantiatorAttributes = new ArrayList<>();
         
instantiatorAttributes.add(instantiator.getClass().toString().substring(6));
         
instantiatorAttributes.add(instantiator.getInstantiatedClass().toString().substring(6));
         instantiatorMap.put(instantiator.getId(), instantiatorAttributes);
@@ -61,16 +63,15 @@ public class SocketMessageWriter {
 
     // get all the dataserializers.
     DataSerializer[] dataSerializers = InternalDataSerializer.getSerializers();
-    HashMap<Integer, ArrayList<String>> dsToSupportedClasses =
-        new HashMap<Integer, ArrayList<String>>();
-    HashMap<Integer, String> dataSerializersMap = new HashMap<Integer, 
String>();
+    HashMap<Integer, ArrayList<String>> dsToSupportedClasses = new HashMap<>();
+    HashMap<Integer, String> dataSerializersMap = new HashMap<>();
     if (dataSerializers != null && dataSerializers.length > 0) {
       for (DataSerializer dataSerializer : dataSerializers) {
         dataSerializersMap.put(dataSerializer.getId(),
             dataSerializer.getClass().toString().substring(6));
         if (clientVersion.isNotOlderThan(KnownVersion.GFE_6516)) {
-          ArrayList<String> supportedClassNames = new ArrayList<String>();
-          for (Class clazz : dataSerializer.getSupportedClasses()) {
+          ArrayList<String> supportedClassNames = new ArrayList<>();
+          for (Class<?> clazz : dataSerializer.getSupportedClasses()) {
             supportedClassNames.add(clazz.getName());
           }
           dsToSupportedClasses.put(dataSerializer.getId(), 
supportedClassNames);
@@ -84,6 +85,7 @@ public class SocketMessageWriter {
     if (clientVersion.isNotOlderThan(KnownVersion.GEODE_1_5_0)) {
       dos.writeInt(CLIENT_PING_TASK_PERIOD);
     }
+
     dos.flush();
   }
 

Reply via email to