GEODE-11: Fix index recovery after rebalance
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/5dadbb4f Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/5dadbb4f Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/5dadbb4f Branch: refs/heads/develop Commit: 5dadbb4fe3f99980033671c7e1a6e13c3daff30c Parents: 710aef8 Author: Ashvin Agrawal <[email protected]> Authored: Thu Oct 8 22:00:53 2015 -0700 Committer: Ashvin Agrawal <[email protected]> Committed: Thu Oct 8 22:01:58 2015 -0700 ---------------------------------------------------------------------- .../internal/PartitionedRepositoryManager.java | 3 - .../lucene/internal/filesystem/FileSystem.java | 1 + .../internal/LuceneRebalanceJUnitTest.java | 75 ++++++++++++++++++++ .../PartitionedRepositoryManagerJUnitTest.java | 27 +++---- .../LuceneFunctionReadPathDUnitTest.java | 5 +- 5 files changed, 92 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5dadbb4f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java index 91ad82c..e276cff 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java @@ -17,12 +17,9 @@ import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepositoryImpl; import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager; import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer; -import com.gemstone.gemfire.cache.partition.PartitionRegionHelper; import com.gemstone.gemfire.internal.cache.BucketNotFoundException; import com.gemstone.gemfire.internal.cache.BucketRegion; -import com.gemstone.gemfire.internal.cache.LocalDataSet; import com.gemstone.gemfire.internal.cache.PartitionedRegion; -import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper; import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext; import com.gemstone.gemfire.internal.util.concurrent.CopyOnWriteHashMap; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5dadbb4f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java index 50b9f50..5e29437 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java @@ -90,6 +90,7 @@ public class FileSystem { destFile.length = sourceFile.length; destFile.modified = sourceFile.modified; destFile.id = sourceFile.id; + updateFile(destFile); // TODO - What is the state of the system if // things crash in the middle of moving this file? http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5dadbb4f/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRebalanceJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRebalanceJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRebalanceJUnitTest.java new file mode 100644 index 0000000..478981f --- /dev/null +++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRebalanceJUnitTest.java @@ -0,0 +1,75 @@ +package com.gemstone.gemfire.cache.lucene.internal; + +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.PartitionAttributes; +import com.gemstone.gemfire.cache.PartitionAttributesFactory; +import com.gemstone.gemfire.cache.RegionFactory; +import com.gemstone.gemfire.cache.RegionShortcut; +import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; +import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager; +import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogenousLuceneSerializer; +import com.gemstone.gemfire.internal.cache.BucketNotFoundException; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; +import com.gemstone.gemfire.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class LuceneRebalanceJUnitTest { + String[] indexedFields = new String[] { "txt" }; + HeterogenousLuceneSerializer mapper = new HeterogenousLuceneSerializer(indexedFields); + Analyzer analyzer = new StandardAnalyzer(); + + @Before + public void setup() { + indexedFields = new String[] { "txt" }; + mapper = new HeterogenousLuceneSerializer(indexedFields); + analyzer = new StandardAnalyzer(); + LuceneServiceImpl.registerDataSerializables(); + } + + /** + * Test what happens when a bucket is destroyed. + */ + @Test + public void recoverRepoInANewNode() throws BucketNotFoundException, IOException { + Cache cache = new CacheFactory().set("mcast-port", "0").create(); + PartitionAttributes<String, String> attrs = new PartitionAttributesFactory().setTotalNumBuckets(1).create(); + RegionFactory<String, String> regionfactory = cache.createRegionFactory(RegionShortcut.PARTITION); + regionfactory.setPartitionAttributes(attrs); + + PartitionedRegion userRegion = (PartitionedRegion) regionfactory.create("userRegion"); + // put an entry to create the bucket + userRegion.put("rebalance", "test"); + + PartitionedRegion fileRegion = (PartitionedRegion) regionfactory.create("fileRegion"); + PartitionedRegion chunkRegion = (PartitionedRegion) regionfactory.create("chunkRegion"); + + RepositoryManager manager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, mapper, analyzer); + IndexRepository repo = manager.getRepository(userRegion, 0, null); + assertNotNull(repo); + + repo.create("rebalance", "test"); + repo.commit(); + + // close the region to simulate bucket movement. New node will create repo using data persisted by old region + userRegion.close(); + + userRegion = (PartitionedRegion) regionfactory.create("userRegion"); + userRegion.put("rebalance", "test"); + manager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, mapper, analyzer); + IndexRepository newRepo = manager.getRepository(userRegion, 0, null); + + Assert.assertNotEquals(newRepo, repo); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5dadbb4f/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java index 1225aa1..23518e1 100644 --- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java +++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java @@ -1,8 +1,10 @@ package com.gemstone.gemfire.cache.lucene.internal; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import java.io.IOException; import java.util.Arrays; @@ -30,7 +32,6 @@ import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.Heteroge import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer; import com.gemstone.gemfire.internal.cache.BucketNotFoundException; import com.gemstone.gemfire.internal.cache.BucketRegion; -import com.gemstone.gemfire.internal.cache.LocalDataSet; import com.gemstone.gemfire.internal.cache.PartitionedRegion; import com.gemstone.gemfire.internal.cache.PartitionedRegion.RetryTimeKeeper; import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore; @@ -71,8 +72,8 @@ public class PartitionedRepositoryManagerJUnitTest { public void getByKey() throws BucketNotFoundException, IOException { PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer, new StandardAnalyzer()); - BucketRegion mockBucket0 = getMockBucket(0); - BucketRegion mockBucket1 = getMockBucket(1); + setUpMockBucket(0); + setUpMockBucket(1); IndexRepositoryImpl repo0 = (IndexRepositoryImpl) repoManager.getRepository(userRegion, 0, null); IndexRepositoryImpl repo1 = (IndexRepositoryImpl) repoManager.getRepository(userRegion, 1, null); @@ -95,7 +96,7 @@ public class PartitionedRepositoryManagerJUnitTest { public void destroyBucket() throws BucketNotFoundException, IOException { PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer, new StandardAnalyzer()); - BucketRegion mockBucket0 = getMockBucket(0); + setUpMockBucket(0); IndexRepositoryImpl repo0 = (IndexRepositoryImpl) repoManager.getRepository(userRegion, 0, null); @@ -107,7 +108,7 @@ public class PartitionedRepositoryManagerJUnitTest { //Simulate rebalancing of a bucket by marking the old bucket is destroyed //and creating a new bucket Mockito.when(fileBucket0.isDestroyed()).thenReturn(true); - mockBucket0 = getMockBucket(0); + setUpMockBucket(0); IndexRepositoryImpl newRepo0 = (IndexRepositoryImpl) repoManager.getRepository(userRegion, 0, null); assertNotEquals(repo0, newRepo0); @@ -126,7 +127,7 @@ public class PartitionedRepositoryManagerJUnitTest { @Test public void createMissingBucket() throws BucketNotFoundException { PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer, new StandardAnalyzer()); - BucketRegion mockBucket0 = getMockBucket(0); + setUpMockBucket(0); Mockito.when(fileDataStore.getLocalBucketById(eq(0))).thenReturn(null); @@ -146,8 +147,8 @@ public class PartitionedRepositoryManagerJUnitTest { PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer, new StandardAnalyzer()); - BucketRegion mockBucket0 = getMockBucket(0); - BucketRegion mockBucket1 = getMockBucket(1); + setUpMockBucket(0); + setUpMockBucket(1); Set<Integer> buckets = new LinkedHashSet<Integer>(Arrays.asList(0, 1)); InternalRegionFunctionContext ctx = Mockito.mock(InternalRegionFunctionContext.class); @@ -174,7 +175,7 @@ public class PartitionedRepositoryManagerJUnitTest { public void getMissingBucketByRegion() throws BucketNotFoundException { PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer, new StandardAnalyzer()); - BucketRegion mockBucket0 = getMockBucket(0); + setUpMockBucket(0); Set<Integer> buckets = new LinkedHashSet<Integer>(Arrays.asList(0, 1)); @@ -191,7 +192,7 @@ public class PartitionedRepositoryManagerJUnitTest { assertEquals(serializer, repo0.getSerializer()); } - private BucketRegion getMockBucket(int id) { + private BucketRegion setUpMockBucket(int id) { BucketRegion mockBucket = Mockito.mock(BucketRegion.class); BucketRegion fileBucket = Mockito.mock(BucketRegion.class); BucketRegion chunkBucket = Mockito.mock(BucketRegion.class); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5dadbb4f/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunctionReadPathDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunctionReadPathDUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunctionReadPathDUnitTest.java index b78e9d6..6e44b72 100644 --- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunctionReadPathDUnitTest.java +++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunctionReadPathDUnitTest.java @@ -135,9 +135,8 @@ public class LuceneFunctionReadPathDUnitTest extends CacheTestCase { }); //Make sure the search still works - // TODO: rebalance is broken when hooked with AEQ, disable the test for the time being -// server1.invoke(executeSearch); -// server2.invoke(executeSearch); + server1.invoke(executeSearch); + server2.invoke(executeSearch); } private static void putInRegion(Region<Object, Object> region, Object key, Object value) throws BucketNotFoundException, IOException {
