Repository: geode Updated Branches: refs/heads/develop 0dd2552b1 -> 16832655d
GEODE-2847: Get correct version tags for retried bulk operation Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/16832655 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/16832655 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/16832655 Branch: refs/heads/develop Commit: 16832655d18592ddaf3c89979be30e5e7caa10f1 Parents: 0dd2552 Author: eshu <[email protected]> Authored: Wed May 3 10:14:08 2017 -0700 Committer: eshu <[email protected]> Committed: Wed May 3 10:14:08 2017 -0700 ---------------------------------------------------------------------- .../geode/internal/cache/EventTracker.java | 117 +++++++------------ .../geode/internal/cache/LocalRegion.java | 24 ++-- .../cache/partitioned/PutAllPRMessage.java | 2 +- .../cache/partitioned/RemoveAllPRMessage.java | 2 +- .../tier/sockets/ClientProxyMembershipID.java | 2 +- .../AbstractDistributedRegionJUnitTest.java | 15 ++- .../cache/DistributedRegionJUnitTest.java | 54 ++++++++- 7 files changed, 120 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java index 2ddfdc4..278367c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java @@ -59,14 +59,12 @@ public class EventTracker { new ConcurrentHashMap<ThreadIdentifier, EventSeqnoHolder>(100); /** - * a mapping of originator to bulkOp's last status (true means finished processing) applied to - * this cache. + * a mapping of originator to bulkOps * - * Keys are instances of @link {@link ThreadIdentifier}, values are instances of - * {@link BulkOpProcessed}. + * Keys are instances of @link {@link ThreadIdentifier} */ - private final ConcurrentMap<ThreadIdentifier, BulkOpProcessed> recordedBulkOps = - new ConcurrentHashMap<ThreadIdentifier, BulkOpProcessed>(100); + private final ConcurrentMap<ThreadIdentifier, Object> recordedBulkOps = + new ConcurrentHashMap<ThreadIdentifier, Object>(100); /** * a mapping of originator to bulkOperation's last version tags. This map differs from @@ -141,7 +139,7 @@ public class EventTracker { public EventTracker(LocalRegion region) { this.cache = region.cache; this.name = "Event Tracker for " + region.getName(); - this.initializationLatch = new StoppableCountDownLatch(region.stopper, 1); + this.initializationLatch = new StoppableCountDownLatch(region.getStopper(), 1); } /** start this event tracker */ @@ -307,19 +305,22 @@ public class EventTracker { } } + EventSeqnoHolder newEvh = new EventSeqnoHolder(eventID.getSequenceID(), tag); + if (logger.isTraceEnabled()) { + logger.trace("region event tracker recording {}", event); + } + recordSeqno(membershipID, newEvh); + // If this is a bulkOp, and concurrency checks are enabled, we need to // save the version tag in case we retry. - if (lr.concurrencyChecksEnabled + // Make recordBulkOp version tag after recordSeqno, so that recordBulkOpStart + // in a retry bulk op would not incorrectly remove the saved version tag in + // recordedBulkOpVersionTags + if (lr.getConcurrencyChecksEnabled() && (event.getOperation().isPutAll() || event.getOperation().isRemoveAll()) && lr.getServerProxy() == null) { recordBulkOpEvent(event, membershipID); } - - EventSeqnoHolder newEvh = new EventSeqnoHolder(eventID.getSequenceID(), tag); - if (logger.isTraceEnabled()) { - logger.trace("region event tracker recording {}", event); - } - recordSeqno(membershipID, newEvh); } /** @@ -542,24 +543,19 @@ public class EventTracker { ThreadIdentifier membershipID = new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID()); - BulkOpProcessed opSyncObj = - recordedBulkOps.putIfAbsent(membershipID, new BulkOpProcessed(false)); - if (opSyncObj == null) { - opSyncObj = recordedBulkOps.get(membershipID); - } + Object opSyncObj = null; + do { + opSyncObj = recordedBulkOps.putIfAbsent(membershipID, new Object()); + if (opSyncObj == null) { + opSyncObj = recordedBulkOps.get(membershipID); + } + } while (opSyncObj == null); + synchronized (opSyncObj) { try { - if (opSyncObj.getStatus() && logger.isDebugEnabled()) { - logger.debug("SyncBulkOp: The operation was performed by another thread."); - } else { - recordBulkOpStart(membershipID); - - // Perform the bulk op - r.run(); - // set to true in case another thread is waiting at sync - opSyncObj.setStatus(true); - recordedBulkOps.remove(membershipID); - } + recordBulkOpStart(membershipID, eventID); + // Perform the bulk op + r.run(); } finally { recordedBulkOps.remove(membershipID); } @@ -567,14 +563,23 @@ public class EventTracker { } /** - * Called when a bulkOp is started on the local region. Used to clear event tracker state from the - * last bulkOp. + * Called when a new bulkOp is started on the local region. Used to clear event tracker state from + * the last bulkOp. */ - public void recordBulkOpStart(ThreadIdentifier tid) { + public void recordBulkOpStart(ThreadIdentifier tid, EventID eventID) { if (logger.isDebugEnabled()) { logger.debug("recording bulkOp start for {}", tid.expensiveToString()); } - this.recordedBulkOpVersionTags.remove(tid); + EventSeqnoHolder evh = recordedEvents.get(tid); + if (evh == null) { + return; + } + synchronized (evh) { + // only remove it when a new bulk op occurs + if (eventID.getSequenceID() > evh.lastSeqno) { + this.recordedBulkOpVersionTags.remove(tid); + } + } } /** @@ -660,50 +665,6 @@ public class EventTracker { } /** - * A status tracker for each bulk operation (putAll or removeAll) from originators specified by - * membershipID and threadID in the cache processed is true means the bulk op is processed by one - * thread no need to redo it by other threads. - * - * @since GemFire 5.7 - */ - static class BulkOpProcessed { - /** whether the op is processed */ - private boolean processed; - - /** - * creates a new instance to save status of a bulk op - * - * @param status true if the op has been processed - */ - BulkOpProcessed(boolean status) { - this.processed = status; - } - - /** - * setter method to change the status - * - * @param status true if the op has been processed - */ - void setStatus(boolean status) { - this.processed = status; - } - - /** - * getter method to peek the current status - * - * @return current status - */ - boolean getStatus() { - return this.processed; - } - - @Override - public String toString() { - return "BULKOP(" + this.processed + ")"; - } - } - - /** * A holder for the version tags generated for a bulk operation (putAll or removeAll). These * version tags are retrieved when a bulk op is retried. * http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 8c061b0..2dec53b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -505,6 +505,10 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, return new Stopper(); } + protected CancelCriterion getStopper() { + return this.stopper; + } + private final TestCallable testCallable; /** @@ -682,10 +686,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, /** - * Test method for getting the event tracker. * - * this method is for testing only. Other region classes may track events using different - * mechanisms than EventTrackers + * Other region classes may track events using different mechanisms than EventTrackers */ protected EventTracker getEventTracker() { return this.eventTracker; @@ -3475,6 +3477,10 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } } + protected boolean getEnableConcurrencyChecks() { + return this.concurrencyChecksEnabled; + } + /** * validate attributes of subregion being created, sent to parent * @@ -6151,8 +6157,12 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, isDup = this.eventTracker.hasSeenEvent(event); if (isDup) { event.setPossibleDuplicate(true); - if (this.concurrencyChecksEnabled && event.getVersionTag() == null) { - event.setVersionTag(findVersionTagForClientEvent(event.getEventId())); + if (getConcurrencyChecksEnabled() && event.getVersionTag() == null) { + if (event.isBulkOpInProgress()) { + event.setVersionTag(findVersionTagForClientBulkOp(event.getEventId())); + } else { + event.setVersionTag(findVersionTagForClientEvent(event.getEventId())); + } } } else { // bug #48205 - a retried PR operation may already have a version assigned to it @@ -6253,9 +6263,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } } - public void recordBulkOpStart(ThreadIdentifier membershipID) { + public void recordBulkOpStart(ThreadIdentifier membershipID, EventID eventID) { if (this.eventTracker != null && !isTX()) { - this.eventTracker.recordBulkOpStart(membershipID); + this.eventTracker.recordBulkOpStart(membershipID, eventID); } } http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java index 27f5aa0..ed1fe0a 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java @@ -438,7 +438,7 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply { EventID eventID = putAllPRData[0].getEventID(); ThreadIdentifier membershipID = new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID()); - bucketRegion.recordBulkOpStart(membershipID); + bucketRegion.recordBulkOpStart(membershipID, eventID); } bucketRegion.waitUntilLocked(keys); boolean lockedForPrimary = false; http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java index f4f6299..0e38ddc 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java @@ -434,7 +434,7 @@ public final class RemoveAllPRMessage extends PartitionMessageWithDirectReply { EventID eventID = removeAllPRData[0].getEventID(); ThreadIdentifier membershipID = new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID()); - bucketRegion.recordBulkOpStart(membershipID); + bucketRegion.recordBulkOpStart(membershipID, eventID); } bucketRegion.waitUntilLocked(keys); boolean lockedForPrimary = false; http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java index 2cbf63b..2fd508b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java @@ -38,7 +38,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.*; * * */ -public final class ClientProxyMembershipID +public class ClientProxyMembershipID implements DataSerializableFixedID, Serializable, Externalizable { private static final Logger logger = LogService.getLogger(); http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractDistributedRegionJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractDistributedRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractDistributedRegionJUnitTest.java index ba2f794..a8cbdde 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractDistributedRegionJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractDistributedRegionJUnitTest.java @@ -106,7 +106,8 @@ public abstract class AbstractDistributedRegionJUnitTest { protected abstract void verifyDistributeUpdateEntryVersion(DistributedRegion region, EntryEventImpl event, int cnt); - protected DistributedRegion prepare(boolean isConcurrencyChecksEnabled) { + protected DistributedRegion prepare(boolean isConcurrencyChecksEnabled, + boolean testHasSeenEvent) { GemFireCacheImpl cache = Fakes.cache(); // create region attributes and internal region arguments @@ -122,14 +123,16 @@ public abstract class AbstractDistributedRegionJUnitTest { } doNothing().when(region).notifyGatewaySender(any(), any()); - doReturn(true).when(region).hasSeenEvent(any(EntryEventImpl.class)); + if (!testHasSeenEvent) { + doReturn(true).when(region).hasSeenEvent(any(EntryEventImpl.class)); + } return region; } @Test public void testConcurrencyFalseTagNull() { // case 1: concurrencyCheckEanbled = false, version tag is null: distribute - DistributedRegion region = prepare(false); + DistributedRegion region = prepare(false, false); EntryEventImpl event = createDummyEvent(region); assertNull(event.getVersionTag()); doTest(region, event, 1); @@ -138,7 +141,7 @@ public abstract class AbstractDistributedRegionJUnitTest { @Test public void testConcurrencyTrueTagNull() { // case 2: concurrencyCheckEanbled = true, version tag is null: not to distribute - DistributedRegion region = prepare(true); + DistributedRegion region = prepare(true, false); EntryEventImpl event = createDummyEvent(region); assertNull(event.getVersionTag()); doTest(region, event, 0); @@ -147,7 +150,7 @@ public abstract class AbstractDistributedRegionJUnitTest { @Test public void testConcurrencyTrueTagInvalid() { // case 3: concurrencyCheckEanbled = true, version tag is invalid: not to distribute - DistributedRegion region = prepare(true); + DistributedRegion region = prepare(true, false); EntryEventImpl event = createDummyEvent(region); VersionTag tag = createVersionTag(false); event.setVersionTag(tag); @@ -158,7 +161,7 @@ public abstract class AbstractDistributedRegionJUnitTest { @Test public void testConcurrencyTrueTagValid() { // case 4: concurrencyCheckEanbled = true, version tag is valid: distribute - DistributedRegion region = prepare(true); + DistributedRegion region = prepare(true, false); EntryEventImpl event = createDummyEvent(region); VersionTag tag = createVersionTag(true); event.setVersionTag(tag); http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java index 7525f35..ce21c67 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java @@ -14,15 +14,23 @@ */ package org.apache.geode.internal.cache; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.*; -import org.junit.experimental.categories.Category; +import java.util.concurrent.ConcurrentMap; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.apache.geode.cache.Operation; import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.internal.cache.EventTracker.BulkOpHolder; +import org.apache.geode.internal.cache.ha.ThreadIdentifier; +import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; +import org.apache.geode.internal.cache.versions.VersionTag; import org.apache.geode.test.junit.categories.UnitTest; @Category(UnitTest.class) @@ -99,5 +107,47 @@ public class DistributedRegionJUnitTest extends AbstractDistributedRegionJUnitTe } } + @Test + public void retriedBulkOpGetsSavedVersionTag() { + DistributedRegion region = prepare(true, true); + DistributedMember member = mock(DistributedMember.class); + ClientProxyMembershipID memberId = mock(ClientProxyMembershipID.class); + doReturn(false).when(region).isUsedForPartitionedRegionBucket(); + + byte[] memId = {1, 2, 3}; + long threadId = 1; + long retrySeqId = 1; + ThreadIdentifier tid = new ThreadIdentifier(memId, threadId); + EventID retryEventID = new EventID(memId, threadId, retrySeqId); + boolean skipCallbacks = true; + int size = 2; + recordPutAllEvents(region, memId, threadId, skipCallbacks, member, memberId, size); + EventTracker eventTracker = region.getEventTracker(); + + ConcurrentMap<ThreadIdentifier, BulkOpHolder> map = eventTracker.getRecordedBulkOpVersionTags(); + BulkOpHolder holder = map.get(tid); + + EntryEventImpl retryEvent = EntryEventImpl.create(region, Operation.PUTALL_CREATE, "key1", + "value1", null, false, member, !skipCallbacks, retryEventID); + retryEvent.setContext(memberId); + retryEvent.setPutAllOperation(mock(DistributedPutAllOperation.class)); + + region.hasSeenEvent(retryEvent); + assertTrue(retryEvent.getVersionTag().equals(holder.entryVersionTags.get(retryEventID))); + } + + protected void recordPutAllEvents(DistributedRegion region, byte[] memId, long threadId, + boolean skipCallbacks, DistributedMember member, ClientProxyMembershipID memberId, int size) { + EntryEventImpl[] events = new EntryEventImpl[size]; + EventTracker eventTracker = region.getEventTracker(); + for (int i = 0; i < size; i++) { + events[i] = EntryEventImpl.create(region, Operation.PUTALL_CREATE, "key" + i, "value" + i, + null, false, member, !skipCallbacks, new EventID(memId, threadId, i + 1)); + events[i].setContext(memberId); + events[i].setVersionTag(mock(VersionTag.class)); + eventTracker.recordEvent(events[i]); + } + } + }
