GEODE-2402: Write to the lucene region buckets using a callback argument Adding a callback argument when writing to the file and chunk regions. The file and chunk regions now have a partition listener to route the put to the correct bucket.
The reason for all of this is that in some cases, the core code can can send a message that only includes the PR id and the key. We need want the core to be able to resolve the correct bucket from just those things, which requires having the PartitionListener that uses the callback argument. Added a test of putting to the file and chunk regions during GII, which is the case where the core code sends a message that includes only the PR id and the key. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/8ce8e43e Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/8ce8e43e Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/8ce8e43e Branch: refs/heads/feature/GEODE-2402 Commit: 8ce8e43e0c309cb4727bc742f348954bcc3d4f48 Parents: a8c6543 Author: Dan Smith <[email protected]> Authored: Wed Feb 8 16:38:13 2017 -0800 Committer: Dan Smith <[email protected]> Committed: Thu Feb 16 16:41:37 2017 -0800 ---------------------------------------------------------------------- .../internal/cache/InitialImageOperation.java | 2 +- .../geode/internal/cache/LocalDataSet.java | 4 +- .../geode/internal/cache/LocalRegion.java | 2 +- .../geode/internal/cache/PartitionedRegion.java | 11 ++ geode-lucene/build.gradle | 2 + .../lucene/internal/IndexRepositoryFactory.java | 23 ++-- .../LuceneIndexForPartitionedRegion.java | 13 ++ .../internal/directory/RegionDirectory.java | 26 ++-- .../lucene/internal/filesystem/FileSystem.java | 13 +- .../partition/BucketTargetingFixedResolver.java | 79 +++++++++++++ .../internal/partition/BucketTargetingMap.java | 118 +++++++++++++++++++ .../partition/BucketTargetingResolver.java | 39 ++++++ .../repository/IndexRepositoryImpl.java | 2 +- .../geode/cache/lucene/LuceneQueriesPRBase.java | 12 +- .../LuceneQueriesPeerFixedPRDUnitTest.java | 2 +- .../LuceneQueriesPeerPRRedundancyDUnitTest.java | 42 +++++++ .../PartitionedRepositoryManagerJUnitTest.java | 13 +- .../BucketTargetingFixedResolverTest.java | 66 +++++++++++ .../partition/BucketTargetingMapTest.java | 66 +++++++++++ 19 files changed, 499 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java index 3ec8152..d0ad5db 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java @@ -3947,7 +3947,7 @@ public class InitialImageOperation { } } - public static abstract class GIITestHook implements Runnable { + public static abstract class GIITestHook implements Runnable, Serializable { final private GIITestHookType type; final private String region_name; volatile public boolean isRunning; http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java index c4858dc..248d655 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java @@ -92,7 +92,7 @@ public class LocalDataSet implements Region, QueryExecutor { } public Set<Region.Entry> entries(boolean recursive) { - return this.proxy.new PREntriesSet(getBucketSet()); + return this.proxy.entries(getBucketSet()); } public Collection values() { @@ -101,7 +101,7 @@ public class LocalDataSet implements Region, QueryExecutor { } public Set keys() { - return this.proxy.new KeysSet(getBucketSet()); + return this.proxy.keySet(getBucketSet()); } public Set keySet() { http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 00401e9..5d5c7e2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -1020,7 +1020,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, return newRegion; } - public final void create(Object key, Object value, Object aCallbackArgument) + public void create(Object key, Object value, Object aCallbackArgument) throws TimeoutException, EntryExistsException, CacheWriterException { long startPut = CachePerfStats.getStatTime(); @Released http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 44f8427..9374d4b 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -6099,6 +6099,10 @@ public class PartitionedRegion extends LocalRegion return Collections.unmodifiableSet(new PREntriesSet()); } + public Set<Region.Entry> entries(Set<Integer> bucketIds) { + return new PREntriesSet(bucketIds); + } + /** * Set view of entries. This currently extends the keySet iterator and performs individual * getEntry() operations using the keys @@ -6164,6 +6168,13 @@ public class PartitionedRegion extends LocalRegion return Collections.unmodifiableSet(new KeysSet(allowTombstones)); } + /** + * Get a keyset of the given buckets + */ + public Set keySet(Set<Integer> bucketSet) { + return new KeysSet(bucketSet); + } + public Set keysWithoutCreatesForTests() { checkReadiness(); Set<Integer> availableBuckets = new HashSet<Integer>(); http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-lucene/build.gradle ---------------------------------------------------------------------- diff --git a/geode-lucene/build.gradle b/geode-lucene/build.gradle index b7c449b..8545200 100644 --- a/geode-lucene/build.gradle +++ b/geode-lucene/build.gradle @@ -31,6 +31,8 @@ dependencies { //Lucene test framework. testCompile 'org.apache.lucene:lucene-test-framework:' + project.'lucene.version' testCompile 'org.apache.lucene:lucene-codecs:' + project.'lucene.version' + testCompile 'com.pholser:junit-quickcheck-core:' + project.'junit-quickcheck.version' + testCompile 'com.pholser:junit-quickcheck-generators:' + project.'junit-quickcheck.version' testCompile files(project(':geode-core').sourceSets.test.output) } http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java index 57dd0a5..7e685b7 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java @@ -15,8 +15,10 @@ package org.apache.geode.cache.lucene.internal; import java.io.IOException; +import java.util.Map; import org.apache.geode.cache.lucene.internal.directory.RegionDirectory; +import org.apache.geode.cache.lucene.internal.partition.BucketTargetingMap; import org.apache.geode.cache.lucene.internal.repository.IndexRepository; import org.apache.geode.cache.lucene.internal.repository.IndexRepositoryImpl; import org.apache.geode.cache.lucene.internal.repository.serializer.LuceneSerializer; @@ -41,8 +43,11 @@ public class IndexRepositoryFactory { LuceneIndexImpl index, PartitionedRegion userRegion, final IndexRepository oldRepository) throws IOException { LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index; - BucketRegion fileBucket = getMatchingBucket(indexForPR.getFileRegion(), bucketId); - BucketRegion chunkBucket = getMatchingBucket(indexForPR.getChunkRegion(), bucketId); + final PartitionedRegion fileRegion = indexForPR.getFileRegion(); + final PartitionedRegion chunkRegion = indexForPR.getChunkRegion(); + + BucketRegion fileBucket = getMatchingBucket(fileRegion, bucketId); + BucketRegion chunkBucket = getMatchingBucket(chunkRegion, bucketId); BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId); boolean success = false; if (fileBucket == null || chunkBucket == null) { @@ -51,7 +56,7 @@ public class IndexRepositoryFactory { } return null; } - if (!fileBucket.getBucketAdvisor().isPrimary()) { + if (!chunkBucket.getBucketAdvisor().isPrimary()) { if (oldRepository != null) { oldRepository.cleanup(); } @@ -68,15 +73,15 @@ public class IndexRepositoryFactory { DistributedLockService lockService = getLockService(); String lockName = getLockName(bucketId, fileBucket); while (!lockService.lock(lockName, 100, -1)) { - if (!fileBucket.getBucketAdvisor().isPrimary()) { + if (!chunkBucket.getBucketAdvisor().isPrimary()) { return null; } } final IndexRepository repo; try { - RegionDirectory dir = - new RegionDirectory(fileBucket, chunkBucket, indexForPR.getFileSystemStats()); + RegionDirectory dir = new RegionDirectory(getBucketTargetingMap(fileBucket, bucketId), + getBucketTargetingMap(chunkBucket, bucketId), indexForPR.getFileSystemStats()); IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer()); IndexWriter writer = new IndexWriter(dir, config); repo = new IndexRepositoryImpl(fileBucket, writer, serializer, indexForPR.getIndexStats(), @@ -95,8 +100,12 @@ public class IndexRepositoryFactory { } + private Map getBucketTargetingMap(BucketRegion region, int bucketId) { + return new BucketTargetingMap(region, bucketId); + } + private String getLockName(final Integer bucketId, final BucketRegion fileBucket) { - return FILE_REGION_LOCK_FOR_BUCKET_ID + fileBucket.getFullPath() + bucketId; + return FILE_REGION_LOCK_FOR_BUCKET_ID + fileBucket.getFullPath(); } private DistributedLockService getLockService() { http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java index 53b4e08..9a39b39 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java @@ -17,8 +17,10 @@ package org.apache.geode.cache.lucene.internal; import org.apache.geode.cache.AttributesFactory; import org.apache.geode.cache.Cache; +import org.apache.geode.cache.FixedPartitionResolver; import org.apache.geode.cache.PartitionAttributes; import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.PartitionResolver; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.RegionShortcut; @@ -28,6 +30,8 @@ import org.apache.geode.cache.lucene.internal.directory.DumpDirectoryFiles; import org.apache.geode.cache.lucene.internal.filesystem.ChunkKey; import org.apache.geode.cache.lucene.internal.filesystem.File; import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats; +import org.apache.geode.cache.lucene.internal.partition.BucketTargetingFixedResolver; +import org.apache.geode.cache.lucene.internal.partition.BucketTargetingResolver; import org.apache.geode.cache.lucene.internal.repository.RepositoryManager; import org.apache.geode.cache.lucene.internal.repository.serializer.HeterogeneousLuceneSerializer; import org.apache.geode.cache.partition.PartitionListener; @@ -143,9 +147,18 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { PartitionAttributes<?, ?> dataRegionAttributes) { attributesFactory.setTotalNumBuckets(dataRegionAttributes.getTotalNumBuckets()); attributesFactory.setRedundantCopies(dataRegionAttributes.getRedundantCopies()); + attributesFactory.setPartitionResolver(getPartitionResolver(dataRegionAttributes)); return attributesFactory; } + private PartitionResolver getPartitionResolver(PartitionAttributes dataRegionAttributes) { + if (dataRegionAttributes.getPartitionResolver() instanceof FixedPartitionResolver) { + return new BucketTargetingFixedResolver(); + } else { + return new BucketTargetingResolver(); + } + } + protected <K, V> Region<K, V> createRegion(final String regionName, final RegionShortcut regionShortCut, final String colocatedWithRegionName, final PartitionAttributes partitionAttributes, final RegionAttributes regionAttributes, http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/directory/RegionDirectory.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/directory/RegionDirectory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/directory/RegionDirectory.java index 362cf93..18428ec 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/directory/RegionDirectory.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/directory/RegionDirectory.java @@ -15,14 +15,10 @@ package org.apache.geode.cache.lucene.internal.directory; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.Collection; -import java.util.UUID; -import java.util.concurrent.ConcurrentMap; - +import org.apache.geode.cache.lucene.internal.filesystem.ChunkKey; +import org.apache.geode.cache.lucene.internal.filesystem.File; +import org.apache.geode.cache.lucene.internal.filesystem.FileSystem; +import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats; import org.apache.lucene.store.BaseDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; @@ -30,10 +26,12 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.OutputStreamIndexOutput; import org.apache.lucene.store.SingleInstanceLockFactory; -import org.apache.geode.cache.lucene.internal.filesystem.ChunkKey; -import org.apache.geode.cache.lucene.internal.filesystem.File; -import org.apache.geode.cache.lucene.internal.filesystem.FileSystem; -import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; /** * An implementation of Directory that stores data in geode regions. @@ -49,8 +47,8 @@ public class RegionDirectory extends BaseDirectory { * Create a region directory with a given file and chunk region. These regions may be bucket * regions or they may be replicated regions. */ - public RegionDirectory(ConcurrentMap<String, File> fileRegion, - ConcurrentMap<ChunkKey, byte[]> chunkRegion, FileSystemStats stats) { + public RegionDirectory(Map<String, File> fileRegion, Map<ChunkKey, byte[]> chunkRegion, + FileSystemStats stats) { super(new SingleInstanceLockFactory()); fs = new FileSystem(fileRegion, chunkRegion, stats); } http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java index f3975bf..660816d 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.Logger; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collection; +import java.util.Map; import java.util.concurrent.ConcurrentMap; /** @@ -38,8 +39,8 @@ import java.util.concurrent.ConcurrentMap; public class FileSystem { private static final Logger logger = LogService.getLogger(); - private final ConcurrentMap<String, File> fileRegion; - private final ConcurrentMap<ChunkKey, byte[]> chunkRegion; + private final Map<String, File> fileRegion; + private final Map<ChunkKey, byte[]> chunkRegion; static final int CHUNK_SIZE = 1024 * 1024; // 1 MB private final FileSystemStats stats; @@ -54,8 +55,8 @@ public class FileSystem { * @param fileRegion the region to store metadata about the files * @param chunkRegion the region to store actual file data. */ - public FileSystem(ConcurrentMap<String, File> fileRegion, - ConcurrentMap<ChunkKey, byte[]> chunkRegion, FileSystemStats stats) { + public FileSystem(Map<String, File> fileRegion, Map<ChunkKey, byte[]> chunkRegion, + FileSystemStats stats) { this.fileRegion = fileRegion; this.chunkRegion = chunkRegion; this.stats = stats; @@ -188,11 +189,11 @@ public class FileSystem { fileRegion.put(file.getName(), file); } - public ConcurrentMap<String, File> getFileRegion() { + public Map<String, File> getFileRegion() { return fileRegion; } - public ConcurrentMap<ChunkKey, byte[]> getChunkRegion() { + public Map<ChunkKey, byte[]> getChunkRegion() { return chunkRegion; } http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingFixedResolver.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingFixedResolver.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingFixedResolver.java new file mode 100644 index 0000000..66035c4 --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingFixedResolver.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.lucene.internal.partition; + +import org.apache.geode.cache.EntryOperation; +import org.apache.geode.cache.FixedPartitionAttributes; +import org.apache.geode.cache.FixedPartitionResolver; +import org.apache.geode.cache.PartitionResolver; +import org.apache.geode.internal.cache.FixedPartitionAttributesImpl; +import org.apache.geode.internal.cache.PartitionedRegion; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; + +/** + * A partition resolver that expects the actual bucket id to be the callback argument of all + * operations. The partition resolver reverse engineers the fixed partition name and bucket number + * from the target partitioning. + * + * This is a bit messy, mostly because there's no good way to get the FixedPartition from the actual + * bucket id without iterating over all of the fixed partitions. + */ +public class BucketTargetingFixedResolver implements FixedPartitionResolver { + + @Override + public Object getRoutingObject(final EntryOperation opDetails) { + int targetBucketId = (Integer) opDetails.getCallbackArgument(); + final Map.Entry<String, Integer[]> targetPartition = getFixedPartition(opDetails); + + return targetBucketId - targetPartition.getValue()[0]; + } + + @Override + public String getName() { + return getClass().getName(); + } + + @Override + public void close() { + + } + + @Override + public String getPartitionName(final EntryOperation opDetails, + @Deprecated final Set targetPartitions) { + final Map.Entry<String, Integer[]> targetPartition = getFixedPartition(opDetails); + return targetPartition.getKey(); + } + + protected Map.Entry<String, Integer[]> getFixedPartition(final EntryOperation opDetails) { + PartitionedRegion region = (PartitionedRegion) opDetails.getRegion(); + int targetBucketId = (Integer) opDetails.getCallbackArgument(); + Map<String, Integer[]> partitions = region.getPartitionsMap(); + + return partitions.entrySet().stream().filter(entry -> withinPartition(entry, targetBucketId)) + .findFirst().get(); + } + + private boolean withinPartition(Map.Entry<String, Integer[]> entry, int bucketId) { + int startingBucket = entry.getValue()[0]; + int endingBucket = startingBucket + entry.getValue()[1]; + return startingBucket <= bucketId && bucketId < endingBucket; + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMap.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMap.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMap.java new file mode 100644 index 0000000..19fb4dc --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMap.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.lucene.internal.partition; + +import org.apache.geode.cache.EntryExistsException; +import org.apache.geode.cache.EntryNotFoundException; +import org.apache.geode.cache.Region; +import org.apache.geode.internal.cache.BucketRegion; +import org.apache.geode.internal.cache.LocalDataSet; +import org.apache.geode.internal.cache.PartitionedRegion; + +import java.util.AbstractMap; +import java.util.Collections; +import java.util.Set; + +public class BucketTargetingMap<K, V> extends AbstractMap<K, V> { + + private final Region<K, V> region; + public Object callbackArg; + + public BucketTargetingMap(BucketRegion region, int bucketId) { + this.callbackArg = bucketId; + this.region = region; + } + + @Override + public Set<K> keySet() { + return region.keySet(); + } + + @Override + public V putIfAbsent(final K key, final V value) { + try { + region.create(key, value, callbackArg); + } catch (EntryExistsException e) { + return (V) e.getOldValue(); + } + return null; + } + + @Override + public V get(final Object key) { + return region.get(key, callbackArg); + } + + @Override + public V remove(final Object key) { + try { + V oldValue = region.get(key, callbackArg); + region.destroy(key, callbackArg); + return oldValue; + } catch (EntryNotFoundException e) { + return null; + } + } + + @Override + public boolean containsKey(final Object key) { + return region.get(key, callbackArg) != null; + } + + @Override + public V put(final K key, final V value) { + return region.put(key, value, callbackArg); + } + + @Override + public Set<Entry<K, V>> entrySet() { + return region.entrySet(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + + final BucketTargetingMap<?, ?> that = (BucketTargetingMap<?, ?>) o; + + if (!region.getFullPath().equals(that.region.getFullPath())) { + return false; + } + return callbackArg.equals(that.callbackArg); + + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + region.hashCode(); + result = 31 * result + callbackArg.hashCode(); + return result; + } + + @Override + public String toString() { + return "BucketTargetingMap{" + "region=" + region.getFullPath() + ", callbackArg=" + callbackArg + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingResolver.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingResolver.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingResolver.java new file mode 100644 index 0000000..a6b5c10 --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingResolver.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.lucene.internal.partition; + +import org.apache.geode.cache.EntryOperation; +import org.apache.geode.cache.PartitionResolver; + +/** + * A partition resolver that expects all operations to be performed with a callback argument that + * indicates the actual bucket to target. + */ +public class BucketTargetingResolver implements PartitionResolver { + @Override + public Object getRoutingObject(final EntryOperation opDetails) { + return opDetails.getCallbackArgument(); + } + + @Override + public String getName() { + return getClass().getName(); + } + + @Override + public void close() { + + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImpl.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImpl.java index f1ee987..8a68b07 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImpl.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImpl.java @@ -51,7 +51,7 @@ public class IndexRepositoryImpl implements IndexRepository { private Region<?, ?> userRegion; private LuceneIndexStats stats; private DocumentCountSupplier documentCountSupplier; - private DistributedLockService lockService; + private final DistributedLockService lockService; private String lockName; private static final Logger logger = LogService.getLogger(); http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java index 931edc5..25a4927 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java @@ -23,6 +23,13 @@ import static org.mockito.Matchers.any; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.internal.cache.InitialImageOperation; +import org.apache.geode.internal.cache.InitialImageOperation.GIITestHook; +import org.apache.geode.internal.cache.InitialImageOperation.GIITestHookType; +import org.apache.geode.internal.cache.InitialImageOperation.RequestImageMessage; import org.junit.After; import org.junit.Test; @@ -186,7 +193,10 @@ public abstract class LuceneQueriesPRBase extends LuceneQueriesBase { } private void removeCallback(VM vm) { - vm.invoke(IndexRepositorySpy::remove); + vm.invoke(() -> { + IndexRepositorySpy.remove(); + InitialImageOperation.resetAllGIITestHooks(); + }); } private void rebalanceRegion(VM vm) { http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerFixedPRDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerFixedPRDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerFixedPRDUnitTest.java index dd9fa6b..2622063 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerFixedPRDUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerFixedPRDUnitTest.java @@ -22,7 +22,7 @@ import org.apache.geode.test.junit.categories.DistributedTest; import org.junit.experimental.categories.Category; @Category(DistributedTest.class) -public class LuceneQueriesPeerFixedPRDUnitTest extends LuceneQueriesPRBase { +public class LuceneQueriesPeerFixedPRDUnitTest extends LuceneQueriesPeerPRRedundancyDUnitTest { @Override protected void initAccessor(SerializableRunnableIF createIndex) throws Exception { http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java index 496721a..1b4303a 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java @@ -28,6 +28,9 @@ import org.apache.geode.cache.lucene.test.IndexRepositorySpy; import org.apache.geode.cache.lucene.test.LuceneTestUtilities; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.InitialImageOperation; +import org.apache.geode.internal.cache.InitialImageOperation.GIITestHook; +import org.apache.geode.internal.cache.InitialImageOperation.GIITestHookType; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage; import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse; @@ -101,6 +104,45 @@ public class LuceneQueriesPeerPRRedundancyDUnitTest extends LuceneQueriesPRBase () -> Awaitility.await().atMost(60, TimeUnit.SECONDS).until(basicGetCache()::isClosed)); } + @Test + public void returnCorrectResultsWhenIndexUpdateHappensIntheMiddleofGII() + throws InterruptedException { + SerializableRunnableIF createIndex = () -> { + LuceneService luceneService = LuceneServiceProvider.get(getCache()); + luceneService.createIndex(INDEX_NAME, REGION_NAME, "text"); + }; + dataStore1.invoke(() -> initDataStore(createIndex)); + accessor.invoke(() -> initAccessor(createIndex)); + dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache())); + putEntryInEachBucket(); + + dataStore2.invoke(() -> { + InitialImageOperation.setGIITestHook( + new GIITestHook(GIITestHookType.AfterSentRequestImage, "Do puts during request") { + @Override + public void reset() { + + } + + @Override + public String getRegionName() { + return "_B__index#__region.files_0"; + } + + @Override + public void run() { + dataStore1.invoke(() -> LuceneTestUtilities.resumeSender(getCache())); + waitForFlushBeforeExecuteTextSearch(dataStore1, 30000); + } + }); + }); + + dataStore2.invoke(() -> initDataStore(createIndex)); + + assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore1, 30000)); + executeTextSearch(accessor, "world", "text", NUM_BUCKETS); + } + private void putEntriesAndValidateResultsWithRedundancy() { SerializableRunnableIF createIndex = () -> { LuceneService luceneService = LuceneServiceProvider.get(getCache()); http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java index 7890903..9c603c7 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java @@ -28,6 +28,7 @@ import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; +import org.apache.geode.cache.lucene.internal.partition.BucketTargetingMap; import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.internal.locks.DLockService; import org.apache.geode.internal.cache.BucketAdvisor; @@ -102,8 +103,11 @@ public class PartitionedRepositoryManagerJUnitTest { fileRegion = Mockito.mock(PartitionedRegion.class); fileDataStore = Mockito.mock(PartitionedRegionDataStore.class); when(fileRegion.getDataStore()).thenReturn(fileDataStore); + when(fileRegion.getTotalNumberOfBuckets()).thenReturn(113); + when(fileRegion.getFullPath()).thenReturn("FileRegion"); chunkRegion = Mockito.mock(PartitionedRegion.class); chunkDataStore = Mockito.mock(PartitionedRegionDataStore.class); + when(chunkRegion.getFullPath()).thenReturn("ChunkRegion"); when(chunkRegion.getDataStore()).thenReturn(chunkDataStore); indexStats = Mockito.mock(LuceneIndexStats.class); fileSystemStats = Mockito.mock(FileSystemStats.class); @@ -237,8 +241,10 @@ public class PartitionedRepositoryManagerJUnitTest { protected void checkRepository(IndexRepositoryImpl repo0, int bucketId) { IndexWriter writer0 = repo0.getWriter(); RegionDirectory dir0 = (RegionDirectory) writer0.getDirectory(); - assertEquals(fileBuckets.get(bucketId), dir0.getFileSystem().getFileRegion()); - assertEquals(chunkBuckets.get(bucketId), dir0.getFileSystem().getChunkRegion()); + assertEquals(new BucketTargetingMap(fileBuckets.get(bucketId), bucketId), + dir0.getFileSystem().getFileRegion()); + assertEquals(new BucketTargetingMap(chunkBuckets.get(bucketId), bucketId), + dir0.getFileSystem().getChunkRegion()); assertEquals(serializer, repo0.getSerializer()); } @@ -247,7 +253,9 @@ public class PartitionedRepositoryManagerJUnitTest { BucketRegion fileBucket = Mockito.mock(BucketRegion.class); // Allowing the fileBucket to behave like a map so that the IndexWriter operations don't fail Fakes.addMapBehavior(fileBucket); + when(fileBucket.getFullPath()).thenReturn("File" + id); BucketRegion chunkBucket = Mockito.mock(BucketRegion.class); + when(chunkBucket.getFullPath()).thenReturn("Chunk" + id); when(mockBucket.getId()).thenReturn(id); when(userRegion.getBucketRegion(eq(id), eq(null))).thenReturn(mockBucket); when(userDataStore.getLocalBucketById(eq(id))).thenReturn(mockBucket); @@ -262,6 +270,7 @@ public class PartitionedRepositoryManagerJUnitTest { BucketAdvisor mockBucketAdvisor = Mockito.mock(BucketAdvisor.class); when(fileBucket.getBucketAdvisor()).thenReturn(mockBucketAdvisor); + when(chunkBucket.getBucketAdvisor()).thenReturn(mockBucketAdvisor); when(mockBucketAdvisor.isPrimary()).thenReturn(true); return mockBucket; } http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingFixedResolverTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingFixedResolverTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingFixedResolverTest.java new file mode 100644 index 0000000..ed01d2a --- /dev/null +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingFixedResolverTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.lucene.internal.partition; + +import static org.junit.Assert.*; +import static org.junit.Assume.assumeThat; +import static org.junit.Assume.assumeTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.pholser.junit.quickcheck.Property; +import com.pholser.junit.quickcheck.generator.InRange; +import com.pholser.junit.quickcheck.generator.Size; +import com.pholser.junit.quickcheck.runner.JUnitQuickcheck; +import org.apache.geode.cache.EntryOperation; +import org.apache.geode.cache.Operation; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.PartitionedRegionHelper; +import org.apache.geode.test.junit.categories.UnitTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +@Category(UnitTest.class) +@RunWith(JUnitQuickcheck.class) +public class BucketTargetingFixedResolverTest { + + @Property + public void shouldReturnCorrectPartitionForGetHashKey( + @Size(min = 1, max = 5) List<@InRange(minInt = 1, maxInt = 20) Integer> partitionSizes, + @InRange(minInt = 0, maxInt = 50) int bucketId) { + BucketTargetingFixedResolver resolver = new BucketTargetingFixedResolver(); + + ConcurrentMap<String, Integer[]> fakePartitions = new ConcurrentHashMap<>(); + int startingBucket = 0; + for (int i = 0; i < partitionSizes.size(); i++) { + fakePartitions.put("p" + i, new Integer[] {startingBucket, partitionSizes.get(i)}); + startingBucket += partitionSizes.get(i); + } + assumeTrue(bucketId < startingBucket); + + final PartitionedRegion region = mock(PartitionedRegion.class); + when(region.getPartitionsMap()).thenReturn(fakePartitions); + when(region.isFixedPartitionedRegion()).thenReturn(true); + when(region.getPartitionResolver()).thenReturn(resolver); + assertEquals(bucketId, + PartitionedRegionHelper.getHashKey(region, Operation.CREATE, "key", "value", bucketId)); + } + +} http://git-wip-us.apache.org/repos/asf/geode/blob/8ce8e43e/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMapTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMapTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMapTest.java new file mode 100644 index 0000000..8850f45 --- /dev/null +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMapTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.lucene.internal.partition; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.geode.internal.cache.BucketRegion; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.test.junit.categories.IntegrationTest; +import org.apache.geode.test.junit.categories.UnitTest; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mock; + +@Category(UnitTest.class) +public class BucketTargetingMapTest { + + private BucketRegion region; + + @Before + public void initMocks() { + region = mock(BucketRegion.class); + final BucketTargetingResolver resolver = new BucketTargetingResolver(); + } + + @Test + public void getUsesCallbackArg() { + final BucketTargetingMap map = new BucketTargetingMap(region, 1); + when(region.get(eq("key"), eq(1))).thenReturn("value"); + assertEquals("value", map.get("key")); + } + + @Test + public void putIfAbsentUsesCallbackArg() { + final BucketTargetingMap map = new BucketTargetingMap(region, 1); + map.putIfAbsent("key", "value"); + verify(region).create(eq("key"), eq("value"), eq(1)); + } + + @Test + public void containsKeyUsesCallbackArg() { + final BucketTargetingMap map = new BucketTargetingMap(region, 1); + when(region.get(eq("key"), eq(1))).thenReturn("value"); + assertEquals(true, map.containsKey("key")); + assertEquals(false, map.containsKey("none")); + verify(region).get(eq("none"), eq(1)); + } +}
