This is an automated email from the ASF dual-hosted git repository. jbarrett pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
commit 8d44f027ec1cb491d700b19ca3e30c5c474aee90 Author: Jacob Barrett <[email protected]> AuthorDate: Sat Jan 16 08:58:28 2021 -0800 GEODE-6588: Static analyzer cleanup. --- .../client/internal/ClientSideHandshakeImpl.java | 2 +- .../cache/tier/sockets/CacheClientNotifier.java | 110 +++++++++--------- .../cache/tier/sockets/ClientHealthMonitor.java | 31 ++--- .../cache/tier/sockets/MessageDispatcher.java | 127 ++++++++++----------- .../cache/tier/sockets/SocketMessageWriter.java | 16 +-- 5 files changed, 137 insertions(+), 149 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java index eb5fcf9..f0de522 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java @@ -169,7 +169,7 @@ public class ClientSideHandshakeImpl extends Handshake implements ClientSideHand // if running in a loner system, use the new port number in the ID to // help differentiate from other clients DistributionManager dm = ((InternalDistributedSystem) system).getDistributionManager(); - InternalDistributedMember idm = dm.getDistributionManagerId(); + final InternalDistributedMember idm = dm.getDistributionManagerId(); synchronized (idm) { if (idm.getMembershipPort() == 0 && dm instanceof LonerDistributionManager) { int port = sock.getLocalPort(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java index a3a67a3..7f4fe13 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java @@ -17,6 +17,7 @@ package org.apache.geode.internal.cache.tier.sockets; import static org.apache.geode.cache.Region.SEPARATOR; import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_ACCESSOR_PP; import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTHENTICATOR; +import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast; import java.io.BufferedOutputStream; import java.io.DataOutputStream; @@ -71,6 +72,7 @@ import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.ClassLoadUtil; import org.apache.geode.internal.SystemTimer; import org.apache.geode.internal.cache.CacheClientStatus; @@ -480,8 +482,7 @@ public class CacheClientNotifier { removeClientInitProxy(l_proxy); } } - boolean status = false; - return status; + return false; } /** @@ -620,7 +621,7 @@ public class CacheClientNotifier { } private boolean hasClientProxies() { - return !this._initClientProxies.isEmpty() || !this._clientProxies.isEmpty(); + return !_initClientProxies.isEmpty() || !_clientProxies.isEmpty(); } /** @@ -745,7 +746,8 @@ public class CacheClientNotifier { // Add interestList info. if (filterInfo.getInterestedClientsInv() != null) { - Set<Object> rawIDs = regionProfile.getRealClientIDs(filterInfo.getInterestedClientsInv()); + Set<Object> rawIDs = + uncheckedCast(regionProfile.getRealClientIDs(filterInfo.getInterestedClientsInv())); Set<ClientProxyMembershipID> ids = getProxyIDs(rawIDs); incMessagesNotQueuedOriginatorStat(event, ids); if (!ids.isEmpty()) { @@ -757,7 +759,8 @@ public class CacheClientNotifier { } } if (filterInfo.getInterestedClients() != null) { - Set<Object> rawIDs = regionProfile.getRealClientIDs(filterInfo.getInterestedClients()); + Set<Object> rawIDs = + uncheckedCast(regionProfile.getRealClientIDs(filterInfo.getInterestedClients())); Set<ClientProxyMembershipID> ids = getProxyIDs(rawIDs); incMessagesNotQueuedOriginatorStat(event, ids); if (!ids.isEmpty()) { @@ -801,7 +804,7 @@ public class CacheClientNotifier { FilterInfo filterInfo) { FilterProfile regionProfile = ((InternalRegion) event.getRegion()).getFilterProfile(); if (event.getOperation().isEntry() && filterInfo.getCQs() != null) { - EntryEvent entryEvent = (EntryEvent) event; + EntryEvent<?, ?> entryEvent = (EntryEvent<?, ?>) event; for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) { Long cqID = e.getKey(); String cqName = regionProfile.getRealCqID(cqID); @@ -926,7 +929,7 @@ public class CacheClientNotifier { * processes the given collection of durable and non-durable client identifiers, returning a * collection of non-durable identifiers of clients connected to this VM */ - Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) { + Set<ClientProxyMembershipID> getProxyIDs(Set<?> mixedDurableAndNonDurableIDs) { Set<ClientProxyMembershipID> result = ConcurrentHashMap.newKeySet(); for (Object id : mixedDurableAndNonDurableIDs) { if (id instanceof String) { @@ -961,7 +964,7 @@ public class CacheClientNotifier { CacheDistributionAdvisor advisor = proxy.getHARegionQueue().getRegion().getCacheDistributionAdvisor(); - Set members = advisor.adviseCacheOp(); + Set<InternalDistributedMember> members = advisor.adviseCacheOp(); // Send client denylist message ClientDenylistProcessor.sendDenylistedClient(proxy.getProxyID(), dm, members); @@ -991,7 +994,8 @@ public class CacheClientNotifier { * @param event The event containing the data to be updated * @return a {@code ClientUpdateMessage} */ - private ClientUpdateMessageImpl initializeMessage(EnumListenerEvent operation, CacheEvent event) + private ClientUpdateMessageImpl initializeMessage(EnumListenerEvent operation, + CacheEvent<?, ?> event) throws Exception { if (!supportsOperation(operation)) { throw new Exception( @@ -1004,7 +1008,7 @@ public class CacheClientNotifier { boolean isNetLoad = false; Object callbackArgument; byte[] delta = null; - VersionTag versionTag = null; + VersionTag<?> versionTag = null; if (event.getOperation().isEntry()) { EntryEventImpl entryEvent = (EntryEventImpl) event; @@ -1111,7 +1115,8 @@ public class CacheClientNotifier { * @param regionDataPolicy (0==empty) * @since GemFire 6.1 */ - public void updateMapOfEmptyRegions(Map regionsWithEmptyDataPolicy, String regionName, + public void updateMapOfEmptyRegions(Map<String, Integer> regionsWithEmptyDataPolicy, + String regionName, int regionDataPolicy) { if (regionDataPolicy == 0) { if (!regionsWithEmptyDataPolicy.containsKey(regionName)) { @@ -1151,7 +1156,7 @@ public class CacheClientNotifier { * @param membershipID The {@code ClientProxyMembershipID} of the client no longer interested * in this {@code Region} and key */ - public void registerClientInterest(String regionName, List keysOfInterest, + public void registerClientInterest(String regionName, List<?> keysOfInterest, ClientProxyMembershipID membershipID, boolean isDurable, boolean sendUpdatesAsInvalidates, boolean manageEmptyRegions, int regionDataPolicy, boolean flushState) throws IOException, RegionDestroyedException { @@ -1184,7 +1189,7 @@ public class CacheClientNotifier { * @param membershipID The {@code ClientProxyMembershipID} of the client no longer interested * in this {@code Region} and key */ - public void unregisterClientInterest(String regionName, List keysOfInterest, boolean isClosing, + public void unregisterClientInterest(String regionName, List<?> keysOfInterest, boolean isClosing, ClientProxyMembershipID membershipID, boolean keepalive) { if (logger.isDebugEnabled()) { logger.debug("CacheClientNotifier: Client {} unregistering interest in: {} -> {}", @@ -1203,7 +1208,7 @@ public class CacheClientNotifier { * @return the {@code CacheClientProxy} associated to the membershipID */ public CacheClientProxy getClientProxy(ClientProxyMembershipID membershipID) { - return (CacheClientProxy) _clientProxies.get(membershipID); + return _clientProxies.get(membershipID); } /** @@ -1214,7 +1219,7 @@ public class CacheClientNotifier { boolean proxyInInitMode) { CacheClientProxy proxy = getClientProxy(membershipID); if (proxyInInitMode && proxy == null) { - proxy = (CacheClientProxy) _initClientProxies.get(membershipID); + proxy = _initClientProxies.get(membershipID); } return proxy; } @@ -1286,10 +1291,10 @@ public class CacheClientNotifier { getCache().getCacheServers().size()); } - Iterator it = _clientProxies.values().iterator(); + Iterator<CacheClientProxy> it = _clientProxies.values().iterator(); // Close all the client proxies while (it.hasNext()) { - CacheClientProxy proxy = (CacheClientProxy) it.next(); + CacheClientProxy proxy = it.next(); if (proxy.getAcceptorId() != acceptorId) { continue; } @@ -1299,9 +1304,9 @@ public class CacheClientNotifier { logger.debug("CacheClientNotifier: Closing {}", proxy); } proxy.terminateDispatching(true); - } catch (Exception ignore) { + } catch (Exception e) { if (isDebugEnabled) { - logger.debug("{}: Exception in closing down the CacheClientProxy", this, ignore); + logger.debug("{}: Exception in closing down the CacheClientProxy", this, e); } } } @@ -1355,7 +1360,7 @@ public class CacheClientNotifier { * ClientHealthMonitor.getInstance() might return null. */ if (chm != null) { - chm.numOfClientsPerVersion.incrementAndGet(proxy.getVersion().ordinal()); + chm.numberOfClientsWithConflationOff.incrementAndGet(); } } timedOutDurableClientProxies.remove(proxy.getProxyID()); @@ -1379,8 +1384,8 @@ public class CacheClientNotifier { * * @return set of memberIds */ - public Set getActiveClients() { - Set clients = new HashSet(); + public Set<ClientProxyMembershipID> getActiveClients() { + Set<ClientProxyMembershipID> clients = new HashSet<>(); for (CacheClientProxy proxy : getClientProxies()) { if (proxy.hasRegisteredInterested()) { ClientProxyMembershipID proxyID = proxy.getProxyID(); @@ -1395,8 +1400,8 @@ public class CacheClientNotifier { * * @return Map, with CacheClientProxy as a key and CacheClientStatus as a value */ - public Map getAllClients() { - Map clients = new HashMap(); + public Map<ClientProxyMembershipID, CacheClientStatus> getAllClients() { + Map<ClientProxyMembershipID, CacheClientStatus> clients = new HashMap<>(); for (Object o : _clientProxies.values()) { CacheClientProxy proxy = (CacheClientProxy) o; ClientProxyMembershipID proxyID = proxy.getProxyID(); @@ -1448,8 +1453,8 @@ public class CacheClientNotifier { * * @return map with CacheClientProxy as key, and Integer as a value */ - public Map getClientQueueSizes() { - Map/* <ClientProxyMembershipID,Integer> */ queueSizes = new HashMap(); + public Map<ClientProxyMembershipID, Integer> getClientQueueSizes() { + Map<ClientProxyMembershipID, Integer> queueSizes = new HashMap<>(); for (Object o : _clientProxies.values()) { CacheClientProxy proxy = (CacheClientProxy) o; queueSizes.put(proxy.getProxyID(), proxy.getQueueSize()); @@ -1488,7 +1493,7 @@ public class CacheClientNotifier { if (!(proxy.clientConflation == Handshake.CONFLATION_ON)) { ClientHealthMonitor chm = ClientHealthMonitor.getInstance(); if (chm != null) { - chm.numOfClientsPerVersion.decrementAndGet(proxy.getVersion().ordinal()); + chm.numberOfClientsWithConflationOff.decrementAndGet(); } } } @@ -1555,22 +1560,22 @@ public class CacheClientNotifier { * * @param deadProxies The list of {@code CacheClientProxy} instances to close */ - private void closeDeadProxies(List deadProxies, boolean stoppedNormally) { + private void closeDeadProxies(List<CacheClientProxy> deadProxies, boolean stoppedNormally) { final boolean isDebugEnabled = logger.isDebugEnabled(); - for (Object deadProxy : deadProxies) { - CacheClientProxy proxy = (CacheClientProxy) deadProxy; + for (CacheClientProxy deadProxy : deadProxies) { if (isDebugEnabled) { - logger.debug("CacheClientNotifier: Closing dead client: {}", proxy); + logger.debug("CacheClientNotifier: Closing dead client: {}", deadProxy); } // Close the proxy boolean keepProxy = false; try { - keepProxy = proxy.close(false, stoppedNormally); + keepProxy = deadProxy.close(false, stoppedNormally); } catch (CancelException e) { throw e; } catch (Exception e) { - logger.warn("CacheClientNotifier: Caught exception attempting to close client: {}", proxy, + logger.warn("CacheClientNotifier: Caught exception attempting to close client: {}", + deadProxy, e); } @@ -1579,15 +1584,16 @@ public class CacheClientNotifier { if (keepProxy) { logger.info( "CacheClientNotifier: Keeping proxy for durable client named {} for {} seconds {}.", - proxy.getDurableId(), proxy.getDurableTimeout(), proxy); + deadProxy.getDurableId(), deadProxy.getDurableTimeout(), deadProxy); } else { - closeAllClientCqs(proxy); + closeAllClientCqs(deadProxy); if (isDebugEnabled) { - logger.debug("CacheClientNotifier: Not keeping proxy for non-durable client: {}", proxy); + logger.debug("CacheClientNotifier: Not keeping proxy for non-durable client: {}", + deadProxy); } - removeClientProxy(proxy); + removeClientProxy(deadProxy); } - proxy.notifyRemoval(); + deadProxy.notifyRemoval(); } // for } @@ -1624,7 +1630,7 @@ public class CacheClientNotifier { * * @since GemFire 5.8Beta */ - public Set getInterestRegistrationListeners() { + public Set<InterestRegistrationListener> getInterestRegistrationListeners() { return readableInterestRegistrationListeners; } @@ -1740,7 +1746,7 @@ public class CacheClientNotifier { statistics = new CacheClientNotifierStats(factory); try { - logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY)); + logFrequency = Long.parseLong(System.getProperty(MAX_QUEUE_LOG_FREQUENCY)); if (logFrequency <= 0) { logFrequency = DEFAULT_LOG_FREQUENCY; } @@ -1899,18 +1905,17 @@ public class CacheClientNotifier { * reconnects. To make sure you get the updated ClientProxyMembershipID use this map to lookup the * CacheClientProxy and then call getProxyID on it. */ - private final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ _clientProxies = - new ConcurrentHashMap(); + private final ConcurrentMap<ClientProxyMembershipID, CacheClientProxy> _clientProxies = + new ConcurrentHashMap<>(); /** * The map of {@code CacheClientProxy} instances which are getting initialized. Maps * ClientProxyMembershipID to CacheClientProxy. */ - private final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ _initClientProxies = - new ConcurrentHashMap(); + private final ConcurrentMap<ClientProxyMembershipID, CacheClientProxy> _initClientProxies = + new ConcurrentHashMap<>(); - private final Set<ClientProxyMembershipID> timedOutDurableClientProxies = - new HashSet<>(); + private final Set<ClientProxyMembershipID> timedOutDurableClientProxies = new HashSet<>(); /** * The GemFire {@code InternalCache}. Note that since this is a singleton class you should @@ -1966,13 +1971,14 @@ public class CacheClientNotifier { * The {@code InterestRegistrationListener} instances registered in this VM. This is used * when modifying the set of listeners. */ - private final Set writableInterestRegistrationListeners = new CopyOnWriteArraySet(); + private final Set<InterestRegistrationListener> writableInterestRegistrationListeners = + new CopyOnWriteArraySet<>(); /** * The {@code InterestRegistrationListener} instances registered in this VM. This is used to * provide a read-only {@code Set} of listeners. */ - private final Set readableInterestRegistrationListeners = + private final Set<InterestRegistrationListener> readableInterestRegistrationListeners = Collections.unmodifiableSet(writableInterestRegistrationListeners); /** @@ -2032,7 +2038,7 @@ public class CacheClientNotifier { /** * @return the haContainer */ - public Map getHaContainer() { + public Map<?, ?> getHaContainer() { return haContainer; } @@ -2050,7 +2056,7 @@ public class CacheClientNotifier { : overflowAttributes.getOverflowDirectory(), overflowAttributes.isDiskStore()))); } else { - haContainer = new HAContainerMap(new ConcurrentHashMap()); + haContainer = new HAContainerMap(new ConcurrentHashMap<>()); } assert haContainer != null; @@ -2059,7 +2065,7 @@ public class CacheClientNotifier { } } - private final Set denyListedClients = new CopyOnWriteArraySet(); + private final Set<ClientProxyMembershipID> denyListedClients = new CopyOnWriteArraySet<>(); void addToDenylistedClient(ClientProxyMembershipID proxyID) { denyListedClients.add(proxyID); @@ -2069,7 +2075,7 @@ public class CacheClientNotifier { TimeUnit.SECONDS); } - Set getDenylistedClient() { + Set<ClientProxyMembershipID> getDenylistedClient() { return denyListedClients; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java index 70a1897..0d709f8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java @@ -26,7 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicIntegerArray; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -47,7 +47,6 @@ import org.apache.geode.internal.cache.TXManagerImpl; import org.apache.geode.internal.cache.tier.Acceptor; import org.apache.geode.internal.cache.tier.ServerSideHandshake; import org.apache.geode.internal.lang.JavaWorkarounds; -import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.logging.internal.executors.LoggingThread; import org.apache.geode.logging.internal.log4j.api.LogService; @@ -66,13 +65,13 @@ public class ClientHealthMonitor { /** * The map of known clients and last time seen. */ - private ConcurrentMap<ClientProxyMembershipID, AtomicLong> clientHeartbeats = + private final ConcurrentMap<ClientProxyMembershipID, AtomicLong> clientHeartbeats = new ConcurrentHashMap<>(); /** * The map of known clients and maximum time between pings. */ - private ConcurrentMap<ClientProxyMembershipID, Integer> clientMaximumTimeBetweenPings = + private final ConcurrentMap<ClientProxyMembershipID, Integer> clientMaximumTimeBetweenPings = new ConcurrentHashMap<>(); /** @@ -84,7 +83,7 @@ public class ClientHealthMonitor { return maximumTimeBetweenPings; } - private volatile int maximumTimeBetweenPings; + private final int maximumTimeBetweenPings; /** * A thread that validates client connections @@ -128,22 +127,18 @@ public class ClientHealthMonitor { new HashMap<>(); /** - * Gives, version-wise, the number of clients connected to the cache servers in this cache, which + * Gives the number of clients connected to the cache servers in this cache, which * are capable of processing received deltas. * - * NOTE: It does not necessarily give the actual number of clients per version connected to the - * cache servers in this cache. - * * @see CacheClientNotifier#addClientProxy(CacheClientProxy) */ - AtomicIntegerArray numOfClientsPerVersion = - new AtomicIntegerArray(KnownVersion.HIGHEST_VERSION + 1); + AtomicInteger numberOfClientsWithConflationOff = new AtomicInteger(0); public long getMonitorInterval() { return monitorInterval; } - private long monitorInterval; + private final long monitorInterval; /** * Factory method to construct or return the singleton <code>ClientHealthMonitor</code> instance. @@ -380,7 +375,7 @@ public class ClientHealthMonitor { * ConnectionProxies may be from same client member or different. If it is null this would * mean to fetch the Connections of all the ConnectionProxy objects. */ - public Map<String, Object[]> getConnectedClients(Set filterProxies) { + public Map<String, Object[]> getConnectedClients(Set<ClientProxyMembershipID> filterProxies) { Map<String, Object[]> map = new HashMap<>(); // KEY=proxyID, VALUE=connectionCount (Integer) synchronized (proxyIdConnections) { for (Map.Entry<ClientProxyMembershipID, ServerConnectionCollection> entry : proxyIdConnections @@ -665,16 +660,8 @@ public class ClientHealthMonitor { return cleanupTable; } - private int getNumberOfClientsAtOrAboveVersion(KnownVersion version) { - int number = 0; - for (int i = version.ordinal(); i < numOfClientsPerVersion.length(); i++) { - number += numOfClientsPerVersion.get(i); - } - return number; - } - public boolean hasDeltaClients() { - return getNumberOfClientsAtOrAboveVersion(KnownVersion.OLDEST) > 0; + return numberOfClientsWithConflationOff.get() > 0; } private int getMaximumTimeBetweenPings(ClientProxyMembershipID proxyID) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java index c1c9cf3..7157e55 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java @@ -14,11 +14,12 @@ */ package org.apache.geode.internal.cache.tier.sockets; +import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast; + import java.io.IOException; import java.net.Socket; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -123,7 +124,7 @@ public class MessageDispatcher extends LoggingThread { StatisticsClock statisticsClock) throws CacheException { super(name); - this._proxy = proxy; + _proxy = proxy; // Create the event conflator // this._eventConflator = new BridgeEventConflator @@ -138,24 +139,22 @@ public class MessageDispatcher extends LoggingThread { boolean createDurableQueue = proxy.proxyID.isDurable(); boolean canHandleDelta = InternalDistributedSystem.getAnyInstance().getConfig().getDeltaPropagation() - && !(this._proxy.clientConflation == Handshake.CONFLATION_ON); + && !(_proxy.clientConflation == Handshake.CONFLATION_ON); if ((createDurableQueue || canHandleDelta) && logger.isDebugEnabled()) { logger.debug("Creating a {} subscription queue for {}", createDurableQueue ? "durable" : "non-durable", proxy.getProxyID()); } - this._messageQueue = HARegionQueue.getHARegionQueueInstance(getProxy().getHARegionName(), + _messageQueue = HARegionQueue.getHARegionQueueInstance(getProxy().getHARegionName(), getCache(), harq, HARegionQueue.BLOCKING_HA_QUEUE, createDurableQueue, proxy._cacheClientNotifier.getHaContainer(), proxy.getProxyID(), - this._proxy.clientConflation, this._proxy.isPrimary(), canHandleDelta, statisticsClock); + _proxy.clientConflation, _proxy.isPrimary(), canHandleDelta, statisticsClock); // Check if interests were registered during HARegion GII. - if (this._proxy.hasRegisteredInterested()) { - this._messageQueue.setHasRegisteredInterest(true); + if (_proxy.hasRegisteredInterested()) { + _messageQueue.setHasRegisteredInterest(true); } - } catch (CancelException e) { + } catch (CancelException | RegionExistsException e) { throw e; - } catch (RegionExistsException ree) { - throw ree; } catch (Exception e) { getCache().getCancelCriterion().checkCancelInProgress(e); throw new CacheException( @@ -167,7 +166,7 @@ public class MessageDispatcher extends LoggingThread { } private CacheClientProxy getProxy() { - return this._proxy; + return _proxy; } private InternalCache getCache() { @@ -190,7 +189,7 @@ public class MessageDispatcher extends LoggingThread { if (logger.isDebugEnabled()) { logger.debug("{}: notified dispatcher to stop", this); } - this._isStopped = true; + _isStopped = true; // this.interrupt(); // don't interrupt here. Let close(boolean) do this. } @@ -221,13 +220,12 @@ public class MessageDispatcher extends LoggingThread { } // Stay alive until the queue is empty or a number of peeks is reached. - List events = null; try { for (int numberOfPeeks = 0; numberOfPeeks < CacheClientProxy.MAXIMUM_SHUTDOWN_PEEKS; ++numberOfPeeks) { boolean interrupted = Thread.interrupted(); try { - events = this._messageQueue.peek(1, -1); + List<?> events = _messageQueue.peek(1, -1); if (events == null || events.size() == 0) { break; } @@ -259,7 +257,7 @@ public class MessageDispatcher extends LoggingThread { * @return whether the dispatcher is stopped */ protected boolean isStopped() { - return this._isStopped; + return _isStopped; } /** @@ -269,7 +267,7 @@ public class MessageDispatcher extends LoggingThread { * @return the size of the queue */ protected int getQueueSize() { - return this._messageQueue == null ? 0 : this._messageQueue.size(); + return _messageQueue == null ? 0 : _messageQueue.size(); } /** @@ -279,8 +277,8 @@ public class MessageDispatcher extends LoggingThread { * @return the size of the queue */ protected int getQueueSizeStat() { - if (this._messageQueue != null) { - HARegionQueueStats stats = this._messageQueue.getStatistics(); + if (_messageQueue != null) { + HARegionQueueStats stats = _messageQueue.getStatistics(); return ((int) (stats.getEventsEnqued() - stats.getEventsRemoved() - stats.getEventsConflated() - stats.getMarkerEventsConflated() - stats.getEventsExpired() - stats.getEventsRemovedByQrm() - stats.getEventsTaken() @@ -291,7 +289,7 @@ public class MessageDispatcher extends LoggingThread { protected void drainClientCqEvents(ClientProxyMembershipID clientId, InternalCqQuery cqToClose) { - this._messageQueue.closeClientCq(clientId, cqToClose); + _messageQueue.closeClientCq(clientId, cqToClose); } @Override @@ -328,7 +326,7 @@ public class MessageDispatcher extends LoggingThread { @VisibleForTesting protected void runDispatcher() { boolean exceptionOccurred = false; - this._isStopped = false; + _isStopped = false; if (logger.isDebugEnabled()) { logger.debug("{}: Beginning to process events", this); @@ -337,7 +335,7 @@ public class MessageDispatcher extends LoggingThread { ClientMessage clientMessage = null; while (!isStopped()) { // SystemFailure.checkFailure(); DM's stopper does this - if (this._proxy._cache.getCancelCriterion().isCancelInProgress()) { + if (_proxy._cache.getCancelCriterion().isCancelInProgress()) { break; } try { @@ -348,12 +346,12 @@ public class MessageDispatcher extends LoggingThread { // reconnecting. synchronized (_pausedLock) { try { - logger.info("available ids = " + this._messageQueue.size() + " , isEmptyAckList =" - + this._messageQueue.isEmptyAckList() + ", peekInitialized = " - + this._messageQueue.isPeekInitialized()); - while (!this._messageQueue.isEmptyAckList() - && this._messageQueue.isPeekInitialized()) { - this._messageQueue.remove(); + logger.info("available ids = " + _messageQueue.size() + " , isEmptyAckList =" + + _messageQueue.isEmptyAckList() + ", peekInitialized = " + + _messageQueue.isPeekInitialized()); + while (!_messageQueue.isEmptyAckList() + && _messageQueue.isPeekInitialized()) { + _messageQueue.remove(); } } catch (InterruptedException ex) { logger.warn("{}: sleep interrupted.", this); @@ -362,11 +360,11 @@ public class MessageDispatcher extends LoggingThread { waitForResumption(); } try { - clientMessage = (ClientMessage) this._messageQueue.peek(); + clientMessage = (ClientMessage) _messageQueue.peek(); } catch (RegionDestroyedException skipped) { break; } - getStatistics().setQueueSize(this._messageQueue.size()); + getStatistics().setQueueSize(_messageQueue.size()); if (isStopped()) { break; } @@ -377,13 +375,13 @@ public class MessageDispatcher extends LoggingThread { boolean isDispatched = dispatchMessage(clientMessage); getStatistics().endMessage(start); if (isDispatched) { - this._messageQueue.remove(); + _messageQueue.remove(); if (clientMessage instanceof ClientMarkerMessageImpl) { getProxy().setMarkerEnqueued(false); } } } else { - this._messageQueue.remove(); + _messageQueue.remove(); } clientMessage = null; } catch (MessageTooLargeException e) { @@ -391,7 +389,7 @@ public class MessageDispatcher extends LoggingThread { } catch (IOException e) { // Added the synchronization below to ensure that exception handling // does not occur while stopping the dispatcher and vice versa. - synchronized (this._stopDispatchingLock) { + synchronized (_stopDispatchingLock) { // An IOException occurred while sending a message to the // client. If the processor is not already stopped, assume // the client is dead and stop processing. @@ -467,13 +465,13 @@ public class MessageDispatcher extends LoggingThread { } // Processing gets here if isStopped=true. What is this code below doing? - List list = null; if (!exceptionOccurred) { + List<ClientMessage> list = null; try { // Clear the interrupt status if any, Thread.interrupted(); - int size = this._messageQueue.size(); - list = this._messageQueue.peek(size); + int size = _messageQueue.size(); + list = uncheckedCast(_messageQueue.peek(size)); if (logger.isDebugEnabled()) { logger.debug( "{}: After flagging the dispatcher to stop , the residual List of messages to be dispatched={} size={}", @@ -481,31 +479,29 @@ public class MessageDispatcher extends LoggingThread { } if (list.size() > 0) { long start = getStatistics().startTime(); - Iterator itr = list.iterator(); - while (itr.hasNext()) { - dispatchMessage((ClientMessage) itr.next()); + for (final ClientMessage o : list) { + dispatchMessage(o); getStatistics().endMessage(start); // @todo asif: shouldn't we call itr.remove() since the current msg // has been sent? That way list will be more accurate // if we have an exception. } - this._messageQueue.remove(); + _messageQueue.remove(); } } catch (CancelException e) { if (logger.isDebugEnabled()) { logger.debug("CacheClientNotifier stopped due to cancellation"); } - } catch (Exception ignore) { + } catch (Exception e) { // if (logger.isInfoEnabled()) { String extraMsg = null; - if ("Broken pipe".equals(ignore.getMessage())) { + if ("Broken pipe".equals(e.getMessage())) { extraMsg = "Problem caused by broken pipe on socket."; - } else if (ignore instanceof RegionDestroyedException) { - extraMsg = - "Problem caused by message queue being closed."; + } else if (e instanceof RegionDestroyedException) { + extraMsg = "Problem caused by message queue being closed."; } - final Object[] msgArgs = new Object[] {((!isStopped()) ? this.toString() + ": " : ""), + final Object[] msgArgs = new Object[] {((!isStopped()) ? toString() + ": " : ""), ((list == null) ? 0 : list.size())}; if (extraMsg != null) { // Dont print exception details, but add on extraMsg @@ -519,7 +515,7 @@ public class MessageDispatcher extends LoggingThread { logger.info(String.format( "%s Possibility of not being able to send some or all of the messages to clients. Total messages currently present in the list %s.", msgArgs), - ignore); + e); } } @@ -547,7 +543,7 @@ public class MessageDispatcher extends LoggingThread { } } } else { - this._isStopped = true; + _isStopped = true; } // Stop the ServerConnections. This will force the client to @@ -579,11 +575,8 @@ public class MessageDispatcher extends LoggingThread { if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE)) { logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, "Dispatching {}", clientMessage); } - Message message = null; - - // byte[] latestValue = - // this._eventConflator.getLatestValue(clientMessage); + final Message message; if (clientMessage instanceof ClientUpdateMessage) { byte[] latestValue = (byte[]) ((ClientUpdateMessage) clientMessage).getValue(); if (logger.isTraceEnabled()) { @@ -608,7 +601,7 @@ public class MessageDispatcher extends LoggingThread { message = clientMessage.getMessage(getProxy(), true /* notify */); } - if (!this._proxy.isPaused()) { + if (!_proxy.isPaused()) { sendMessage(message); if (logger.isTraceEnabled()) { @@ -621,7 +614,7 @@ public class MessageDispatcher extends LoggingThread { } } if (isDispatched) { - this._messageQueue.getStatistics().incEventsDispatched(); + _messageQueue.getStatistics().incEventsDispatched(); } return isDispatched; } @@ -630,13 +623,13 @@ public class MessageDispatcher extends LoggingThread { if (message == null) { return; } - this.socketWriteLock.lock(); + socketWriteLock.lock(); try { message.setComms(getSocket(), getCommBuffer(), getStatistics()); message.send(); getProxy().resetPingCounter(); } finally { - this.socketWriteLock.unlock(); + socketWriteLock.unlock(); } if (logger.isTraceEnabled()) { logger.trace("{}: Sent {}", this, message); @@ -650,9 +643,9 @@ public class MessageDispatcher extends LoggingThread { */ protected void enqueueMessage(Conflatable clientMessage) { try { - this._messageQueue.put(clientMessage); - if (this._proxy.isPaused() && this._proxy.isDurable()) { - this._proxy._cacheClientNotifier.statistics.incEventEnqueuedWhileClientAwayCount(); + _messageQueue.put(clientMessage); + if (_proxy.isPaused() && _proxy.isDurable()) { + _proxy._cacheClientNotifier.statistics.incEventEnqueuedWhileClientAwayCount(); if (logger.isDebugEnabled()) { logger.debug("{}: Queued message while Durable Client is away {}", this, clientMessage); } @@ -661,7 +654,7 @@ public class MessageDispatcher extends LoggingThread { throw e; } catch (Exception e) { if (!isStopped()) { - this._proxy._statistics.incMessagesFailedQueued(); + _proxy._statistics.incMessagesFailedQueued(); logger.fatal( String.format("%s: Exception occurred while attempting to add message to queue", this), @@ -677,7 +670,7 @@ public class MessageDispatcher extends LoggingThread { logger.debug("{}: Queueing marker message. <{}>. The queue contains {} entries.", this, message, getQueueSize()); } - this._messageQueue.put(message); + _messageQueue.put(message); if (logger.isDebugEnabled()) { logger.debug("{}: Queued marker message. The queue contains {} entries.", this, getQueueSize()); @@ -711,7 +704,7 @@ public class MessageDispatcher extends LoggingThread { logger.warn("Message too large to send to client: {}, {}", clientMessage, e.getMessage()); } catch (IOException e) { - synchronized (this._stopDispatchingLock) { + synchronized (_stopDispatchingLock) { // Pause or unregister proxy if (!isStopped() && !getProxy().isPaused()) { logger.fatal(String.format("%s : An unexpected Exception occurred", this), @@ -727,13 +720,13 @@ public class MessageDispatcher extends LoggingThread { } protected void waitForResumption() throws InterruptedException { - synchronized (this._pausedLock) { + synchronized (_pausedLock) { logger.info("{} : Pausing processing", this); if (!getProxy().isPaused()) { return; } while (getProxy().isPaused()) { - this._pausedLock.wait(); + _pausedLock.wait(); } // Fix for #48571 _messageQueue.clearPeekedIDs(); @@ -744,7 +737,7 @@ public class MessageDispatcher extends LoggingThread { logger.info("{} : Resuming processing", this); // Notify thread to resume - this._pausedLock.notifyAll(); + _pausedLock.notifyAll(); } protected Object deserialize(byte[] serializedBytes) { @@ -759,13 +752,13 @@ public class MessageDispatcher extends LoggingThread { } protected void initializeTransients() { - while (!this._messageQueue.isEmptyAckList() && this._messageQueue.isPeekInitialized()) { + while (!_messageQueue.isEmptyAckList() && _messageQueue.isPeekInitialized()) { try { - this._messageQueue.remove(); + _messageQueue.remove(); } catch (InterruptedException e) { e.printStackTrace(); } } - this._messageQueue.initializeTransients(); + _messageQueue.initializeTransients(); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/SocketMessageWriter.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/SocketMessageWriter.java index 89bdc8d..0a4ef53 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/SocketMessageWriter.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/SocketMessageWriter.java @@ -19,6 +19,8 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.geode.DataSerializer; import org.apache.geode.Instantiator; @@ -48,10 +50,10 @@ public class SocketMessageWriter { // get all the instantiators. Instantiator[] instantiators = InternalInstantiator.getInstantiators(); - HashMap instantiatorMap = new HashMap(); + Map<Integer, List<String>> instantiatorMap = new HashMap<>(); if (instantiators != null && instantiators.length > 0) { for (Instantiator instantiator : instantiators) { - ArrayList instantiatorAttributes = new ArrayList(); + List<String> instantiatorAttributes = new ArrayList<>(); instantiatorAttributes.add(instantiator.getClass().toString().substring(6)); instantiatorAttributes.add(instantiator.getInstantiatedClass().toString().substring(6)); instantiatorMap.put(instantiator.getId(), instantiatorAttributes); @@ -61,16 +63,15 @@ public class SocketMessageWriter { // get all the dataserializers. DataSerializer[] dataSerializers = InternalDataSerializer.getSerializers(); - HashMap<Integer, ArrayList<String>> dsToSupportedClasses = - new HashMap<Integer, ArrayList<String>>(); - HashMap<Integer, String> dataSerializersMap = new HashMap<Integer, String>(); + HashMap<Integer, ArrayList<String>> dsToSupportedClasses = new HashMap<>(); + HashMap<Integer, String> dataSerializersMap = new HashMap<>(); if (dataSerializers != null && dataSerializers.length > 0) { for (DataSerializer dataSerializer : dataSerializers) { dataSerializersMap.put(dataSerializer.getId(), dataSerializer.getClass().toString().substring(6)); if (clientVersion.isNotOlderThan(KnownVersion.GFE_6516)) { - ArrayList<String> supportedClassNames = new ArrayList<String>(); - for (Class clazz : dataSerializer.getSupportedClasses()) { + ArrayList<String> supportedClassNames = new ArrayList<>(); + for (Class<?> clazz : dataSerializer.getSupportedClasses()) { supportedClassNames.add(clazz.getName()); } dsToSupportedClasses.put(dataSerializer.getId(), supportedClassNames); @@ -84,6 +85,7 @@ public class SocketMessageWriter { if (clientVersion.isNotOlderThan(KnownVersion.GEODE_1_5_0)) { dos.writeInt(CLIENT_PING_TASK_PERIOD); } + dos.flush(); }
