Changing how PartitionedRepositoryManager handles destroyed buckets. This class was using a weak hash map with BucketRegions as keys. Instead of that, it now uses the bucket id as the key. I added an isClosed() method to IndexRepository so that a repository can indicate whether its underlying BucketRegion has been destroyed. If the cached IndexRepository for a bucket is closed, PartitionedRepositoryManager removes it and creates a new IndexRepository.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/dc3e8f75 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/dc3e8f75 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/dc3e8f75 Branch: refs/heads/develop Commit: dc3e8f75c4bb26d0b3223709acffa426e208f301 Parents: 3ad1fe7 Author: Dan Smith <[email protected]> Authored: Tue Sep 22 14:22:35 2015 -0700 Committer: Dan Smith <[email protected]> Committed: Tue Sep 22 14:25:14 2015 -0700 ---------------------------------------------------------------------- .../internal/PartitionedRepositoryManager.java | 33 +++++++++++++------- .../internal/repository/IndexRepository.java | 7 +++++ .../repository/IndexRepositoryImpl.java | 12 +++++-- .../PartitionedRepositoryManagerJUnitTest.java | 26 +++++++++++++++ .../IndexRepositoryImplJUnitTest.java | 9 ++++-- 5 files changed, 70 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dc3e8f75/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java index e301482..bcec1c9 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java @@ -20,7 +20,7 @@ import com.gemstone.gemfire.internal.cache.BucketNotFoundException; import com.gemstone.gemfire.internal.cache.BucketRegion; import com.gemstone.gemfire.internal.cache.LocalDataSet; import 
com.gemstone.gemfire.internal.cache.PartitionedRegion; -import com.gemstone.gemfire.internal.util.concurrent.CopyOnWriteWeakHashMap; +import com.gemstone.gemfire.internal.util.concurrent.CopyOnWriteHashMap; /** * Manages index repositories for partitioned regions. @@ -38,7 +38,7 @@ public class PartitionedRepositoryManager implements RepositoryManager { * * It is weak so that the old BucketRegion will be garbage collected. */ - CopyOnWriteWeakHashMap<BucketRegion, IndexRepository> indexRepositories = new CopyOnWriteWeakHashMap<BucketRegion, IndexRepository>(); + CopyOnWriteHashMap<Integer, IndexRepository> indexRepositories = new CopyOnWriteHashMap<Integer, IndexRepository>(); /** The user region for this index */ private final PartitionedRegion userRegion; @@ -73,7 +73,7 @@ public class PartitionedRepositoryManager implements RepositoryManager { throw new BucketNotFoundException("User bucket was not found for region " + region + "key " + key + " callbackarg " + callbackArg); } - return getRepository(userBucket); + return getRepository(userBucket.getId()); } @Override @@ -90,7 +90,7 @@ public class PartitionedRepositoryManager implements RepositoryManager { if(userBucket == null) { throw new BucketNotFoundException("User bucket was not found for region " + region + "bucket id " + bucketId); } else { - repos.add(getRepository(userBucket)); + repos.add(getRepository(userBucket.getId())); } } @@ -100,15 +100,24 @@ public class PartitionedRepositoryManager implements RepositoryManager { /** * Return the repository for a given user bucket */ - private IndexRepository getRepository(BucketRegion userBucket) throws BucketNotFoundException { - IndexRepository repo = indexRepositories.get(userBucket); + private IndexRepository getRepository(Integer bucketId) throws BucketNotFoundException { + IndexRepository repo = indexRepositories.get(bucketId); + + //Remove the repository if it has been destroyed (due to rebalancing) + if(repo != null && repo.isClosed()) { + 
indexRepositories.remove(bucketId, repo); + repo = null; + } + if(repo == null) { try { - RegionDirectory dir = new RegionDirectory(getMatchingBucket(userBucket, fileRegion), getMatchingBucket(userBucket, chunkRegion)); + BucketRegion fileBucket = getMatchingBucket(fileRegion, bucketId); + BucketRegion chunkBucket = getMatchingBucket(chunkRegion, bucketId); + RegionDirectory dir = new RegionDirectory(fileBucket, chunkBucket); IndexWriterConfig config = new IndexWriterConfig(analyzer); IndexWriter writer = new IndexWriter(dir, config); - repo = new IndexRepositoryImpl(writer, serializer); - IndexRepository oldRepo = indexRepositories.putIfAbsent(userBucket, repo); + repo = new IndexRepositoryImpl(fileBucket, writer, serializer); + IndexRepository oldRepo = indexRepositories.putIfAbsent(bucketId, repo); if(oldRepo != null) { repo = oldRepo; } @@ -123,10 +132,10 @@ public class PartitionedRepositoryManager implements RepositoryManager { /** * Find the bucket in region2 that matches the bucket id from region1. 
*/ - private BucketRegion getMatchingBucket(BucketRegion region1, PartitionedRegion region2) throws BucketNotFoundException { - BucketRegion result = region2.getDataStore().getLocalBucketById(region1.getId()); + private BucketRegion getMatchingBucket(PartitionedRegion region, Integer bucketId) throws BucketNotFoundException { + BucketRegion result = region.getDataStore().getLocalBucketById(bucketId); if(result == null) { - throw new BucketNotFoundException("Bucket not found for region " + region2 + " bucekt id " + region1.getId()); + throw new BucketNotFoundException("Bucket not found for region " + region + " bucekt id " + bucketId); } return result; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dc3e8f75/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepository.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepository.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepository.java index 549bf21..b852b82 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepository.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepository.java @@ -45,4 +45,11 @@ public interface IndexRepository { * @throws IOException */ void commit() throws IOException; + + /** + * Check to see if this repository is closed due to + * underlying resources being closed or destroyed + * @return true if this repository is closed. 
+ */ + public boolean isClosed(); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dc3e8f75/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java index 5c248cf..fbbc5db 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java @@ -11,6 +11,7 @@ import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.SearcherManager; import org.apache.lucene.search.TopDocs; +import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer; import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.SerializerUtil; @@ -26,8 +27,10 @@ public class IndexRepositoryImpl implements IndexRepository { private final IndexWriter writer; private final LuceneSerializer serializer; private final SearcherManager searcherManager; + private Region<?,?> region; - public IndexRepositoryImpl(IndexWriter writer, LuceneSerializer serializer) throws IOException { + public IndexRepositoryImpl(Region<?,?> region, IndexWriter writer, LuceneSerializer serializer) throws IOException { + this.region = region; this.writer = writer; searcherManager = new SearcherManager(writer, APPLY_ALL_DELETES, null); this.serializer = serializer; @@ -83,6 +86,9 @@ public class IndexRepositoryImpl implements IndexRepository { public LuceneSerializer getSerializer() { return serializer; } - - + + @Override + public boolean isClosed() { + return region.isDestroyed(); + } } 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dc3e8f75/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java index db1085a..367f4f2 100644 --- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java +++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java @@ -83,6 +83,32 @@ public class PartitionedRepositoryManagerJUnitTest { } /** + * Test what happens when a bucket is destroyed. + */ + @Test + public void destroyBucket() throws BucketNotFoundException, IOException { + PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer, new StandardAnalyzer()); + + BucketRegion mockBucket0 = getMockBucket(0); + + IndexRepositoryImpl repo0 = (IndexRepositoryImpl) repoManager.getRepository(userRegion, 0, null); + + assertNotNull(repo0); + checkRepository(repo0, 0); + + BucketRegion fileBucket0 = fileBuckets.get(0); + + //Simulate rebalancing of a bucket by marking the old bucket is destroyed + //and creating a new bucket + Mockito.when(fileBucket0.isDestroyed()).thenReturn(true); + mockBucket0 = getMockBucket(0); + + IndexRepositoryImpl newRepo0 = (IndexRepositoryImpl) repoManager.getRepository(userRegion, 0, null); + assertNotEquals(repo0, newRepo0); + checkRepository(newRepo0, 0); + } + + /** * Test that we get the expected exception when a user bucket is missing */ @Test(expected = BucketNotFoundException.class) 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dc3e8f75/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java index 0b4a4cd..3a25c97 100644 --- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java +++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java @@ -1,6 +1,6 @@ package com.gemstone.gemfire.cache.lucene.internal.repository; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import java.io.IOException; import java.io.Serializable; @@ -18,7 +18,9 @@ import org.apache.lucene.queryparser.classic.QueryParser; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory; import com.gemstone.gemfire.cache.lucene.internal.filesystem.ChunkKey; import com.gemstone.gemfire.cache.lucene.internal.filesystem.File; @@ -38,6 +40,7 @@ public class IndexRepositoryImplJUnitTest { private HeterogenousLuceneSerializer mapper; private StandardAnalyzer analyzer = new StandardAnalyzer(); private IndexWriter writer; + private Region region; @Before public void setUp() throws IOException { @@ -48,7 +51,9 @@ public class IndexRepositoryImplJUnitTest { writer = new IndexWriter(dir, config); String[] indexedFields= new String[] {"s", "i", "l", "d", "f", "s2", "missing"}; mapper = new HeterogenousLuceneSerializer(indexedFields); - repo = new 
IndexRepositoryImpl(writer, mapper); + region = Mockito.mock(Region.class); + Mockito.when(region.isDestroyed()).thenReturn(false); + repo = new IndexRepositoryImpl(region, writer, mapper); } @Test
