Repository: geode Updated Branches: refs/heads/feature/GEM-1353 e7ba045c7 -> b59370051
fix-3 Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/b5937005 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/b5937005 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/b5937005 Branch: refs/heads/feature/GEM-1353 Commit: b59370051f64a50c6719b8e4f12af6cf5d4c6b67 Parents: e7ba045 Author: zhouxh <[email protected]> Authored: Mon Apr 10 17:21:25 2017 -0700 Committer: zhouxh <[email protected]> Committed: Mon Apr 10 17:21:25 2017 -0700 ---------------------------------------------------------------------- .../geode/internal/cache/BucketRegion.java | 60 +++++++------------ .../cache/DistributedCacheOperation.java | 27 ++++++++- .../cache/DistributedClearOperation.java | 19 +----- .../geode/internal/cache/DistributedRegion.java | 61 +++----------------- .../internal/cache/LocalRegionDataView.java | 12 ++-- .../wan/serial/SerialGatewaySenderQueue.java | 8 +-- .../DistributedAckRegionCCEDUnitTest.java | 8 +-- .../cache/query/cq/dunit/CqQueryDUnitTest.java | 8 +-- 8 files changed, 67 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java index 70ef226..4e68520 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java @@ -577,7 +577,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { } protected void distributeUpdateOperation(EntryEventImpl event, long lastModified) { - long viewVersion = -1; + long token = -1; UpdateOperation op = null; try { @@ -589,8 +589,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { } else { // BR's put op = new UpdateOperation(event, lastModified); - viewVersion = op.startOperation(); - op.distribute(); + token = op.startOperation(); if (logger.isDebugEnabled()) { logger.debug("sent update operation : for region : {}: with event: {}", this.getName(), event); @@ -602,7 +601,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { } } finally { if (op != null) { - op.endOperation(viewVersion); + op.endOperation(token); } } } @@ -620,7 +619,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { // distribution *before* we do basicPutPart2. final long modifiedTime = event.getEventTime(lastModified); - long viewVersion = -1; + long token = -1; UpdateOperation op = null; try { @@ -649,8 +648,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { try { // PR's put PR op = new UpdateOperation(event, modifiedTime); - viewVersion = op.startOperation(); - op.distribute(); + token = op.startOperation(); } finally { this.partitionedRegion.getPrStats().endSendReplication(start); } @@ -665,7 +663,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { return lastModifiedTime; } finally { if (op != null) { - op.endOperation(viewVersion); + op.endOperation(token); } } } @@ -911,20 +909,19 @@ public class BucketRegion extends DistributedRegion implements Bucket { protected void distributeInvalidateOperation(EntryEventImpl event) { InvalidateOperation op = null; - long viewVersion = -1; + long token = -1; try { if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) { // This cache has processed the event, forward operation // and event messages to backup buckets // BR.invalidate hasSeenEvent op = new InvalidateOperation(event); - viewVersion = op.startOperation(); - op.distribute(); + token = op.startOperation(); } event.invokeCallbacks(this, true, false); } finally { if (op != null) { - op.endOperation(viewVersion); + op.endOperation(token); } } } @@ -933,7 +930,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { void basicInvalidatePart2(final RegionEntry re, final EntryEventImpl event, boolean clearConflict, boolean invokeCallbacks) { // Assumed this is called with the entry synchronized - long viewVersion = -1; + long token = -1; InvalidateOperation op = null; try { @@ -955,14 +952,13 @@ public class BucketRegion extends DistributedRegion implements Bucket { // distribute op to bucket secondaries and event to other listeners // BR's invalidate op = new InvalidateOperation(event); - viewVersion = op.startOperation(); - op.distribute(); + token = op.startOperation(); } super.basicInvalidatePart2(re, event, clearConflict /* Clear conflict occurred */, invokeCallbacks); } finally { if (op != null) { - op.endOperation(viewVersion); + op.endOperation(token); } } } @@ -1180,7 +1176,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { } protected void distributeDestroyOperation(EntryEventImpl event) { - long viewVersion = -1; + long token = -1; DestroyOperation op = null; try { @@ -1198,8 +1194,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { // BR's destroy, not to trigger callback here event.setOldValueFromRegion(); op = new DestroyOperation(event); - viewVersion = op.startOperation(); - op.distribute(); + token = op.startOperation(); } } @@ -1208,14 +1203,14 @@ public class BucketRegion extends DistributedRegion implements Bucket { } } finally { if (op != null) { - op.endOperation(viewVersion); + op.endOperation(token); } } } @Override protected void basicDestroyBeforeRemoval(RegionEntry entry, EntryEventImpl event) { - long viewVersion = -1; + long token = -1; DestroyOperation op = null; try { // Assumed this is called with entry synchrony @@ -1237,13 +1232,12 @@ public class BucketRegion extends DistributedRegion implements Bucket { // This code assumes that this bucket is primary // BR.destroy for retain op = new DestroyOperation(event); - viewVersion = op.startOperation(); - op.distribute(); + token = op.startOperation(); } super.basicDestroyBeforeRemoval(entry, event); } finally { if (op != null) { - op.endOperation(viewVersion); + op.endOperation(token); } } } @@ -1333,14 +1327,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { } protected void distributeUpdateEntryVersionOperation(EntryEventImpl event) { - UpdateEntryVersionOperation op = new UpdateEntryVersionOperation(event); - long viewVersion = -1; - try { - viewVersion = op.startOperation(); - op.distribute(); - } finally { - op.endOperation(viewVersion); - } + new UpdateEntryVersionOperation(event).distribute(); } public int getRedundancyLevel() { @@ -1575,14 +1562,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { } // Send out the destroy op to peers - DestroyRegionOperation dro = new DestroyRegionOperation(event, true); - long viewVersion = -1; - try { - viewVersion = dro.startOperation(); - dro.distribute(); - } finally { - dro.endOperation(viewVersion); - } + new DestroyRegionOperation(event, true).distribute(); } } http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java index b77c80c..86063a2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java @@ -240,6 +240,12 @@ public abstract class DistributedCacheOperation { return true; } + /** + * region's distribution advisor marked that a distribution is about to start, then distribute. It + * returns a token, which is view version. Return -1 means the method did not succeed. This method + * must be invoked before toDistribute(). This method should pair with endOperation() in + * try/finally block. + */ public long startOperation() { DistributedRegion region = getRegion(); long viewVersion = -1; @@ -250,9 +256,14 @@ public abstract class DistributedCacheOperation { logger.trace(LogMarker.STATE_FLUSH_OP, "dispatching operation in view version {}", viewVersion); } + _distribute(); return viewVersion; } + /** + * region's distribution advisor marked that a distribution is ended. This method should pair with + * startOperation in try/finally block. + */ public void endOperation(long viewVersion) { DistributedRegion region = getRegion(); if (viewVersion != -1) { @@ -269,8 +280,22 @@ public abstract class DistributedCacheOperation { * who the recipients are and handles careful delivery of the operation to those members. */ public void distribute() { + long token = -1; + try { + token = startOperation(); + } finally { + endOperation(token); + } + } + + /** + * About to distribute a cache operation to other members of the distributed system. This method + * determines who the recipients are and handles careful delivery of the operation to those + * members. This method should wrapped by startOperation() and endOperation() in try/finally + * block. + */ + private void _distribute() { DistributedRegion region = getRegion(); - // logger.info("GGG:" + region); DM mgr = region.getDistributionManager(); boolean reliableOp = isOperationReliable() && region.requiresReliabilityCheck(); http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java index e209d77..9d10fc1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java @@ -67,15 +67,8 @@ public class DistributedClearOperation extends DistributedCacheOperation { **/ public static void clear(RegionEventImpl regionEvent, RegionVersionVector rvv, Set<InternalDistributedMember> recipients) { - long viewVersion = -1; - DistributedClearOperation op = new DistributedClearOperation( - DistributedClearOperation.OperationType.OP_CLEAR, regionEvent, rvv, recipients); - try { - viewVersion = op.startOperation(); - op.distribute(); - } finally { - op.endOperation(viewVersion); - } + new DistributedClearOperation(DistributedClearOperation.OperationType.OP_CLEAR, regionEvent, + rvv, recipients).distribute(); } /** @@ -88,13 +81,7 @@ public class DistributedClearOperation extends DistributedCacheOperation { Set<InternalDistributedMember> recipients) { DistributedClearOperation dco = new DistributedClearOperation( DistributedClearOperation.OperationType.OP_LOCK_FOR_CLEAR, regionEvent, null, recipients); - long viewVersion = -1; - try { - viewVersion = dco.startOperation(); - dco.distribute(); - } finally { - dco.endOperation(viewVersion); - } + dco.distribute(); } @Override http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index affcfa7..ed1a2fe 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -439,16 +439,11 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA if (distribute) { // DR's put, it has notified gateway sender earlier UpdateOperation op = new UpdateOperation(event, lastModified); - long viewVersion = op.startOperation(); if (logger.isTraceEnabled()) { logger.trace("distributing operation for event : {} : for region : {}", event, this.getName()); } - try { - op.distribute(); - } finally { - op.endOperation(viewVersion); - } + op.distribute(); } } } @@ -1684,14 +1679,8 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA boolean distribute = !event.getInhibitDistribution(); if (distribute) { // DR.destroy, it has notifiedGatewaySender ealier - long viewVersion = -1; DestroyOperation op = new DestroyOperation(event); - try { - viewVersion = op.startOperation(); - op.distribute(); - } finally { - op.endOperation(viewVersion); - } + op.distribute(); } } } @@ -1746,14 +1735,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA * @since GemFire 5.7 */ protected void distributeInvalidateRegion(RegionEventImpl event) { - InvalidateRegionOperation op = new InvalidateRegionOperation(event); - long viewVersion = -1; - try { - viewVersion = op.startOperation(); - op.distribute(); - } finally { - op.endOperation(viewVersion); - } + new InvalidateRegionOperation(event).distribute(); } /** @@ -1802,14 +1784,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA if (persistenceAdvisor != null) { persistenceAdvisor.releaseTieLock(); } - long viewVersion = -1; - DestroyRegionOperation op = new DestroyRegionOperation(event, notifyOfRegionDeparture); - try { - viewVersion = op.startOperation(); - op.distribute(); - } finally { - op.endOperation(viewVersion); - } + new DestroyRegionOperation(event, notifyOfRegionDeparture).distribute(); } /** @@ -1887,14 +1862,8 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA boolean distribute = !event.getInhibitDistribution(); if (distribute) { // DR.invalidate, it has triggered callback earlier - long viewVersion = -1; InvalidateOperation op = new InvalidateOperation(event); - try { - viewVersion = op.startOperation(); - op.distribute(); - } finally { - op.endOperation(viewVersion); - } + op.distribute(); } } } @@ -1927,13 +1896,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA if (event.isDistributed() && !event.isOriginRemote()) { // DR has sent callback earlier UpdateEntryVersionOperation op = new UpdateEntryVersionOperation(event); - long viewVersion = -1; - try { - viewVersion = op.startOperation(); - op.distribute(); - } finally { - op.endOperation(viewVersion); - } + op.distribute(); } } } @@ -2138,13 +2101,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA this.getCachePerfStats().incTombstoneGCCount(); EventID eventId = new EventID(getSystem()); DistributedTombstoneOperation gc = DistributedTombstoneOperation.gc(this, eventId); - long viewVersion = -1; - try { - viewVersion = gc.startOperation(); - gc.distribute(); - } finally { - gc.endOperation(viewVersion); - } + gc.distribute(); notifyClientsOfTombstoneGC(getVersionVector().getTombstoneGCVector(), keysRemoved, eventId, null); return eventId; @@ -3393,7 +3350,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA public void postPutAllSend(DistributedPutAllOperation putAllOp, VersionedObjectList successfulPuts) { if (putAllOp.putAllDataSize > 0) { - putAllOp.distribute(); + putAllOp.startOperation(); } else { if (logger.isDebugEnabled()) { logger.debug("DR.postPutAll: no data to distribute"); @@ -3405,7 +3362,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA public void postRemoveAllSend(DistributedRemoveAllOperation op, VersionedObjectList successfulOps) { if (op.removeAllDataSize > 0) { - op.distribute(); + op.startOperation(); } else { getCache().getLoggerI18n().fine("DR.postRemoveAll: no data to distribute"); } http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java index 3d7418f..6d415d5 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java @@ -312,16 +312,16 @@ public class LocalRegionDataView implements InternalDataView { putallOp.fillVersionedObjectList(successfulPuts); } // BR & DR's putAll - long viewVersion = -1; + long token = -1; try { if (region instanceof DistributedRegion) { - viewVersion = putallOp.startOperation(); + token = putallOp.startOperation(); } region.postPutAllSend(putallOp, successfulPuts); region.postPutAllFireEvents(putallOp, successfulPuts); } finally { if (region instanceof DistributedRegion) { - putallOp.endOperation(viewVersion); + putallOp.endOperation(token); } } } @@ -337,16 +337,16 @@ public class LocalRegionDataView implements InternalDataView { op.fillVersionedObjectList(successfulOps); } // BR, DR's removeAll - long viewVersion = -1; + long token = -1; try { if (region instanceof DistributedRegion) { - viewVersion = op.startOperation(); + token = op.startOperation(); } region.postRemoveAllSend(op, successfulOps); region.postRemoveAllFireEvents(op, successfulOps); } finally { if (region instanceof DistributedRegion) { - op.endOperation(viewVersion); + op.endOperation(token); } } } http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java index 435ad70..e6d54c5 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java @@ -1142,13 +1142,7 @@ public class SerialGatewaySenderQueue implements RegionQueue { event.setTailKey(temp); BatchDestroyOperation op = new BatchDestroyOperation(event); - long viewVersion = -1; - try { - viewVersion = op.startOperation(); - op.distribute(); - } finally { - op.endOperation(viewVersion); - } + op.distribute(); if (logger.isDebugEnabled()) { logger.debug("BatchRemovalThread completed destroy of keys from {} to {}", lastDestroyedKey, temp); http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java index ff37e36..dcb6cf3 100644 --- a/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java @@ -254,13 +254,7 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT // this should update the controller's cache with the updated value but leave this cache // alone DistributedCacheOperation op = new UpdateOperation(event, tag.getVersionTimeStamp()); - long viewVersion = -1; - try { - viewVersion = op.startOperation(); - op.distribute(); - } finally { - op.endOperation(viewVersion); - } + op.distribute(); event.release(); } }); http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java index 558df48..e320ff1 100644 --- a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java +++ b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java @@ -1577,13 +1577,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase { Region subregion = getCache().getRegion("root/" + regionName); DistributedTombstoneOperation gc = DistributedTombstoneOperation .gc((DistributedRegion) subregion, new EventID(getCache().getDistributedSystem())); - long viewVersion = -1; - try { - viewVersion = gc.startOperation(); - gc.distribute(); - } finally { - gc.endOperation(viewVersion); - } + gc.distribute(); } }; server.invoke(task);
