GEODE-538: Add check for persistent data recovery

PartitionedRegion.getNodeForBucketReadOrLoad can return an invalid node if persistent data recovery is in progress and a get() targets a bucket that hasn't been recovered yet. As a result, a get() or put() on the region can return an incorrect value (null) or throw a ConflictingPersistentDataException.
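For illustration, a minimal sketch of how that window can bite a client of the region API (the class, region name, and key are hypothetical, not part of this commit; it assumes a persistent partitioned region is declared in the member's cache configuration):

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;

public final class RecoveryWindowDemo {
  public static void main(String[] args) {
    // The member restarts while persistent recovery of the PR's buckets is
    // still running in the background.
    Cache cache = new CacheFactory().create();
    Region<Integer, String> region = cache.getRegion("partitionedRegion");

    // If bucket 0 has not been recovered yet, getNodeForBucketReadOrLoad can
    // target a newly created, empty bucket: this get() may return null even
    // though a value is on disk, and the premature bucket creation can later
    // surface as a ConflictingPersistentDataException.
    String value = region.get(0);
    System.out.println("value = " + value);
  }
}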
This change adds a check that persistent recovery has completed before a new bucket is created. If recovery is not complete, the operation on the region fails with a PartitionOfflineException. Queries on a region while persistent recovery is in progress can also return incorrect results, so a similar check is added to DefaultQuery.checkQueryOnPR.

This closes #264

Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/11ef3ebb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/11ef3ebb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/11ef3ebb

Branch: refs/heads/feature/GEODE-1930
Commit: 11ef3ebbe30a8340f57776bf4063684b91ccd0a3
Parents: 7511ffa
Author: Ken Howe <[email protected]>
Authored: Thu Oct 6 15:02:24 2016 -0700
Committer: Anil <[email protected]>
Committed: Wed Oct 19 15:49:33 2016 -0700

----------------------------------------------------------------------
 .../org/apache/geode/cache/query/Query.java | 12 +
 .../cache/query/internal/DefaultQuery.java | 6 +-
 .../internal/cache/PRHARedundancyProvider.java | 9 +-
 .../geode/internal/cache/PartitionedRegion.java | 18 +-
 .../geode/internal/i18n/LocalizedStrings.java | 1 +
 .../partitioned/PRBasicQueryDUnitTest.java | 221 ++++++++++
 .../query/partitioned/PRQueryDUnitHelper.java | 185 +++++++++
 ...tentColocatedPartitionedRegionDUnitTest.java | 411 ++++++++++++++++++-
 8 files changed, 844 insertions(+), 19 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/main/java/org/apache/geode/cache/query/Query.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/Query.java b/geode-core/src/main/java/org/apache/geode/cache/query/Query.java index e27687d..670b262 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/Query.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/Query.java @@ -89,6 +89,9 @@ public interface Query { * @throws QueryExecutionLowMemoryException * If the query gets canceled due to low memory conditions and * the resource manager critical heap percentage has been set + * @throws PartitionOfflineException * If persistent data recovery is not complete for a partitioned * region referred to in the query. */ public Object execute() throws FunctionDomainException, TypeMismatchException, NameResolutionException, @@ -150,6 +153,9 @@ public interface Query { * @throws QueryExecutionLowMemoryException * If the query gets canceled due to low memory conditions and * the resource manager critical heap percentage has been set + * @throws PartitionOfflineException * If persistent data recovery is not complete for a partitioned * region referred to in the query. * */ public Object execute(Object[] params) @@ -220,6 +226,9 @@ public interface Query { * @throws QueryExecutionLowMemoryException * If the query gets canceled due to low memory conditions and * the resource manager critical heap percentage has been set + * @throws PartitionOfflineException * If persistent data recovery is not complete for a partitioned * region referred to in the query. 
*/ public Object execute(RegionFunctionContext context) throws FunctionDomainException, TypeMismatchException, NameResolutionException, @@ -291,6 +300,9 @@ public interface Query { * @throws QueryExecutionLowMemoryException * If the query gets canceled due to low memory conditions and * the resource manager critical heap percentage has been set + * @throws PartitionOfflineException + * If persistent data recovery is not complete for a partitioned + * region referred to in the query. */ public Object execute(RegionFunctionContext context, Object[] params) throws FunctionDomainException, TypeMismatchException, NameResolutionException, http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java index 58df390..8175d82 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java @@ -27,11 +27,14 @@ import org.apache.geode.cache.client.internal.UserAttributes; import org.apache.geode.cache.execute.Function; import org.apache.geode.cache.execute.RegionFunctionContext; import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.cache.persistence.PartitionOfflineException; +import org.apache.geode.cache.persistence.PersistentID; import org.apache.geode.cache.query.*; import org.apache.geode.cache.query.internal.cq.InternalCqQuery; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.internal.NanoTimer; import org.apache.geode.internal.cache.*; +import org.apache.geode.internal.cache.partitioned.RegionAdvisor; import org.apache.geode.internal.i18n.LocalizedStrings; import java.util.*; @@ -581,7 +584,7 @@ public class DefaultQuery implements Query { } - private QueryExecutor checkQueryOnPR(Object[] parameters) throws RegionNotFoundException { + private QueryExecutor checkQueryOnPR(Object[] parameters) throws RegionNotFoundException, PartitionOfflineException { // check for PartititionedRegions. 
If a PartitionedRegion is referred to in the query, // then the following restrictions apply: @@ -601,6 +604,7 @@ public class DefaultQuery implements Query { throw new RegionNotFoundException(LocalizedStrings.DefaultQuery_REGION_NOT_FOUND_0.toLocalizedString(regionPath)); } if (rgn instanceof QueryExecutor) { + ((PartitionedRegion)rgn).checkPROffline(); prs.add((QueryExecutor)rgn); } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java index cfedb67..6245c37 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java @@ -24,6 +24,7 @@ import org.apache.geode.cache.PartitionedRegionStorageException; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.persistence.PartitionOfflineException; +import org.apache.geode.cache.persistence.PersistentID; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.DM; import org.apache.geode.distributed.internal.DistributionConfig; @@ -495,16 +496,20 @@ public class PRHARedundancyProvider * redundancy. * @throws PartitionedRegionException * if d-lock can not be acquired to create bucket. - * + * @throws PartitionOfflineException + * if persistent data recovery is not complete for a partitioned + * region referred to in the query. 
*/ public InternalDistributedMember createBucketAtomically(final int bucketId, final int newBucketSize, final long startTime, final boolean finishIncompleteCreation, String partitionName) throws PartitionedRegionStorageException, - PartitionedRegionException + PartitionedRegionException, PartitionOfflineException { final boolean isDebugEnabled = logger.isDebugEnabled(); + + prRegion.checkPROffline(); // If there are insufficient stores throw *before* we try acquiring the // (very expensive) bucket lock or the (somewhat expensive) monitor on this http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index baab79f..f7ecdaf 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -28,6 +28,8 @@ import org.apache.geode.cache.client.internal.*; import org.apache.geode.cache.execute.*; import org.apache.geode.cache.partition.PartitionListener; import org.apache.geode.cache.partition.PartitionNotAvailableException; +import org.apache.geode.cache.persistence.PartitionOfflineException; +import org.apache.geode.cache.persistence.PersistentID; import org.apache.geode.cache.query.*; import org.apache.geode.cache.query.internal.*; import org.apache.geode.cache.query.internal.index.*; @@ -1397,6 +1399,21 @@ public class PartitionedRegion extends LocalRegion implements new UpdateAttributesProcessor(this).distribute(false); } + /** + * Throw an exception if persistent data recovery from disk is not complete + * for this region. 
+ * + * @throws PartitionOfflineException + */ + public void checkPROffline() throws PartitionOfflineException { + if (getDataPolicy().withPersistence() && !recoveredFromDisk) { + Set<PersistentID> persistIds = new HashSet(getRegionAdvisor().advisePersistentMembers().values()); + persistIds.removeAll(getRegionAdvisor().adviseInitializedPersistentMembers().values()); + throw new PartitionOfflineException(persistIds, LocalizedStrings.PRHARedundancyProvider_PARTITIONED_REGION_0_OFFLINE_HAS_UNRECOVERED_PERSISTENT_DATA_1 + .toLocalizedString(new Object[] { getFullPath(), persistIds})); + } + } + public final void updatePRConfig(PartitionRegionConfig prConfig, boolean putOnlyIfUpdated) { final Set<Node> nodes = prConfig.getNodes(); @@ -3057,7 +3074,6 @@ public class PartitionedRegion extends LocalRegion implements final RetryTimeKeeper snoozer) { final boolean isDebugEnabled = logger.isDebugEnabled(); -// InternalDistributedSystem ids = (InternalDistributedSystem)this.cache.getDistributedSystem(); RetryTimeKeeper localSnoozer = snoozer; // Prevent early access to buckets that are not completely created/formed // and http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java index 8bfdd68..7d762b8 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java +++ b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java @@ -702,6 +702,7 @@ public class LocalizedStrings { public static final StringId AbstractDistributionConfig_CLIENT_CONFLATION_PROP_NAME = new StringId(1839, "Client override for server queue conflation setting"); public static final StringId PRHARRedundancyProvider_ALLOCATE_ENOUGH_MEMBERS_TO_HOST_BUCKET = new StringId(1840, "allocate enough members to host bucket."); public static final StringId PRHARedundancyProvider_TIME_OUT_WAITING_0_MS_FOR_CREATION_OF_BUCKET_FOR_PARTITIONED_REGION_1_MEMBERS_REQUESTED_TO_CREATE_THE_BUCKET_ARE_2 = new StringId(1841, "Time out waiting {0} ms for creation of bucket for partitioned region {1}. 
Members requested to create the bucket are: {2}"); + public static final StringId PRHARedundancyProvider_PARTITIONED_REGION_0_OFFLINE_HAS_UNRECOVERED_PERSISTENT_DATA_1 = new StringId(1842, "Partitioned Region {0} is offline due to unrecovered persistent data, {1}"); public static final StringId PUT_0_FAILED_TO_PUT_ENTRY_FOR_REGION_1_KEY_2_VALUE_3 = new StringId(1843, "{0}: Failed to put entry for region {1} key {2} value {3}"); public static final StringId PUT_0_UNEXPECTED_EXCEPTION = new StringId(1844, "{0}: Unexpected Exception"); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java index 8ef907a..224a7e0 100755 --- a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java @@ -29,6 +29,7 @@ import static org.apache.geode.cache.query.Utils.*; import org.apache.geode.cache.Cache; import org.apache.geode.cache.Region; +import org.apache.geode.cache.persistence.PartitionOfflineException; import org.apache.geode.cache.query.Index; import org.apache.geode.cache.query.IndexType; import org.apache.geode.cache.query.Query; @@ -38,6 +39,7 @@ import org.apache.geode.cache.query.data.Portfolio; import org.apache.geode.cache.query.data.PortfolioData; import org.apache.geode.cache30.CacheSerializableRunnable; import org.apache.geode.internal.cache.PartitionedRegionDUnitTestCase; +import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.LogWriterUtils; import org.apache.geode.test.dunit.VM; @@ -67,6 +69,8 @@ public class PRBasicQueryDUnitTest extends PartitionedRegionDUnitTestCase } } + private final static int MAX_SYNC_WAIT = 30 * 1000; + PRQueryDUnitHelper PRQHelp = new PRQueryDUnitHelper(); final String name = "Portfolios"; @@ -153,6 +157,223 @@ public class PRBasicQueryDUnitTest extends PartitionedRegionDUnitTestCase "PRQBasicQueryDUnitTest#testPRBasicQuerying: Querying PR's Test ENDED"); } + /** + * A basic dunit test that <br> + * 1. Creates a PR and colocated child region Accessor and Data Store with redundantCopies = 0. + * 2. Populates the region with test data. + * 3. Fires a query on accessor VM and verifies the result. + * 4. Shuts down the caches, then restarts them asynchronously + * 5. Attempt the query while the regions are being recovered + * @throws Exception + */ + @Test + public void testColocatedPRQueryDuringRecovery() throws Exception + { + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + setCacheInVMs(vm0, vm1); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR Test with DACK Started"); + + // Creting PR's on the participating VM's + // Creating Accessor node on the VM0. + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Creating the Accessor node in the PR"); + + vm0.invoke(PRQHelp.getCacheSerializableRunnableForColocatedPRCreate(name, + redundancy, PortfolioData.class, true)); + // Creating local region on vm0 to compare the results of query. 
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForLocalRegionCreation(localName, PortfolioData.class)); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully created the Accessor node in the PR"); + + // Creating the Datastores Nodes in the VM1. + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest:testColocatedPRBasicQuerying ----- Creating the Datastore node in the PR"); + vm1.invoke(PRQHelp.getCacheSerializableRunnableForColocatedPRCreate(name, + redundancy, PortfolioData.class, true)); + + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully Created the Datastore node in the PR"); + + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully Created PR's across all VM's"); + + // Generating portfolio object array to be populated across the PR's & Local + // Regions + + final PortfolioData[] portfolio = createPortfolioData(cnt, cntDest); + // Putting the data into the PR's created + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRPuts(name, portfolio, + cnt, cntDest)); + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRDuplicatePuts(name, portfolio, + cnt, cntDest)); + + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Inserted Portfolio data across PR's"); + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRPuts(localName, + portfolio, cnt, cntDest)); + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRDuplicatePuts(localName, + portfolio, cnt, cntDest)); + + // querying the VM for data and comparing the result with query result of + // local region. + // querying the VM for data + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRQueryAndCompareResults( + name, localName)); + + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR's 1st pass ENDED"); + + // Shut everything down and then restart to test queries during recovery + vm0.invoke(PRQHelp.getCacheSerializableRunnableForCloseCache()); + vm1.invoke(PRQHelp.getCacheSerializableRunnableForCloseCache()); + + // Re-create the regions - only create the parent regions on the datastores + setCacheInVMs(vm0, vm1); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Creating the Accessor node in the PR"); + vm0.invoke(PRQHelp.getCacheSerializableRunnableForColocatedParentCreate(name, + redundancy, PortfolioData.class, true)); + + // Creating local region on vm0 to compare the results of query. 
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForLocalRegionCreation(localName, PortfolioData.class)); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully created the Accessor node in the PR"); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest:testColocatedPRBasicQuerying: re-creating the Datastore node in the PR"); + vm1.invoke(PRQHelp.getCacheSerializableRunnableForColocatedParentCreate(name, + redundancy, PortfolioData.class, true)); + + // Now start the child regions asynchronously so queries will happen during persistent recovery + AsyncInvocation vm0PR = vm0.invokeAsync(PRQHelp.getCacheSerializableRunnableForColocatedChildCreate(name, + redundancy, PortfolioData.class, true)); + AsyncInvocation vm1PR = vm1.invokeAsync(PRQHelp.getCacheSerializableRunnableForColocatedChildCreate(name, + redundancy, PortfolioData.class, true)); + + // delay the query to let the recovery get underway + Thread.sleep(100); + + try { + // This is a repeat of the original query from before closing and restarting the datastores. This time + // it should fail due to persistent recovery that has not completed. + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRQueryAndCompareResults(name, localName, true)); + fail("Expected PartitionOfflineException when queryiong a region with offline colocated child"); + } catch (Exception e) { + if (!(e.getCause() instanceof PartitionOfflineException)) { + e.printStackTrace(); + throw e; + } + } + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR's 2nd pass (after restarting regions) ENDED"); + } + + /** + * A basic dunit test that <br> + * 1. Creates a PR and colocated child region Accessor and Data Store with redundantCopies = 0. + * 2. Populates the region with test data. + * 3. Fires a query on accessor VM and verifies the result. + * 4. Shuts down the caches, then restarts them asynchronously, but don't restart the child region + * 5. Attempt the query while the region offline because of the missing child region + * @throws Exception + */ + @SuppressWarnings("rawtypes") + @Test + public void testColocatedPRQueryDuringRecoveryWithMissingColocatedChild() throws Exception + { + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + setCacheInVMs(vm0, vm1); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR Test with DACK Started"); + + // Creting PR's on the participating VM's + // Creating Accessor node on the VM0. + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Creating the Accessor node in the PR"); + + vm0.invoke(PRQHelp.getCacheSerializableRunnableForColocatedPRCreate(name, + redundancy, PortfolioData.class, true)); + // Creating local region on vm0 to compare the results of query. + vm0.invoke(PRQHelp.getCacheSerializableRunnableForLocalRegionCreation(localName, PortfolioData.class)); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully created the Accessor node in the PR"); + + // Creating the Datastores Nodes in the VM1. 
+ LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest:testColocatedPRBasicQuerying ----- Creating the Datastore node in the PR"); + vm1.invoke(PRQHelp.getCacheSerializableRunnableForColocatedPRCreate(name, + redundancy, PortfolioData.class, true)); + + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully Created the Datastore node in the PR"); + + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully Created PR's across all VM's"); + + // Generating portfolio object array to be populated across the PR's & Local + // Regions + + final PortfolioData[] portfolio = createPortfolioData(cnt, cntDest); + // Putting the data into the PR's created + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRPuts(name, portfolio, + cnt, cntDest)); + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRDuplicatePuts(name, portfolio, + cnt, cntDest)); + + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Inserted Portfolio data across PR's"); + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRPuts(localName, + portfolio, cnt, cntDest)); + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRDuplicatePuts(localName, + portfolio, cnt, cntDest)); + + // querying the VM for data and comparing the result with query result of + // local region. + // querying the VM for data + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRQueryAndCompareResults( + name, localName)); + + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR's 1st pass ENDED"); + + // Shut everything down and then restart to test queries during recovery + vm0.invoke(PRQHelp.getCacheSerializableRunnableForCloseCache()); + vm1.invoke(PRQHelp.getCacheSerializableRunnableForCloseCache()); + + // Re-create the only the parent region + setCacheInVMs(vm0, vm1); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Creating the Accessor node in the PR"); + vm0.invoke(PRQHelp.getCacheSerializableRunnableForColocatedParentCreate(name, + redundancy, PortfolioData.class, true)); + + // Creating local region on vm0 to compare the results of query. + vm0.invoke(PRQHelp.getCacheSerializableRunnableForLocalRegionCreation(localName, PortfolioData.class)); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully created the Accessor node in the PR"); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest:testColocatedPRBasicQuerying ----- re-creating the Datastore node in the PR"); + vm1.invoke(PRQHelp.getCacheSerializableRunnableForColocatedParentCreate(name, + redundancy, PortfolioData.class, true)); + + try { + // This is a repeat of the original query from before closing and restarting the datastores. This time + // it should fail due to persistent recovery that has not completed. 
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRQueryAndCompareResults(name, localName, true)); + fail("Expected PartitionOfflineException when queryiong a region with offline colocated child"); + } catch (Exception e) { + if (!(e.getCause() instanceof PartitionOfflineException)) { + throw e; + } + } + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR's 2nd pass (after restarting regions) ENDED"); + } + @Test public void testPRCountStarQuery() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java index cfb4190..9dc90fd 100755 --- a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java +++ b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java @@ -39,6 +39,7 @@ import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.DiskStore; import org.apache.geode.cache.EntryExistsException; import org.apache.geode.cache.EntryNotFoundException; import org.apache.geode.cache.PartitionAttributes; @@ -249,6 +250,190 @@ public class PRQueryDUnitHelper implements Serializable { return (CacheSerializableRunnable)createPrRegion; } + /** + * This function creates a colocated pair of PR's given the scope & the + * redundancy parameters for the parent * + * + * @param regionName + * @param redundancy + * @param constraint + * @param makePersistent + * @return cacheSerializable object + */ + public CacheSerializableRunnable getCacheSerializableRunnableForColocatedPRCreate( + final String regionName, final int redundancy, final Class constraint, boolean makePersistent) { + + final String childRegionName = regionName + "Child"; + final String diskName = "disk"; + SerializableRunnable createPrRegion; + createPrRegion = new CacheSerializableRunnable(regionName) { + @Override + public void run2() throws CacheException + { + + Cache cache = getCache(); + Region partitionedregion = null; + Region childRegion = null; + AttributesFactory attr = new AttributesFactory(); + attr.setValueConstraint(constraint); + if (makePersistent) { + DiskStore ds = cache.findDiskStore(diskName); + if (ds == null) { + ds = cache.createDiskStoreFactory().setDiskDirs(JUnit4CacheTestCase.getDiskDirs()) + .create(diskName); + } + attr.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + attr.setDiskStoreName(diskName); + } else { + attr.setDataPolicy(DataPolicy.PARTITION); + attr.setDiskStoreName(null); + } + + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(redundancy); + attr.setPartitionAttributes(paf.create()); + + // parent region + partitionedregion = cache.createRegion(regionName, attr.create()); + assertNotNull( + "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region " + + regionName + " not in cache", cache.getRegion(regionName)); + assertNotNull( + "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region ref null", + partitionedregion); + assertTrue( + 
"PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region ref claims to be destroyed", + !partitionedregion.isDestroyed()); + + // child region + attr.setValueConstraint(constraint); + paf.setColocatedWith(regionName); + attr.setPartitionAttributes(paf.create()); + childRegion = cache.createRegion(childRegionName, attr.create()); + } + }; + + return (CacheSerializableRunnable)createPrRegion; + } + + /** + * This function creates the parent region of colocated pair of PR's given the scope & the + * redundancy parameters for the parent * + * + * @param regionName + * @param redundancy + * @param constraint + * @param makePersistent + * @return cacheSerializable object + */ + public CacheSerializableRunnable getCacheSerializableRunnableForColocatedParentCreate( + final String regionName, final int redundancy, final Class constraint, boolean makePersistent) { + + final String childRegionName = regionName + "Child"; + final String diskName = "disk"; + SerializableRunnable createPrRegion; + createPrRegion = new CacheSerializableRunnable(regionName + "-NoChildRegion") { + @Override + public void run2() throws CacheException + { + + Cache cache = getCache(); + Region partitionedregion = null; + Region childRegion = null; + AttributesFactory attr = new AttributesFactory(); + attr.setValueConstraint(constraint); + if (makePersistent) { + DiskStore ds = cache.findDiskStore(diskName); + if (ds == null) { + ds = cache.createDiskStoreFactory().setDiskDirs(JUnit4CacheTestCase.getDiskDirs()) + .create(diskName); + } + attr.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + attr.setDiskStoreName(diskName); + } else { + attr.setDataPolicy(DataPolicy.PARTITION); + attr.setDiskStoreName(null); + } + + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(redundancy); + attr.setPartitionAttributes(paf.create()); + + // parent region + partitionedregion = cache.createRegion(regionName, attr.create()); + assertNotNull( + "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region " + + regionName + " not in cache", cache.getRegion(regionName)); + assertNotNull( + "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region ref null", + partitionedregion); + assertTrue( + "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region ref claims to be destroyed", + !partitionedregion.isDestroyed()); + } + }; + + return (CacheSerializableRunnable)createPrRegion; + } + + /** + * This function creates the parent region of colocated pair of PR's given the scope & the + * redundancy parameters for the parent * + * + * @param regionName + * @param redundancy + * @param constraint + * @param isPersistent + * @return cacheSerializable object + */ + public CacheSerializableRunnable getCacheSerializableRunnableForColocatedChildCreate( + final String regionName, final int redundancy, final Class constraint, boolean isPersistent) { + + final String childRegionName = regionName + "Child"; + final String diskName = "disk"; + SerializableRunnable createPrRegion; + createPrRegion = new CacheSerializableRunnable(regionName + "-ChildRegion") { + @Override + public void run2() throws CacheException + { + + Cache cache = getCache(); + Region partitionedregion = null; + Region childRegion = null; + AttributesFactory attr = new AttributesFactory(); + attr.setValueConstraint(constraint); + if (isPersistent) { + DiskStore ds = cache.findDiskStore(diskName); + if (ds == 
null) { +// ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()) + ds = cache.createDiskStoreFactory().setDiskDirs(org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase.getDiskDirs()) + .create(diskName); + } + attr.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + attr.setDiskStoreName(diskName); + } else { + attr.setDataPolicy(DataPolicy.PARTITION); + attr.setDiskStoreName(null); + } + + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(redundancy); + attr.setPartitionAttributes(paf.create()); + + // skip parent region creation + // partitionedregion = cache.createRegion(regionName, attr.create()); + + // child region + attr.setValueConstraint(constraint); + paf.setColocatedWith(regionName); + attr.setPartitionAttributes(paf.create()); + childRegion = cache.createRegion(childRegionName, attr.create()); + } + }; + + return (CacheSerializableRunnable)createPrRegion; + } + public CacheSerializableRunnable getCacheSerializableRunnableForPRCreateLimitedBuckets( final String regionName, final int redundancy, final int buckets) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java index 0a25228..c15d545 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java @@ -50,7 +50,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import com.jayway.awaitility.core.ConditionTimeoutException; -import org.junit.experimental.categories.Category; import org.apache.geode.admin.internal.AdminDistributedSystemImpl; import org.apache.geode.cache.AttributesFactory; @@ -64,6 +63,7 @@ import org.apache.geode.cache.Region; import org.apache.geode.cache.control.RebalanceOperation; import org.apache.geode.cache.control.RebalanceResults; import org.apache.geode.cache.persistence.PartitionOfflineException; +import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.DistributionMessage; import org.apache.geode.distributed.internal.DistributionMessageObserver; @@ -72,11 +72,14 @@ import org.apache.geode.internal.FileUtil; import org.apache.geode.internal.cache.ColocationLogger; import org.apache.geode.internal.cache.InitialImageOperation.RequestImageMessage; import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.PartitionedRegionHelper; import org.apache.geode.internal.cache.control.InternalResourceManager; import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserver; import org.apache.geode.test.dunit.Assert; import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.IgnoredException; +import org.apache.geode.test.dunit.LogWriterUtils; +import org.apache.geode.test.dunit.RMIException; import org.apache.geode.test.dunit.Host; import 
org.apache.geode.test.dunit.SerializableCallable; import org.apache.geode.test.dunit.SerializableRunnable; @@ -2088,7 +2091,7 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar }; //runnable to create PRs - SerializableRunnable createPRs = new SerializableRunnable("region1") { + SerializableRunnable createPRs = new SerializableRunnable("createPRs") { public void run() { Cache cache = getCache(); @@ -2112,7 +2115,7 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar }; //runnable to close the cache. - SerializableRunnable closeCache = new SerializableRunnable("region1") { + SerializableRunnable closeCache = new SerializableRunnable("closeCache") { public void run() { closeCache(); } @@ -2120,7 +2123,7 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar //Runnable to do a bunch of puts handle exceptions //due to the fact that member is offline. - SerializableRunnable doABunchOfPuts = new SerializableRunnable("region1") { + SerializableRunnable doABunchOfPuts = new SerializableRunnable("doABunchOfPuts") { public void run() { Cache cache = getCache(); Region region = cache.getRegion(PR_REGION_NAME); @@ -2200,7 +2203,7 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar @Category(FlakyTest.class) // GEODE-506: time sensitive, async actions with 30 sec max @Test public void testRebalanceWithOfflineChildRegion() throws Throwable { - SerializableRunnable createParentPR = new SerializableRunnable() { + SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") { public void run() { Cache cache = getCache(); @@ -2220,7 +2223,7 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar } }; - SerializableRunnable createChildPR = new SerializableRunnable() { + SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") { public void run() { Cache cache = getCache(); @@ -2325,7 +2328,6 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar }; vm1.invoke(addHook); -// vm1.invoke(addHook); AsyncInvocation async0; AsyncInvocation async1; AsyncInvocation async2; @@ -2335,7 +2337,6 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar async1 = vm1.invokeAsync(createPRs); vm1.invoke(waitForHook); -// vm1.invoke(waitForHook); //Now create the parent region on vm-2. vm-2 did not //previous host the child region. 
@@ -2347,7 +2348,6 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar } finally { vm1.invoke(removeHook); -// vm1.invoke(removeHook); } async0.getResult(MAX_WAIT); @@ -2473,6 +2473,188 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar closeCache(); } + @Test + public void testParentRegionGetWithOfflineChildRegion() throws Throwable { + + SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") { + public void run() { + String oldRetryTimeout = System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000"); + try { + Cache cache = getCache(); + DiskStore ds = cache.findDiskStore("disk"); + if (ds == null) { + ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk"); + } + AttributesFactory af = new AttributesFactory(); + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(0); + paf.setRecoveryDelay(0); + af.setPartitionAttributes(paf.create()); + af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + af.setDiskStoreName("disk"); + cache.createRegion(PR_REGION_NAME, af.create()); + } finally { + System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); + } + } + }; + + SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") { + public void run() throws InterruptedException { + String oldRetryTimeout = System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000"); + try { + Cache cache = getCache(); + AttributesFactory af = new AttributesFactory(); + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(0); + paf.setRecoveryDelay(0); + paf.setColocatedWith(PR_REGION_NAME); + af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + af.setDiskStoreName("disk"); + af.setPartitionAttributes(paf.create()); + // delay child region creations to cause a delay in persistent recovery + Thread.sleep(100); + cache.createRegion("region2", af.create()); + } finally { + System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); + } + } + }; + + boolean caughtException = false; + try { + // Expect a get() on the un-recovered (due to offline child) parent region to fail + regionGetWithOfflineChild(createParentPR, createChildPR, false); + } catch (Exception e) { + caughtException = true; + assertTrue(e instanceof RMIException); + assertTrue(e.getCause() instanceof PartitionOfflineException); + } + if (!caughtException) { + fail("Expected TimeoutException from remote"); + } + } + + @Test + public void testParentRegionGetWithRecoveryInProgress() throws Throwable { + SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") { + public void run() { + String oldRetryTimeout = System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000"); + try { + Cache cache = getCache(); + DiskStore ds = cache.findDiskStore("disk"); + if (ds == null) { + ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk"); + } + AttributesFactory af = new AttributesFactory(); + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(0); + paf.setRecoveryDelay(0); + af.setPartitionAttributes(paf.create()); + 
af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + af.setDiskStoreName("disk"); + cache.createRegion(PR_REGION_NAME, af.create()); + } finally { + System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); + System.out.println("oldRetryTimeout = " + oldRetryTimeout); } + } + }; + + SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") { + public void run() throws InterruptedException { + String oldRetryTimeout = System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000"); + try { + Cache cache = getCache(); + AttributesFactory af = new AttributesFactory(); + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(0); + paf.setRecoveryDelay(0); + paf.setColocatedWith(PR_REGION_NAME); + af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + af.setDiskStoreName("disk"); + af.setPartitionAttributes(paf.create()); + cache.createRegion("region2", af.create()); + } finally { + System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); + } + } + }; + + boolean caughtException = false; + try { + // Expect a get() on the un-recovered (due to offline child) parent region to fail + regionGetWithOfflineChild(createParentPR, createChildPR, false); + } catch (Exception e) { + caughtException = true; + assertTrue(e instanceof RMIException); + assertTrue(e.getCause() instanceof PartitionOfflineException); + } + if (!caughtException) { + fail("Expected TimeoutException from remote"); + } + } + + @Test + public void testParentRegionPutWithRecoveryInProgress() throws Throwable { + SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") { + public void run() { + String oldRetryTimeout = System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000"); + System.out.println("oldRetryTimeout = " + oldRetryTimeout); + try { + Cache cache = getCache(); + DiskStore ds = cache.findDiskStore("disk"); + if (ds == null) { + ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk"); + } + AttributesFactory af = new AttributesFactory(); + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(0); + paf.setRecoveryDelay(0); + af.setPartitionAttributes(paf.create()); + af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + af.setDiskStoreName("disk"); + cache.createRegion(PR_REGION_NAME, af.create()); + } finally { + System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); + } + } + }; + + SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") { + public void run() throws InterruptedException { + String oldRetryTimeout = System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000"); + try { + Cache cache = getCache(); + AttributesFactory af = new AttributesFactory(); + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(0); + paf.setRecoveryDelay(0); + paf.setColocatedWith(PR_REGION_NAME); + af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + af.setDiskStoreName("disk"); + af.setPartitionAttributes(paf.create()); + Thread.sleep(1000); + cache.createRegion("region2", af.create()); + } finally { + 
System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); + } + } + }; + + boolean caughtException = false; + try { + // Expect a get() on the un-recovered (due to offline child) parent region to fail + regionGetWithOfflineChild(createParentPR, createChildPR, false); + } catch (Exception e) { + caughtException = true; + assertTrue(e instanceof RMIException); + assertTrue(e.getCause() instanceof PartitionOfflineException); + } + if (!caughtException) { + fail("Expected TimeoutException from remote"); + } + } + /** * Create three PRs on a VM, named region1, region2, and region3. * The colocated with attribute describes which region region3 @@ -2523,15 +2705,15 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar vm1.invoke(createParentPR); vm0.invoke(createChildPR); vm1.invoke(createChildPR); - + //Create some buckets. createData(vm0, 0, NUM_BUCKETS, "a"); createData(vm0, 0, NUM_BUCKETS, "a", "region2"); - + //Close the members closeCache(vm1); closeCache(vm0); - + //Recreate the parent region. Try to make sure that //the member with the latest copy of the buckets //is the one that decides to throw away it's copy @@ -2540,18 +2722,17 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar AsyncInvocation async1 = vm1.invokeAsync(createParentPR); async0.getResult(MAX_WAIT); async1.getResult(MAX_WAIT); - //Now create the parent region on vm-2. vm-2 did not //previous host the child region. vm2.invoke(createParentPR); - + //Rebalance the parent region. //This should not move any buckets, because //we haven't recovered the child region RebalanceResults rebalanceResults = rebalance(vm2); assertEquals(0, rebalanceResults.getTotalBucketTransfersCompleted()); - + //Recreate the child region. async1 = vm1.invokeAsync(createChildPR); async0 = vm0.invokeAsync(createChildPR); @@ -2568,6 +2749,206 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar createData(vm0, 0, NUM_BUCKETS, "c", "region2"); } + /** + * Create a colocated pair of persistent regions and populate them with data. Shut down the servers and then + * restart them and check the data. + * <p> + * On the restart, try region operations ({@code get()}) on the parent region before or during persistent recovery. + * The {@code concurrentCheckData} argument determines whether the operation from the parent region occurs before + * or concurrent with the child region creation and recovery. + * + * @param createParentPR {@link SerializableRunnable} for creating the parent region on one member + * @param createChildPR {@link SerializableRunnable} for creating the child region on one member + * @param concurrentCheckData + * @throws Throwable + */ + public void regionGetWithOfflineChild( + SerializableRunnable createParentPR, + SerializableRunnable createChildPR, + boolean concurrentCheckData) throws Throwable { + Host host = Host.getHost(0); + final VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = host.getVM(2); + + //Create the PRs on two members + vm0.invoke(createParentPR); + vm1.invoke(createParentPR); + vm0.invoke(createChildPR); + vm1.invoke(createChildPR); + + //Create some buckets. 
+ createData(vm0, 0, NUM_BUCKETS, "a"); + createData(vm0, 0, NUM_BUCKETS, "a", "region2"); + + //Close the members + closeCache(vm1); + closeCache(vm0); + + SerializableRunnable checkDataOnParent = (new SerializableRunnable("checkDataOnParent") { + @Override + public void run() { + Cache cache = getCache(); + Region region = cache.getRegion(PR_REGION_NAME); + + for (int i = 0; i < NUM_BUCKETS; i++) { + assertEquals("For key " + i, "a", region.get(i)); + } + } + }); + + try { + //Recreate the parent region. Try to make sure that + //the member with the latest copy of the buckets + //is the one that decides to throw away it's copy + //by starting it last. + AsyncInvocation async0 = vm0.invokeAsync(createParentPR); + AsyncInvocation async1 = vm1.invokeAsync(createParentPR); + async0.getResult(MAX_WAIT); + async1.getResult(MAX_WAIT); + //Now create the parent region on vm-2. vm-2 did not + //previously host the child region. + vm2.invoke(createParentPR); + + AsyncInvocation async2 = null; + AsyncInvocation asyncCheck = null; + if (concurrentCheckData) { + //Recreate the child region. + async1 = vm1.invokeAsync(createChildPR); + async0 = vm0.invokeAsync(createChildPR); + async2 = vm2.invokeAsync(new SerializableRunnable("delay") { + @Override + public void run() throws InterruptedException { + Thread.sleep(100); + vm2.invoke(createChildPR); + } + }); + + asyncCheck = vm0.invokeAsync(checkDataOnParent); + } else { + vm0.invoke(checkDataOnParent); + } + async0.getResult(MAX_WAIT); + async1.getResult(MAX_WAIT); + async2.getResult(MAX_WAIT); + asyncCheck.getResult(MAX_WAIT); + //Validate the data + checkData(vm0, 0, NUM_BUCKETS, "a"); + checkData(vm0, 0, NUM_BUCKETS, "a", "region2"); + //Make sure we can actually use the buckets in the child region. + createData(vm0, 0, NUM_BUCKETS, "c", "region2"); + } finally { + //Close the members + closeCache(vm1); + closeCache(vm0); + closeCache(vm2); + } + } + /** + * Create a colocated pair of persistent regions and populate them with data. Shut down the servers and then + * restart them. + * <p> + * On the restart, try region operations ({@code put()}) on the parent region before or during persistent recovery. + * The {@code concurrentCreatekData} argument determines whether the operation from the parent region occurs before + * or concurrent with the child region creation and recovery. 
+ * + * @param createParentPR {@link SerializableRunnable} for creating the parent region on one member + * @param createChildPR {@link SerializableRunnable} for creating the child region on one member + * @param concurrentCreateData + * @throws Throwable + */ + public void regionPutWithOfflineChild( + SerializableRunnable createParentPR, + SerializableRunnable createChildPR, + boolean concurrentCreateData) throws Throwable { + Host host = Host.getHost(0); + final VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = host.getVM(2); + + SerializableRunnable checkDataOnParent = (new SerializableRunnable("checkDataOnParent") { + @Override + public void run() { + Cache cache = getCache(); + Region region = cache.getRegion(PR_REGION_NAME); + + for (int i = 0; i < NUM_BUCKETS; i++) { + assertEquals("For key " + i, "a", region.get(i)); + } + } + }); + + SerializableRunnable createDataOnParent = new SerializableRunnable("createDataOnParent") { + + public void run() { + Cache cache = getCache(); + LogWriterUtils.getLogWriter().info("creating data in " + PR_REGION_NAME); + Region region = cache.getRegion(PR_REGION_NAME); + + for (int i = 0; i < NUM_BUCKETS; i++) { + region.put(i, "c"); + assertEquals("For key " + i, "c", region.get(i)); + } + } + }; + + //Create the PRs on two members + vm0.invoke(createParentPR); + vm1.invoke(createParentPR); + vm0.invoke(createChildPR); + vm1.invoke(createChildPR); + + //Create some buckets. + createData(vm0, 0, NUM_BUCKETS, "a"); + createData(vm0, 0, NUM_BUCKETS, "a", "region2"); + + //Close the members + closeCache(vm1); + closeCache(vm0); + + try { + //Recreate the parent region. Try to make sure that + //the member with the latest copy of the buckets + //is the one that decides to throw away it's copy + //by starting it last. + AsyncInvocation async0 = vm0.invokeAsync(createParentPR); + AsyncInvocation async1 = vm1.invokeAsync(createParentPR); + async0.getResult(MAX_WAIT); + async1.getResult(MAX_WAIT); + //Now create the parent region on vm-2. vm-2 did not + //previous host the child region. + vm2.invoke(createParentPR); + + AsyncInvocation async2 = null; + AsyncInvocation asyncPut = null; + if (concurrentCreateData) { + //Recreate the child region. + async1 = vm1.invokeAsync(createChildPR); + async0 = vm0.invokeAsync(createChildPR); + async2 = vm2.invokeAsync(createChildPR); + + Thread.sleep(100); + asyncPut = vm0.invokeAsync(createDataOnParent); + } else { + vm0.invoke(createDataOnParent); + } + async0.getResult(MAX_WAIT); + async1.getResult(MAX_WAIT); + async2.getResult(MAX_WAIT); + asyncPut.getResult(MAX_WAIT); + //Validate the data + checkData(vm0, 0, NUM_BUCKETS, "c"); + checkData(vm0, 0, NUM_BUCKETS, "a", "region2"); + //Make sure we can actually use the buckets in the child region. + createData(vm0, 0, NUM_BUCKETS, "c", "region2"); + } finally { + //Close the members + closeCache(vm1); + closeCache(vm0); + closeCache(vm2); + } + } + private RebalanceResults rebalance(VM vm) { return (RebalanceResults) vm.invoke(new SerializableCallable() {
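With the new guard in place, callers that operate on a persistent partitioned region right after a restart see a prompt PartitionOfflineException rather than a silent wrong answer, and can simply wait out recovery. A minimal sketch of one such caller (the class name, retry count, and backoff are illustrative choices, not part of this commit):

import org.apache.geode.cache.Region;
import org.apache.geode.cache.persistence.PartitionOfflineException;

public final class RecoveryAwareReader {
  // Retry a read until persistent recovery of the region's buckets completes.
  // The attempt bound and sleep interval are arbitrary, illustrative values.
  public static String getWithRetry(Region<Integer, String> region, int key)
      throws InterruptedException {
    for (int attempt = 0; attempt < 10; attempt++) {
      try {
        return region.get(key);
      } catch (PartitionOfflineException e) {
        // Some persistent buckets are not yet online; back off and retry
        // instead of treating the failure as a missing value.
        Thread.sleep(500);
      }
    }
    throw new IllegalStateException("Region still offline after retries");
  }
}

The same handling applies to Query.execute(), which this change documents as throwing PartitionOfflineException when a query refers to a partitioned region whose persistent recovery is incomplete.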
