GEODE-1761 Clients don't fail back when servers are bounced. Servers will send a refresh hint to clients if they detect that a request had to be sent to a different server that owned the primary bucket affected by the operation. Clients should always refresh when this happens unless they have connection-pool size constraints that force them to use non-optimal servers.
Client-side operation classes have been modified to initiate the refresh. I've added code in the meta-data service class to avoid performing multiple concurrent refreshes on the same region. On the server-side I've cleaned up some of the network-hop detection code to stop using hard-coded integers and to consolidate some of the code that resets the ThreadLocals being used to record detected network hops. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/cea5535c Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/cea5535c Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/cea5535c Branch: refs/heads/feature/GEODE-420 Commit: cea5535cf2e0de1ee1ef8ea902ea44a2f48b4fd5 Parents: 7962080 Author: Bruce Schuchardt <[email protected]> Authored: Fri Aug 19 11:37:39 2016 -0700 Committer: Bruce Schuchardt <[email protected]> Committed: Fri Aug 19 11:39:01 2016 -0700 ---------------------------------------------------------------------- .../cache/client/internal/AbstractOp.java | 11 +++ .../client/internal/ClientMetadataService.java | 40 +++++++---- .../cache/client/internal/DestroyOp.java | 22 +++--- .../client/internal/GetClientPRMetaDataOp.java | 24 +++---- .../gemfire/cache/client/internal/GetOp.java | 18 ++--- .../gemfire/cache/client/internal/PutAllOp.java | 7 +- .../gemfire/cache/client/internal/PutOp.java | 46 +++--------- .../cache/client/internal/RemoveAllOp.java | 7 +- .../internal/SingleHopOperationCallable.java | 1 + .../gemfire/internal/cache/CachePerfStats.java | 2 +- .../internal/cache/PartitionedRegion.java | 73 ++++++++++++++------ .../cache/tier/sockets/BaseCommand.java | 4 +- .../internal/cache/tier/sockets/Message.java | 2 - .../cache/tier/sockets/command/Destroy.java | 8 +-- .../cache/tier/sockets/command/Destroy65.java | 7 +- .../cache/tier/sockets/command/Destroy70.java | 2 +- .../cache/tier/sockets/command/Get70.java | 9 ++- 
.../cache/tier/sockets/command/Invalidate.java | 8 +-- .../tier/sockets/command/Invalidate70.java | 2 +- .../cache/tier/sockets/command/Put61.java | 7 +- .../cache/tier/sockets/command/Put65.java | 13 ++-- .../cache/tier/sockets/command/Put70.java | 2 +- .../cache/tier/sockets/command/PutAll.java | 7 +- .../cache/tier/sockets/command/PutAll70.java | 9 ++- .../cache/tier/sockets/command/PutAll80.java | 9 ++- .../cache/tier/sockets/command/RemoveAll.java | 9 ++- .../cache/tier/sockets/command/Request.java | 7 +- .../PartitionedRegionSingleHopDUnitTest.java | 5 +- .../internal/cache/SingleHopStatsDUnitTest.java | 17 +++-- 29 files changed, 198 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AbstractOp.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AbstractOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AbstractOp.java index 1eb0dbd..f93620e 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AbstractOp.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AbstractOp.java @@ -50,6 +50,8 @@ public abstract class AbstractOp implements Op { private final Message msg; + private boolean allowDuplicateMetadataRefresh; + protected AbstractOp(int msgType, int msgParts) { this.msg = new Message(msgParts, Version.CURRENT); getMessage().setMessageType(msgType); @@ -301,6 +303,15 @@ public abstract class AbstractOp implements Op { } } } + + public boolean isAllowDuplicateMetadataRefresh() { + return allowDuplicateMetadataRefresh; + } + + public void setAllowDuplicateMetadataRefresh(final boolean allowDuplicateMetadataRefresh) { + this.allowDuplicateMetadataRefresh = allowDuplicateMetadataRefresh; + } + 
/** * Used by subclasses who get chunked responses. */ http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientMetadataService.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientMetadataService.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientMetadataService.java index 6e255c4..4005b78 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientMetadataService.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientMetadataService.java @@ -20,10 +20,7 @@ import com.gemstone.gemfire.SystemFailure; import com.gemstone.gemfire.cache.*; import com.gemstone.gemfire.distributed.internal.DistributionConfig; import com.gemstone.gemfire.distributed.internal.ServerLocation; -import com.gemstone.gemfire.internal.cache.BucketServerLocation66; -import com.gemstone.gemfire.internal.cache.EntryOperationImpl; -import com.gemstone.gemfire.internal.cache.LocalRegion; -import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper; +import com.gemstone.gemfire.internal.cache.*; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.logging.LogService; import org.apache.logging.log4j.Logger; @@ -62,7 +59,9 @@ public final class ClientMetadataService { private boolean isMetadataRefreshed_TEST_ONLY = false; - private int fetchTaskCount = 0; + private int refreshTaskCount = 0; + + private Set<String> regionsBeingRefreshed = new HashSet<>(); private final Object fetchTaskCountLock = new Object(); @@ -521,7 +520,7 @@ public final class ClientMetadataService { } else { synchronized (fetchTaskCountLock){ - fetchTaskCount++; + refreshTaskCount++; } Runnable fetchTask = new Runnable() { @SuppressWarnings("synthetic-access") @@ -541,7 +540,7 @@ public final class 
ClientMetadataService { } finally { synchronized (fetchTaskCountLock){ - fetchTaskCount--; + refreshTaskCount--; } } } @@ -610,14 +609,19 @@ public final class ClientMetadataService { ClientPartitionAdvisor advisor = this.getClientPartitionAdvisor(region.getFullPath()); if(advisor!= null && advisor.getServerGroup().length()!= 0 && HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP){ if (logger.isDebugEnabled()) { - logger.debug("Scheduling metadata refresh : {}", nwHopType); + logger.debug("Scheduling metadata refresh: {} region: {}", nwHopType, region.getName()); } - if(nwHopType == (byte)2){ + if( nwHopType == PartitionedRegion.NETWORK_HOP_TO_DIFFERENT_GROUP){ + return; + } + } + synchronized (fetchTaskCountLock) { + if (regionsBeingRefreshed.contains(region.getFullPath())) { return; } } - region.getCachePerfStats().incNonSingleHopsCount(); if (isRecursive) { + region.getCachePerfStats().incNonSingleHopsCount(); try { getClientPRMetadata(region); } catch (VirtualMachineError e) { @@ -630,8 +634,13 @@ public final class ClientMetadataService { } } } else { - synchronized (fetchTaskCountLock){ - fetchTaskCount++; + synchronized (fetchTaskCountLock) { + if (regionsBeingRefreshed.contains(region.getFullPath())) { + return; + } + region.getCachePerfStats().incNonSingleHopsCount(); + regionsBeingRefreshed.add(region.getFullPath()); + refreshTaskCount++; } Runnable fetchTask = new Runnable() { @SuppressWarnings("synthetic-access") @@ -649,7 +658,8 @@ public final class ClientMetadataService { } finally { synchronized (fetchTaskCountLock){ - fetchTaskCount--; + regionsBeingRefreshed.remove(region.getFullPath()); + refreshTaskCount--; } } } @@ -849,9 +859,9 @@ public final class ClientMetadataService { this.isMetadataStable = isMetadataStable; } - public int getFetchTaskCount() { + public int getRefreshTaskCount() { synchronized(fetchTaskCountLock) { - return fetchTaskCount; + return refreshTaskCount; } } } 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyOp.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyOp.java index e8ce1a7..ab9b2d1 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyOp.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyOp.java @@ -64,7 +64,7 @@ public class DestroyOp { if (logger.isDebugEnabled()) { logger.debug("Preparing DestroyOp for {} operation={}", key, operation); } - AbstractOp op = new DestroyOpImpl(region, key, expectedOldValue, + DestroyOpImpl op = new DestroyOpImpl(region, key, expectedOldValue, operation, event, callbackArg, prSingleHopEnabled); if (prSingleHopEnabled) { ClientMetadataService cms = region.getCache() @@ -77,6 +77,7 @@ public class DestroyOp { boolean onlyUseExistingCnx = ((poolImpl.getMaxConnections() != -1 && poolImpl .getConnectionCount() >= poolImpl.getMaxConnections()) ? true : false); + op.setAllowDuplicateMetadataRefresh(! onlyUseExistingCnx); return pool.executeOn(server, op, true, onlyUseExistingCnx); } catch (AllConnectionsInUseException e) { @@ -140,10 +141,10 @@ public class DestroyOp { private boolean prSingleHopEnabled = false; - private Object callbackArg; - private EntryEventImpl event; + private Object callbackArg; + /** * @throws com.gemstone.gemfire.SerializationException if serialization fails */ @@ -178,6 +179,7 @@ public class DestroyOp { super(MessageType.DESTROY, callbackArg != null ? 
6 : 5); this.key = key; this.event = event; + this.callbackArg = callbackArg; getMessage().addStringPart(region); getMessage().addStringOrObjPart(key); getMessage().addObjPart(expectedOldValue); @@ -215,7 +217,6 @@ public class DestroyOp { } } if (prSingleHopEnabled) { - byte version = 0 ; // if (log.fineEnabled()) { // log.fine("reading prSingleHop part #" + (partIdx+1)); // } @@ -224,18 +225,17 @@ public class DestroyOp { if (bytesReceived[0] != ClientMetadataService.INITIAL_VERSION && bytesReceived.length == ClientMetadataService.SIZE_BYTES_ARRAY_RECEIVED) { if (this.region != null) { - ClientMetadataService cms = null; try { - cms = region.getCache().getClientMetadataService(); - version = cms.getMetaDataVersion(region, Operation.UPDATE, - key, null, callbackArg); + ClientMetadataService cms = region.getCache().getClientMetadataService(); + int myVersion = cms.getMetaDataVersion(region, Operation.UPDATE, + key, null, callbackArg); + if (myVersion != bytesReceived[0] || isAllowDuplicateMetadataRefresh()) { + cms.scheduleGetPRMetaData(region, false, bytesReceived[1]); + } } catch (CacheClosedException e) { return null; } - if (bytesReceived[0] != version) { - cms.scheduleGetPRMetaData(region, false,bytesReceived[1]); - } } } } else { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java index 9a467f7..240aabb 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java @@ -104,22 +104,22 @@ public class GetClientPRMetaDataOp { for (int i = 
0; i < numParts; i++) { Object result = msg.getPart(i).getObject(); List<BucketServerLocation66> locations = (List<BucketServerLocation66>)result; - if (!locations.isEmpty()) { - int bucketId = locations.get(0).getBucketId(); - if (isDebugEnabled) { - logger.debug("GetClientPRMetaDataOpImpl#processResponse: for bucketId : {} locations are {}", bucketId, locations); - } - advisor.updateBucketServerLocations(bucketId, locations, cms); - - Set<ClientPartitionAdvisor> cpas = cms + if (!locations.isEmpty()) { + int bucketId = locations.get(0).getBucketId(); + if (isDebugEnabled) { + logger.debug("GetClientPRMetaDataOpImpl#processResponse: for bucketId : {} locations are {}", bucketId, locations); + } + advisor.updateBucketServerLocations(bucketId, locations, cms); + + Set<ClientPartitionAdvisor> cpas = cms .getColocatedClientPartitionAdvisor(regionFullPath); - if (cpas != null && !cpas.isEmpty()) { - for (ClientPartitionAdvisor colCPA : cpas) { - colCPA.updateBucketServerLocations(bucketId, locations, cms); + if (cpas != null && !cpas.isEmpty()) { + for (ClientPartitionAdvisor colCPA : cpas) { + colCPA.updateBucketServerLocations(bucketId, locations, cms); + } } } } - } if (isDebugEnabled) { logger.debug("GetClientPRMetaDataOpImpl#processResponse: received ClientPRMetadata from server successfully."); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetOp.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetOp.java index 6864306..00f81fd 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetOp.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetOp.java @@ -62,7 +62,7 @@ public class GetOp { Object key, Object callbackArg, boolean 
prSingleHopEnabled, EntryEventImpl clientEvent) { ClientMetadataService cms = ((GemFireCacheImpl)region.getCache()) .getClientMetadataService(); - AbstractOp op = new GetOpImpl(region, key, callbackArg, + GetOpImpl op = new GetOpImpl(region, key, callbackArg, prSingleHopEnabled, clientEvent); if (logger.isDebugEnabled()) { @@ -77,6 +77,7 @@ public class GetOp { boolean onlyUseExistingCnx = ((poolImpl.getMaxConnections() != -1 && poolImpl .getConnectionCount() >= poolImpl.getMaxConnections()) ? true : false); + op.setAllowDuplicateMetadataRefresh(! onlyUseExistingCnx); return pool.executeOn(new ServerLocation(server.getHostName(), server.getPort()), op, true, onlyUseExistingCnx); } @@ -113,7 +114,7 @@ public class GetOp { private Object callbackArg; private EntryEventImpl clientEvent; - + public String toString() { return "GetOpImpl(key="+key+")"; } @@ -182,18 +183,17 @@ public class GetOp { byte[] bytesReceived = part.getSerializedForm(); if (bytesReceived[0] != ClientMetadataService.INITIAL_VERSION && bytesReceived.length == ClientMetadataService.SIZE_BYTES_ARRAY_RECEIVED) { - ClientMetadataService cms; try { - cms = region.getCache().getClientMetadataService(); - version = cms.getMetaDataVersion(region, Operation.UPDATE, key, - null, callbackArg); + ClientMetadataService cms = region.getCache().getClientMetadataService(); + int myVersion = cms.getMetaDataVersion(region, Operation.UPDATE, + key, null, callbackArg); + if (myVersion != bytesReceived[0] || isAllowDuplicateMetadataRefresh()) { + cms.scheduleGetPRMetaData(region, false, bytesReceived[1]); + } } catch (CacheClosedException e) { return null; } - if (bytesReceived[0] != version) { - cms.scheduleGetPRMetaData(region, false,bytesReceived[1]); - } } } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java ---------------------------------------------------------------------- diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java index 1610456..112d533 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java @@ -358,12 +358,11 @@ public class PutAllOp { } else if (o instanceof byte[]) { if (prSingleHopEnabled) { byte[] bytesReceived = part.getSerializedForm(); - if (/*bytesReceived.length==1 &&*/ bytesReceived[0] != ClientMetadataService.INITIAL_VERSION) { // nw hop + if (bytesReceived[0] != ClientMetadataService.INITIAL_VERSION) { // nw hop if (region != null) { - ClientMetadataService cms; try { - cms = region.getCache().getClientMetadataService(); - cms.scheduleGetPRMetaData(region, false,bytesReceived[1]); + ClientMetadataService cms = region.getCache().getClientMetadataService(); + cms.scheduleGetPRMetaData(region, false, bytesReceived[1]); } catch (CacheClosedException e) { } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java index 072ec4e..35760b5 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java @@ -69,9 +69,8 @@ public class PutOp { Operation operation, boolean requireOldValue, Object expectedOldValue, Object callbackArg, - boolean prSingleHopEnabled) - { - AbstractOp op = new PutOpImpl(region, key, value, deltaBytes, event, + boolean prSingleHopEnabled) { + PutOpImpl op = new PutOpImpl(region, key, value, deltaBytes, event, 
operation, requireOldValue, expectedOldValue, callbackArg, false/*donot send full obj; send delta*/, prSingleHopEnabled); @@ -86,6 +85,7 @@ public class PutOp { boolean onlyUseExistingCnx = ((poolImpl.getMaxConnections() != -1 && poolImpl .getConnectionCount() >= poolImpl.getMaxConnections()) ? true : false); + op.setAllowDuplicateMetadataRefresh(! onlyUseExistingCnx); return pool.executeOn(new ServerLocation(server.getHostName(), server .getPort()), op, true, onlyUseExistingCnx); } @@ -106,6 +106,7 @@ public class PutOp { Object key, Object value, byte[] deltaBytes, EntryEventImpl event, Operation operation, boolean requireOldValue, Object expectedOldValue, Object callbackArg, boolean prSingleHopEnabled, boolean isMetaRegionPutOp) { + AbstractOp op = new PutOpImpl(regionName, key, value, deltaBytes, event, operation, requireOldValue, expectedOldValue, callbackArg, @@ -135,8 +136,7 @@ public class PutOp { Object value, EntryEventImpl event, Object callbackArg, - boolean prSingleHopEnabled) - { + boolean prSingleHopEnabled) { AbstractOp op = new PutOpImpl(regionName, key, value, null, event, Operation.CREATE, false, null, callbackArg, false /*donot send full Obj; send delta*/, prSingleHopEnabled); @@ -181,6 +181,7 @@ public class PutOp { private Object expectedOldValue; + public PutOpImpl(String regionName , Object key, Object value, byte[] deltaBytes, EntryEventImpl event, Operation op, boolean requireOldValue, @@ -311,30 +312,6 @@ public class PutOp { @Override protected Object processResponse(Message msg) throws Exception { throw new UnsupportedOperationException("processResponse should not be invoked in PutOp. 
Use processResponse(Message, Connection)"); -// processAck(msg, "put"); -// if (prSingleHopEnabled) { -// byte version = 0; -// Part part = msg.getPart(0); -// byte[] bytesReceived = part.getSerializedForm(); -// if (bytesReceived[0] != ClientMetadataService.INITIAL_VERSION -// && bytesReceived.length == ClientMetadataService.SIZE_BYTES_ARRAY_RECEIVED) { // nw hop -// if (this.region != null) { -// ClientMetadataService cms; -// try { -// cms = region.getCache().getClientMetadataService(); -// version = cms.getMetaDataVersion(region, Operation.UPDATE, -// key, value, callbackArg); -// } -// catch (CacheClosedException e) { -// return null; -// } -// if (bytesReceived[0] != version) { -// cms.scheduleGetPRMetaData(region, false,bytesReceived[1]); -// } -// } -// } -// } -// return null; } /* @@ -353,18 +330,17 @@ public class PutOp { protected Object processResponse(Message msg, Connection con) throws Exception { processAck(msg, "put", con); - byte version = 0 ; + if (prSingleHopEnabled) { Part part = msg.getPart(0); byte[] bytesReceived = part.getSerializedForm(); if (bytesReceived[0] != ClientMetadataService.INITIAL_VERSION && bytesReceived.length == ClientMetadataService.SIZE_BYTES_ARRAY_RECEIVED) { if (this.region != null) { - ClientMetadataService cms; - cms = region.getCache().getClientMetadataService(); - version = cms.getMetaDataVersion(region, Operation.UPDATE, - key, value, callbackArg); - if (bytesReceived[0] != version) { + ClientMetadataService cms = region.getCache().getClientMetadataService(); + byte myVersion = cms.getMetaDataVersion(region, Operation.UPDATE, + key, value, callbackArg); + if (myVersion != bytesReceived[0] || isAllowDuplicateMetadataRefresh()) { cms.scheduleGetPRMetaData(region, false, bytesReceived[1]); } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RemoveAllOp.java ---------------------------------------------------------------------- diff 
--git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RemoveAllOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RemoveAllOp.java index 1ab1ed3..7e62d00 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RemoveAllOp.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RemoveAllOp.java @@ -313,12 +313,11 @@ public class RemoveAllOp { } else if (o instanceof byte[]) { if (prSingleHopEnabled) { byte[] bytesReceived = part.getSerializedForm(); - if (/*bytesReceived.length==1 &&*/ bytesReceived[0] != ClientMetadataService.INITIAL_VERSION) { // nw hop + if (bytesReceived[0] != ClientMetadataService.INITIAL_VERSION) { if (region != null) { - ClientMetadataService cms; try { - cms = region.getCache().getClientMetadataService(); - cms.scheduleGetPRMetaData(region, false,bytesReceived[1]); + ClientMetadataService cms = region.getCache().getClientMetadataService(); + cms.scheduleGetPRMetaData(region, false, bytesReceived[1]); } catch (CacheClosedException e) { } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopOperationCallable.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopOperationCallable.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopOperationCallable.java index 6047a50..a42ba8d 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopOperationCallable.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopOperationCallable.java @@ -49,6 +49,7 @@ public class SingleHopOperationCallable implements Callable { Object result = null; boolean onlyUseExistingCnx = ((pool.getMaxConnections() != -1 && pool .getConnectionCount() >= pool.getMaxConnections()) ? 
true : false); + op.setAllowDuplicateMetadataRefresh(! onlyUseExistingCnx); try { UserAttributes.userAttributes.set(securityAttributes); result = this.pool.executeOn(server, op, true, onlyUseExistingCnx); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java index 532bafa..7fa2183 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java @@ -234,7 +234,7 @@ public class CachePerfStats { final String reliableRegionsMissingNoAccessDesc = "Current number of regions configured for reliablity that are missing required roles with No access"; final String clearsDesc = "The total number of times a clear has been done on this cache."; final String nonSingleHopsDesc = "Total number of times client request observed more than one hop during operation."; - final String metaDataRefreshCountDesc = "Total number of times the meta data is refreshed due to hopping obsevred."; + final String metaDataRefreshCountDesc = "Total number of times the meta data is refreshed due to hopping observed."; final String conflatedEventsDesc = "Number of events not delivered due to conflation. 
Typically this means that the event arrived after a later event was already applied to the cache."; final String tombstoneCountDesc = "Number of destroyed entries that are retained for concurrent modification detection"; final String tombstoneGCCountDesc = "Number of garbage-collections performed on destroyed entries"; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java index 485b94d..df9ceba 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java @@ -24,6 +24,7 @@ import com.gemstone.gemfire.SystemFailure; import com.gemstone.gemfire.cache.*; import com.gemstone.gemfire.cache.TimeoutException; import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl; +import com.gemstone.gemfire.cache.client.internal.*; import com.gemstone.gemfire.cache.execute.*; import com.gemstone.gemfire.cache.partition.PartitionListener; import com.gemstone.gemfire.cache.partition.PartitionNotAvailableException; @@ -131,6 +132,25 @@ public class PartitionedRegion extends LocalRegion implements DistributionConfig.GEMFIRE_PREFIX + "PartitionedRegionRandomSeed", NanoTimer.getTime()).longValue()); private static final AtomicInteger SERIAL_NUMBER_GENERATOR = new AtomicInteger(); + + /** + * getNetworkHopType byte indicating this was the bucket owner for + * the last operation + */ + public static final int NETWORK_HOP_NONE = 0; + + /** + * getNetworkHopType byte indicating this was not the bucket owner and + * a message had to be sent to a primary in the same server group + */ + public static final int 
NETWORK_HOP_TO_SAME_GROUP = 1; + + /** + * getNetworkHopType byte indicating this was not the bucket owner and + * a message had to be sent to a primary in a different server group + */ + public static final int NETWORK_HOP_TO_DIFFERENT_GROUP = 2; + private final DiskRegionStats diskRegionStats; /** @@ -325,34 +345,47 @@ public class PartitionedRegion extends LocalRegion implements * Byte 0 = no NWHOP Byte 1 = NWHOP to servers in same server-grp Byte 2 = * NWHOP tp servers in other server-grp */ - private final ThreadLocal<Byte> isNetworkHop = new ThreadLocal<Byte>() { + private final ThreadLocal<Byte> networkHopType = new ThreadLocal<Byte>() { @Override protected Byte initialValue() { - return Byte.valueOf((byte)0); + return Byte.valueOf((byte)NETWORK_HOP_NONE); } }; - public void setIsNetworkHop(Byte value) { - this.isNetworkHop.set(value); + public void clearNetworkHopData() { + this.networkHopType.remove(); + this.metadataVersion.remove(); + } + + private void setNetworkHopType(Byte value) { + this.networkHopType.set(value); } - public Byte isNetworkHop() { - return this.isNetworkHop.get(); + /** + * <p> + * If the last operation in the current thread required a one-hop to + * another server who held the primary bucket for the operation then + * this will return something other than NETWORK_HOP_NONE. 
+ * </p> + * see NETWORK_HOP_NONE, NETWORK_HOP_TO_SAME_GROUP and NETWORK_HOP_TO_DIFFERENT_GROUP + */ + public byte getNetworkHopType() { + return this.networkHopType.get().byteValue(); } private final ThreadLocal<Byte> metadataVersion = new ThreadLocal<Byte>() { @Override protected Byte initialValue() { - return 0; + return ClientMetadataService.INITIAL_VERSION; } }; - public void setMetadataVersion(Byte value) { + private void setMetadataVersion(Byte value) { this.metadataVersion.set(value); } - public Byte getMetadataVersion() { - return this.metadataVersion.get(); + public byte getMetadataVersion() { + return this.metadataVersion.get().byteValue(); } @@ -1464,7 +1497,7 @@ public class PartitionedRegion extends LocalRegion implements String name = Thread.currentThread().getName(); if (name.startsWith("ServerConnection") && !getMyId().equals(targetNode)) { - setNetworkHop(bucketIdInt, (InternalDistributedMember)targetNode); + setNetworkHopType(bucketIdInt, (InternalDistributedMember)targetNode); } } @@ -1929,7 +1962,7 @@ public class PartitionedRegion extends LocalRegion implements } if (event.isBridgeEvent() && bucketStorageAssigned) { - setNetworkHop(bucketId, targetNode); + setNetworkHopType(bucketId, targetNode); } if (putAllOp_save == null) { result = putInBucket(targetNode, @@ -4012,7 +4045,7 @@ public class PartitionedRegion extends LocalRegion implements String name = Thread.currentThread().getName(); if (name.startsWith("ServerConnection") && !getMyId().equals(retryNode)) { - setNetworkHop(bucketId, (InternalDistributedMember)retryNode); + setNetworkHopType(bucketId, (InternalDistributedMember)retryNode); } } return obj; @@ -5468,7 +5501,7 @@ public class PartitionedRegion extends LocalRegion implements } else { if (event.isBridgeEvent()) { - setNetworkHop(bucketId, currentTarget); + setNetworkHopType(bucketId, currentTarget); } destroyRemotely(currentTarget, bucketId, @@ -5557,8 +5590,8 @@ public class PartitionedRegion extends LocalRegion implements * 
@param targetNode */ - private void setNetworkHop(final Integer bucketId, - final InternalDistributedMember targetNode) { + private void setNetworkHopType(final Integer bucketId, + final InternalDistributedMember targetNode) { if (this.isDataStore() && !getMyId().equals(targetNode)) { Set<ServerBucketProfile> profiles = this.getRegionAdvisor() @@ -5569,15 +5602,15 @@ public class PartitionedRegion extends LocalRegion implements if (profile.getDistributedMember().equals(targetNode)) { if (isProfileFromSameGroup(profile)) { - if (this.isNetworkHop() != 1 && logger.isDebugEnabled()) { + if (this.getNetworkHopType() != NETWORK_HOP_TO_SAME_GROUP && logger.isDebugEnabled()) { logger.debug("one-hop: cache op meta data staleness observed. Message is in same server group (byte 1)"); } - this.setIsNetworkHop((byte)1); + this.setNetworkHopType((byte)NETWORK_HOP_TO_SAME_GROUP); } else { - if (this.isNetworkHop() != 2 && logger.isDebugEnabled()) { + if (this.getNetworkHopType() != NETWORK_HOP_TO_DIFFERENT_GROUP && logger.isDebugEnabled()) { logger.debug("one-hop: cache op meta data staleness observed. 
Message is to different server group (byte 2)"); } - this.setIsNetworkHop((byte)2); + this.setNetworkHopType((byte)NETWORK_HOP_TO_DIFFERENT_GROUP); } this.setMetadataVersion((byte)profile.getVersion()); break; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java index f3485d2..009e869 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java @@ -284,7 +284,7 @@ public abstract class BaseCommand implements Command { replyMsg.setMessageType(MessageType.REPLY); replyMsg.setNumberOfParts(1); replyMsg.setTransactionId(origMsg.getTransactionId()); - replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(), nwHop}); + replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion(), nwHop}); replyMsg.send(servConn); pr.getPrStats().incPRMetaDataSentCount(); if (logger.isTraceEnabled()) { @@ -701,7 +701,7 @@ public abstract class BaseCommand implements Command { if (callbackArg != null) { responseMsg.addObjPart(callbackArg); } - responseMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(),nwHop}); + responseMsg.addBytesPart(new byte[]{pr.getMetadataVersion(),nwHop}); servConn.getCache().getCancelCriterion().checkCancelInProgress(null); responseMsg.send(servConn); origMsg.clearParts(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java ---------------------------------------------------------------------- diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java index 4947e20..b5506d6 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java @@ -16,8 +16,6 @@ */ package com.gemstone.gemfire.internal.cache.tier.sockets; -import static com.sun.tools.doclint.Entity.part; - import com.gemstone.gemfire.SerializationException; import com.gemstone.gemfire.distributed.internal.DistributionConfig; import com.gemstone.gemfire.internal.Assert; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy.java index bc47e2a..10a72dd 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy.java @@ -29,7 +29,6 @@ import com.gemstone.gemfire.internal.cache.EventID; import com.gemstone.gemfire.internal.cache.EventIDHolder; import com.gemstone.gemfire.internal.cache.LocalRegion; import com.gemstone.gemfire.internal.cache.PartitionedRegion; -import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper; import com.gemstone.gemfire.internal.cache.tier.Command; import com.gemstone.gemfire.internal.cache.tier.MessageType; import com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand; @@ -181,10 +180,9 @@ public class Destroy extends BaseCommand { } if (region instanceof PartitionedRegion) { PartitionedRegion pr = 
(PartitionedRegion) region; - if (pr.isNetworkHop() != (byte) 0) { - writeReplyWithRefreshMetadata(msg, servConn, pr, pr.isNetworkHop()); - pr.setIsNetworkHop((byte) 0); - pr.setMetadataVersion(Byte.valueOf((byte) 0)); + if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { + writeReplyWithRefreshMetadata(msg, servConn, pr, pr.getNetworkHopType()); + pr.clearNetworkHopData(); } else { writeReply(msg, servConn); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java index a571f71..6c41fb5 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java @@ -301,10 +301,9 @@ public class Destroy65 extends BaseCommand { if (region instanceof PartitionedRegion) { PartitionedRegion pr = (PartitionedRegion) region; - if (pr.isNetworkHop() != (byte) 0) { - writeReplyWithRefreshMetadata(msg, servConn, pr, entryNotFoundForRemove, pr.isNetworkHop(), clientEvent.getVersionTag()); - pr.setIsNetworkHop((byte) 0); - pr.setMetadataVersion((byte) 0); + if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { + writeReplyWithRefreshMetadata(msg, servConn, pr, entryNotFoundForRemove, pr.getNetworkHopType(), clientEvent.getVersionTag()); + pr.clearNetworkHopData(); } else { writeReply(msg, servConn, entryNotFoundForRemove | clientEvent.getIsRedestroyedEntry(), clientEvent.getVersionTag()); } 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy70.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy70.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy70.java index 82d9c1a..aa9b865 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy70.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy70.java @@ -65,7 +65,7 @@ public class Destroy70 extends Destroy65 { if (versionTag != null) { replyMsg.addObjPart(versionTag); } - replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(), nwHop}); + replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion(), nwHop}); replyMsg.addIntPart(entryNotFoundForRemove? 1 : 0); pr.getPrStats().incPRMetaDataSentCount(); replyMsg.send(servConn); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java index f5d9937..c3daa64 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java @@ -201,11 +201,10 @@ public class Get70 extends BaseCommand { if (region instanceof PartitionedRegion) { PartitionedRegion pr = (PartitionedRegion) region; - if (pr.isNetworkHop() != (byte) 0) { + if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) 
{ writeResponseWithRefreshMetadata(data, callbackArg, msg, isObject, - servConn, pr, pr.isNetworkHop(), versionTag, keyNotPresent); - pr.setIsNetworkHop((byte) 0); - pr.setMetadataVersion(Byte.valueOf((byte) 0)); + servConn, pr, pr.getNetworkHopType(), versionTag, keyNotPresent); + pr.clearNetworkHopData(); } else { writeResponse(data, callbackArg, msg, isObject, versionTag, keyNotPresent, servConn); @@ -490,7 +489,7 @@ public class Get70 extends BaseCommand { responseMsg.addObjPart(versionTag); } - responseMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(),nwHop}); + responseMsg.addBytesPart(new byte[]{pr.getMetadataVersion(),nwHop}); servConn.getCache().getCancelCriterion().checkCancelInProgress(null); responseMsg.send(servConn); origMsg.clearParts(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate.java index 166b11a..2faa177 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate.java @@ -27,7 +27,6 @@ import com.gemstone.gemfire.internal.cache.EventID; import com.gemstone.gemfire.internal.cache.EventIDHolder; import com.gemstone.gemfire.internal.cache.LocalRegion; import com.gemstone.gemfire.internal.cache.PartitionedRegion; -import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper; import com.gemstone.gemfire.internal.cache.tier.Command; import com.gemstone.gemfire.internal.cache.tier.MessageType; import com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand; @@ -190,10 +189,9 @@ public class 
Invalidate extends BaseCommand { } if (region instanceof PartitionedRegion) { PartitionedRegion pr = (PartitionedRegion) region; - if (pr.isNetworkHop() != (byte) 0) { - writeReplyWithRefreshMetadata(msg, servConn, pr, pr.isNetworkHop(), tag); - pr.setIsNetworkHop((byte) 0); - pr.setMetadataVersion(Byte.valueOf((byte) 0)); + if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { + writeReplyWithRefreshMetadata(msg, servConn, pr, pr.getNetworkHopType(), tag); + pr.clearNetworkHopData(); } else { writeReply(msg, servConn, tag); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate70.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate70.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate70.java index 6200438..f831999 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate70.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate70.java @@ -60,7 +60,7 @@ public class Invalidate70 extends Invalidate { if (versionTag != null) { replyMsg.addObjPart(versionTag); } - replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(), nwHop}); + replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion(), nwHop}); pr.getPrStats().incPRMetaDataSentCount(); replyMsg.send(servConn); if (logger.isTraceEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put61.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put61.java 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put61.java index 4529b2d..bfc3f20 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put61.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put61.java @@ -261,10 +261,9 @@ public class Put61 extends BaseCommand { // Increment statistics and write the reply if (region instanceof PartitionedRegion) { PartitionedRegion pr = (PartitionedRegion) region; - if (pr.isNetworkHop() != (byte) 0) { - writeReplyWithRefreshMetadata(msg, servConn, pr, pr.isNetworkHop()); - pr.setIsNetworkHop((byte) 0); - pr.setMetadataVersion(Byte.valueOf((byte) 0)); + if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { + writeReplyWithRefreshMetadata(msg, servConn, pr, pr.getNetworkHopType()); + pr.clearNetworkHopData(); } else { writeReply(msg, servConn); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put65.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put65.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put65.java index 48d923c..e164ef5 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put65.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put65.java @@ -50,7 +50,6 @@ import com.gemstone.gemfire.internal.cache.versions.VersionTag; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; import com.gemstone.gemfire.internal.security.AuthorizeRequest; -import com.gemstone.gemfire.internal.security.SecurityService; import com.gemstone.gemfire.internal.util.Breadcrumbs; import 
com.gemstone.gemfire.security.GemFireSecurityException; @@ -426,12 +425,10 @@ public class Put65 extends BaseCommand { // Increment statistics and write the reply if (region instanceof PartitionedRegion) { PartitionedRegion pr = (PartitionedRegion) region; - if (pr.isNetworkHop().byteValue() != (byte) 0) { - writeReplyWithRefreshMetadata(msg, servConn, pr, sendOldValue, oldValueIsObject, oldValue, pr.isNetworkHop() - .byteValue(), clientEvent - .getVersionTag()); - pr.setIsNetworkHop((byte) 0); - pr.setMetadataVersion(Byte.valueOf((byte) 0)); + if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { + writeReplyWithRefreshMetadata(msg, servConn, pr, sendOldValue, oldValueIsObject, oldValue, pr.getNetworkHopType() + , clientEvent.getVersionTag()); + pr.clearNetworkHopData(); } else { writeReply(msg, servConn, sendOldValue, oldValueIsObject, oldValue, clientEvent.getVersionTag()); } @@ -482,7 +479,7 @@ public class Put65 extends BaseCommand { replyMsg.setMessageType(MessageType.REPLY); replyMsg.setNumberOfParts(sendOldValue ? 3 : 1); replyMsg.setTransactionId(origMsg.getTransactionId()); - replyMsg.addBytesPart(new byte[] { pr.getMetadataVersion().byteValue(), nwHopType }); + replyMsg.addBytesPart(new byte[] { pr.getMetadataVersion(), nwHopType }); if (sendOldValue) { replyMsg.addIntPart(oldValueIsObject ? 
1 : 0); replyMsg.addObjPart(oldValue); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put70.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put70.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put70.java index af16bed..bb517d9 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put70.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put70.java @@ -105,7 +105,7 @@ public class Put70 extends Put65 { } replyMsg.setNumberOfParts(parts); replyMsg.setTransactionId(origMsg.getTransactionId()); - replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(), nwHopType}); + replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion(), nwHopType}); replyMsg.addIntPart(flags); if (sendOldValue) { // if (logger.fineEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll.java index 955677f..1579a1c 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll.java @@ -189,10 +189,9 @@ public class PutAll extends BaseCommand { if (region instanceof PartitionedRegion) { PartitionedRegion pr = (PartitionedRegion)region; - if (pr.isNetworkHop() != (byte)0) { - writeReplyWithRefreshMetadata(msg, 
servConn,pr,pr.isNetworkHop()); - pr.setIsNetworkHop((byte)0); - pr.setMetadataVersion(Byte.valueOf((byte)0)); + if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { + writeReplyWithRefreshMetadata(msg, servConn,pr,pr.getNetworkHopType()); + pr.clearNetworkHopData(); replyWithMetaData = true; } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll70.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll70.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll70.java index 4e6e167..02bbc40 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll70.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll70.java @@ -250,10 +250,9 @@ public class PutAll70 extends BaseCommand { if (region instanceof PartitionedRegion) { PartitionedRegion pr = (PartitionedRegion)region; - if (pr.isNetworkHop().byteValue() != 0) { - writeReplyWithRefreshMetadata(msg, response, servConn, pr, pr.isNetworkHop()); - pr.setIsNetworkHop(Byte.valueOf((byte)0)); - pr.setMetadataVersion(Byte.valueOf((byte)0)); + if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { + writeReplyWithRefreshMetadata(msg, response, servConn, pr, pr.getNetworkHopType()); + pr.clearNetworkHopData(); replyWithMetaData = true; } } @@ -349,7 +348,7 @@ public class PutAll70 extends BaseCommand { replyMsg.setMessageType(MessageType.REPLY); replyMsg.setNumberOfParts(2); replyMsg.setTransactionId(origMsg.getTransactionId()); - replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(), nwHop}); + replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion(), nwHop}); if (response != null) { response.clearObjects(); 
replyMsg.addObjPart(response); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll80.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll80.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll80.java index 78f5612..beeb3ce 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll80.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll80.java @@ -291,10 +291,9 @@ public class PutAll80 extends BaseCommand { if (region instanceof PartitionedRegion) { PartitionedRegion pr = (PartitionedRegion)region; - if (pr.isNetworkHop().byteValue() != 0) { - writeReplyWithRefreshMetadata(msg, response, servConn, pr, pr.isNetworkHop()); - pr.setIsNetworkHop(Byte.valueOf((byte)0)); - pr.setMetadataVersion(Byte.valueOf((byte)0)); + if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { + writeReplyWithRefreshMetadata(msg, response, servConn, pr, pr.getNetworkHopType()); + pr.clearNetworkHopData(); replyWithMetaData = true; } } @@ -417,7 +416,7 @@ public class PutAll80 extends BaseCommand { } replyMsg.setNumberOfParts(1); replyMsg.setTransactionId(origMsg.getTransactionId()); - replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(), nwHop}); + replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion(), nwHop}); if (listSize > 0) { replyMsg.setLastChunk(false); replyMsg.sendChunk(servConn); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RemoveAll.java ---------------------------------------------------------------------- diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RemoveAll.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RemoveAll.java index 474d942..4203447 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RemoveAll.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RemoveAll.java @@ -217,10 +217,9 @@ public class RemoveAll extends BaseCommand { if (region instanceof PartitionedRegion) { PartitionedRegion pr = (PartitionedRegion)region; - if (pr.isNetworkHop().byteValue() != 0) { - writeReplyWithRefreshMetadata(msg, response, servConn, pr, pr.isNetworkHop()); - pr.setIsNetworkHop(Byte.valueOf((byte)0)); - pr.setMetadataVersion(Byte.valueOf((byte)0)); + if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { + writeReplyWithRefreshMetadata(msg, response, servConn, pr, pr.getNetworkHopType()); + pr.clearNetworkHopData(); replyWithMetaData = true; } } @@ -346,7 +345,7 @@ public class RemoveAll extends BaseCommand { } replyMsg.setNumberOfParts(1); replyMsg.setTransactionId(origMsg.getTransactionId()); - replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(), nwHop}); + replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion(), nwHop}); if (listSize > 0) { replyMsg.setLastChunk(false); replyMsg.sendChunk(servConn); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java index c84e189..d19fe44 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java +++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java @@ -189,11 +189,10 @@ public class Request extends BaseCommand { if (region instanceof PartitionedRegion) { PartitionedRegion pr = (PartitionedRegion)region; - if (pr.isNetworkHop() != (byte)0) { + if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { writeResponseWithRefreshMetadata(data, callbackArg, msg, isObject, - servConn, pr,pr.isNetworkHop()); - pr.setIsNetworkHop((byte)0); - pr.setMetadataVersion(Byte.valueOf((byte)0)); + servConn, pr,pr.getNetworkHopType()); + pr.clearNetworkHopData(); } else { writeResponse(data, callbackArg, msg, isObject, servConn); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionSingleHopDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionSingleHopDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionSingleHopDUnitTest.java index 8c934d8..78fd5e0 100755 --- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionSingleHopDUnitTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionSingleHopDUnitTest.java @@ -35,7 +35,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import com.jayway.awaitility.Awaitility; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -400,7 +399,7 @@ public class PartitionedRegionSingleHopDUnitTest extends JUnit4CacheTestCase { Awaitility.waitAtMost(60, TimeUnit.SECONDS).until(() -> cms.isRefreshMetadataTestOnly() == true); //make sure all fetch tasks are completed - Awaitility.waitAtMost(60, TimeUnit.SECONDS).until(() -> cms.getFetchTaskCount() == 0); + Awaitility.waitAtMost(60, 
TimeUnit.SECONDS).until(() -> cms.getRefreshTaskCount() == 0); cms.satisfyRefreshMetadata_TEST_ONLY(false); region.put(new Integer(0), "create0"); @@ -1965,7 +1964,7 @@ public class PartitionedRegionSingleHopDUnitTest extends JUnit4CacheTestCase { private void verifyMetadata() { ClientMetadataService cms = ((GemFireCacheImpl)cache).getClientMetadataService(); //make sure all fetch tasks are completed - Awaitility.waitAtMost(60, TimeUnit.SECONDS).until(() -> cms.getFetchTaskCount() == 0); + Awaitility.waitAtMost(60, TimeUnit.SECONDS).until(() -> cms.getRefreshTaskCount() == 0); final Map<String, ClientPartitionAdvisor> regionMetaData = cms .getClientPRMetadata_TEST_ONLY(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SingleHopStatsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SingleHopStatsDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SingleHopStatsDUnitTest.java index e611086..a3e9d08 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SingleHopStatsDUnitTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SingleHopStatsDUnitTest.java @@ -124,16 +124,19 @@ public class SingleHopStatsDUnitTest extends JUnit4CacheTestCase { Integer port1 = (Integer) member1.invoke(() -> createServerForStats(0, 113, "No_Colocation")); Integer port2 = (Integer) member2.invoke(() -> createServerForStats(0, 113, "No_Colocation")); - member3.invoke(() -> createClient(port0, port1, port2, "No_Colocation")); + member3.invoke("createClient", () -> createClient(port0, port1, port2, "No_Colocation")); + System.out.println("createClient"); createClient(port0, port1, port2, "No_Colocation"); - member3.invoke(() -> createPR("FirstClient", "No_Colocation")); + member3.invoke("createPR", () -> createPR("FirstClient", 
"No_Colocation")); + System.out.println("createPR"); createPR("SecondClient", "No_Colocation"); - member3.invoke(() -> getPR("FirstClient", "No_Colocation")); + member3.invoke("getPR", () -> getPR("FirstClient", "No_Colocation")); + System.out.println("getPR"); getPR("SecondClient", "No_Colocation"); - member3.invoke(() -> updatePR("FirstClient", "No_Colocation")); + member3.invoke("updatePR", () -> updatePR("FirstClient", "No_Colocation")); } @Test @@ -163,7 +166,7 @@ public class SingleHopStatsDUnitTest extends JUnit4CacheTestCase { .addServer("localhost", port1).addServer("localhost", port2) .setRetryAttempts(5) .setMinConnections(1) - .setMaxConnections(1) + .setMaxConnections(-1) .setSubscriptionEnabled(false) .create(Region_Name); } finally { @@ -325,7 +328,10 @@ public class SingleHopStatsDUnitTest extends JUnit4CacheTestCase { nonSingleHopsCount = ((LocalRegion) region).getCachePerfStats().getNonSingleHopsCount(); assertTrue(metaDataRefreshCount != 0); // hops are not predictable assertTrue(nonSingleHopsCount != 0); + + System.out.println("metadata refresh count after second pass is " + metaDataRefreshCount); } else { + System.out.println("creating keys in second client"); for (int i = 0; i < 226; i++) { region.create(new Integer(i), "create" + i); } @@ -341,6 +347,7 @@ public class SingleHopStatsDUnitTest extends JUnit4CacheTestCase { nonSingleHopsCount = ((LocalRegion) region).getCachePerfStats().getNonSingleHopsCount(); assertTrue(metaDataRefreshCount != 0); // hops are not predictable assertTrue(nonSingleHopsCount != 0); + System.out.println("metadata refresh count in second client is " + metaDataRefreshCount); } } else { createdColocatedPRData(cache);
