Repository: geode Updated Branches: refs/heads/develop 29ea88a23 -> 56f976c89
GEODE-2939: Make sure the bucket region initiates its event tracker from the image provider. Save all event states from remote processes. Initiate event tracker from the image provider only. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/56f976c8 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/56f976c8 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/56f976c8 Branch: refs/heads/develop Commit: 56f976c89fabed58a086a845593efc2ef6e75114 Parents: 29ea88a Author: eshu <[email protected]> Authored: Thu May 25 16:38:55 2017 -0700 Committer: eshu <[email protected]> Committed: Thu May 25 17:14:09 2017 -0700 ---------------------------------------------------------------------- .../geode/internal/cache/BucketRegion.java | 29 ++++++++ .../cache/CacheDistributionAdvisee.java | 8 ++ .../internal/cache/CreateRegionProcessor.java | 36 ++++----- .../geode/internal/cache/DistributedRegion.java | 9 +++ .../geode/internal/cache/EventTracker.java | 3 +- .../internal/cache/InitialImageOperation.java | 3 + .../internal/cache/EventTrackerDUnitTest.java | 78 ++++++++++++++++++++ 7 files changed, 147 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java index 7bfffb7..31b341a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java @@ -29,8 +29,11 @@ import org.apache.geode.internal.Assert; import org.apache.geode.internal.HeapDataOutputStream; import org.apache.geode.internal.Version; import
org.apache.geode.internal.cache.BucketAdvisor.BucketProfile; +import org.apache.geode.internal.cache.CreateRegionProcessor.CreateRegionReplyProcessor; +import org.apache.geode.internal.cache.EventTracker.EventSeqnoHolder; import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo; import org.apache.geode.internal.cache.control.MemoryEvent; +import org.apache.geode.internal.cache.ha.ThreadIdentifier; import org.apache.geode.internal.cache.partitioned.Bucket; import org.apache.geode.internal.cache.partitioned.DestroyMessage; import org.apache.geode.internal.cache.partitioned.InvalidateMessage; @@ -92,6 +95,8 @@ public class BucketRegion extends DistributedRegion implements Bucket { private final AtomicLong numOverflowBytesOnDisk = new AtomicLong(); private final AtomicLong numEntriesInVM = new AtomicLong(); private final AtomicLong evictions = new AtomicLong(); + // For GII + private CreateRegionReplyProcessor createRegionReplyProcessor; /** * Contains size in bytes of the values stored in theRealMap. Sizes are tallied during put and @@ -281,6 +286,30 @@ public class BucketRegion extends DistributedRegion implements Bucket { } @Override + public void registerCreateRegionReplyProcessor(CreateRegionReplyProcessor processor) { + this.createRegionReplyProcessor = processor; + } + + @Override + protected void recordEventStateFromImageProvider(InternalDistributedMember provider) { + if (this.createRegionReplyProcessor != null) { + Map<ThreadIdentifier, EventSeqnoHolder> providerEventStates = + this.createRegionReplyProcessor.getEventState(provider); + if (providerEventStates != null) { + recordEventState(provider, providerEventStates); + } else { + // Does not see this to happen. Just in case we get gii from a node + // that was not in the cluster originally when we sent + // createRegionMessage (its event tracker was saved), + // but later available before we could get gii from anyone else. + // This will not cause data inconsistent issue. 
Log this message for debug purpose. + logger.info("Could not initiate event tracker from GII provider {}", provider); + } + this.createRegionReplyProcessor = null; + } + } + + @Override protected CacheDistributionAdvisor createDistributionAdvisor( InternalRegionArguments internalRegionArgs) { return internalRegionArgs.getBucketAdvisor(); http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisee.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisee.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisee.java index e4a7957..d933019 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisee.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisee.java @@ -17,6 +17,7 @@ package org.apache.geode.internal.cache; import org.apache.geode.cache.RegionAttributes; import org.apache.geode.distributed.internal.DistributionAdvisee; import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile; +import org.apache.geode.internal.cache.CreateRegionProcessor.CreateRegionReplyProcessor; /** * Distributed cache object (typically a <code>Region</code>) which uses a @@ -54,4 +55,11 @@ public interface CacheDistributionAdvisee extends DistributionAdvisee { * @param profile the remote member's profile */ public void remoteRegionInitialized(CacheProfile profile); + + /** + * Allow this advisee to know the CreateRegionReplyProcessor that is creating it. 
+ * + * @param processor the CreateRegionReplyProcessor that is creating the advisee + */ + default public void registerCreateRegionReplyProcessor(CreateRegionReplyProcessor processor) {} } http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java index c1d1e77..1e38065 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.logging.log4j.Logger; @@ -48,6 +49,8 @@ import org.apache.geode.internal.Assert; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile; import org.apache.geode.internal.cache.CacheDistributionAdvisor.InitialImageAdvice; +import org.apache.geode.internal.cache.EventTracker.EventSeqnoHolder; +import org.apache.geode.internal.cache.ha.ThreadIdentifier; import org.apache.geode.internal.cache.partitioned.Bucket; import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException; import org.apache.geode.internal.cache.partitioned.RegionAdvisor; @@ -96,6 +99,7 @@ public class CreateRegionProcessor implements ProfileExchangeProcessor { } CreateRegionReplyProcessor replyProc = new CreateRegionReplyProcessor(recps); + newRegion.registerCreateRegionReplyProcessor(replyProc); boolean useMcast = false; // multicast is disabled for this message for now CreateRegionMessage msg = getCreateRegionMessage(recps, replyProc, useMcast); @@ 
-199,17 +203,16 @@ public class CreateRegionProcessor implements ProfileExchangeProcessor { .getDistributedSystem(), members); } - /** - * guards application of event state to the region so that we deserialize and apply event state - * only once - */ - private Object eventStateLock = new Object(); - - /** whether event state has been recorded in the region */ - private boolean eventStateRecorded = false; + private final Map<DistributedMember, Map<ThreadIdentifier, EventSeqnoHolder>> remoteEventStates = + new ConcurrentHashMap<>(); private boolean allMembersSkippedChecks = true; + public Map<ThreadIdentifier, EventSeqnoHolder> getEventState( + InternalDistributedMember provider) { + return this.remoteEventStates.get(provider); + } + /** * true if all members skipped CreateRegionMessage#checkCompatibility(), in which case * CreateRegionMessage should be retried. @@ -218,6 +221,7 @@ public class CreateRegionProcessor implements ProfileExchangeProcessor { return this.allMembersSkippedChecks; } + @SuppressWarnings("unchecked") @Override public void process(DistributionMessage msg) { Assert.assertTrue(msg instanceof CreateRegionReplyMessage, @@ -246,17 +250,13 @@ public class CreateRegionProcessor implements ProfileExchangeProcessor { RegionAdvisor ra = (RegionAdvisor) cda; ra.putBucketRegionProfiles(reply.bucketProfiles); } - if (reply.eventState != null && lr.hasEventTracker()) { - synchronized (eventStateLock) { - if (!this.eventStateRecorded) { - this.eventStateRecorded = true; - Object eventState = null; - eventState = reply.eventState; - lr.recordEventState(reply.getSender(), (Map) eventState); - } - } + + // Save all event states, need to initiate the event tracker from the GII provider + if (reply.eventState != null) { + remoteEventStates.put(reply.getSender(), + (Map<ThreadIdentifier, EventSeqnoHolder>) reply.eventState); } - reply.eventState = null; + if (lr.isUsedForPartitionedRegionBucket()) { ((BucketRegion) lr).updateEventSeqNum(reply.seqKeyForWan); } 
http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index 650fe2a..9df64d0 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -261,6 +261,15 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } /** + * Record the event state from image provider + * + * @param provider the member that provided the initial image and event state + */ + protected void recordEventStateFromImageProvider(InternalDistributedMember provider) { + // No Op. Only Bucket region will initiate event states + } + + /** * Intended for used during construction of a DistributedRegion * * @return the advisor to be used by the region http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java index 2c86aed..b919043 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java @@ -99,7 +99,8 @@ public class EventTracker { String name; /** - * whether or not this tracker has been initialized with state from another process + * whether or not this tracker has been initialized to allow entry operation. replicate region + * does not initiate event tracker from its replicates. 
*/ volatile boolean initialized; http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java index 82df980..f8e9d0f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java @@ -231,11 +231,13 @@ public class InitialImageOperation { } } long giiStart = this.region.getCachePerfStats().startGetInitialImage(); + InternalDistributedMember provider = null; for (Iterator itr = recipients.iterator(); !this.gotImage && itr.hasNext();) { // if we got a partial image from the previous recipient, then clear it InternalDistributedMember recipient = (InternalDistributedMember) itr.next(); + provider = recipient; // In case of HARegion, before getting the region snapshot(image) get the filters // registered by the associated client and apply them. 
@@ -546,6 +548,7 @@ public class InitialImageOperation { } // for if (this.gotImage) { + this.region.recordEventStateFromImageProvider(provider); this.region.getCachePerfStats().endGetInitialImage(giiStart); if (this.isDeltaGII) { this.region.getCachePerfStats().incDeltaGIICompleted(); http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java index 3faf41f..77c0998 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java @@ -19,8 +19,11 @@ import static org.junit.Assert.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import org.awaitility.Awaitility; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -405,4 +408,79 @@ public class EventTrackerDUnitTest extends JUnit4CacheTestCase { protected static int getCacheServerPort() { return cacheServerPort; } + + /** + * Tests event track is initialized after gii + */ + @Test + public void testEventTrackerIsInitalized() throws CacheException { + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = host.getVM(2); + + createPRInVMs(vm0, vm1, vm2); + + createPR(); + + doPutsInVMs(vm0, vm1, vm2); + + doPuts(); + + verifyEventTrackerContent(); + + // close the region + getCache().getRegion(getName()).close(); + + // create the region again. 
+ createPR(); + + for (int i = 0; i < 12; i++) { + waitEntryIsLocal(i); + } + + // verify event track initialized after create region + verifyEventTrackerContent(); + + } + + private void waitEntryIsLocal(int i) { + Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS) + .atMost(30, TimeUnit.SECONDS) + .until(() -> getCache().getRegion(getName()).getEntry(i) != null); + } + + private void verifyEventTrackerContent() { + PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(getName()); + BucketRegion br = pr.getDataStore().getLocalBucketById(0); + Map<?, ?> eventStates = br.getEventState(); + assertTrue(eventStates.size() == 4); + } + + public void createPRInVMs(VM... vms) { + for (VM vm : vms) { + vm.invoke(() -> createPR()); + } + } + + private void createPR() { + PartitionAttributesFactory paf = + new PartitionAttributesFactory().setRedundantCopies(3).setTotalNumBuckets(4); + RegionFactory fact = getCache().createRegionFactory(RegionShortcut.PARTITION) + .setPartitionAttributes(paf.create()); + fact.create(getName()); + } + + public void doPutsInVMs(VM... vms) { + for (VM vm : vms) { + vm.invoke(() -> doPuts()); + } + } + + private void doPuts() { + Region region = getCache().getRegion(getName()); + for (int i = 0; i < 12; i++) { + region.put(i, i); + } + } }
