GEODE-2402: Write to the lucene region buckets using a callback argument We were writing directly to the bucket regions of the file region and chunk region in the lucene code. With this change, we will instead write to the top-level PR and pass a callback argument indicating the target bucket. The file and chunk regions now have a partition resolver to route the put to the correct bucket.
The reason for all of this is that in some cases, the core code can send a message that only includes the PR id and the key. We want the core to be able to resolve the correct bucket from just those things. When we were putting directly into the bucket regions, if the core code tried to compute the bucket id for a key it was getting the wrong bucket id, or in the case of a fixed PR, throwing an exception. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/372ddeea Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/372ddeea Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/372ddeea Branch: refs/heads/feature/GEODE-2402 Commit: 372ddeea950a22e0da762464e02b6a2031339683 Parents: a17c855 Author: Dan Smith <[email protected]> Authored: Fri Feb 10 09:45:09 2017 -0800 Committer: Dan Smith <[email protected]> Committed: Tue Feb 14 12:34:03 2017 -0800 ---------------------------------------------------------------------- .../geode/internal/cache/LocalDataSet.java | 4 +- .../geode/internal/cache/LocalRegion.java | 2 +- .../geode/internal/cache/PartitionedRegion.java | 11 ++ .../lucene/internal/IndexRepositoryFactory.java | 19 ++- .../LuceneIndexForPartitionedRegion.java | 13 +++ .../internal/directory/RegionDirectory.java | 26 ++--- .../lucene/internal/filesystem/FileSystem.java | 13 ++- .../partition/BucketTargetingFixedResolver.java | 79 +++++++++++++ .../internal/partition/BucketTargetingMap.java | 117 +++++++++++++++++++ .../partition/BucketTargetingResolver.java | 39 +++++++ .../geode/cache/lucene/LuceneDUnitTest.java | 2 +- .../geode/cache/lucene/LuceneQueriesPRBase.java | 44 +------ .../LuceneQueriesPeerFixedPRDUnitTest.java | 2 +- .../LuceneQueriesPeerPRRedundancyDUnitTest.java | 42 +++++++ .../PartitionedRepositoryManagerJUnitTest.java | 10 +- .../partition/BucketTargetingMapTest.java | 67 +++++++++++ 16 files changed, 418 insertions(+), 72 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java index c4858dc..248d655 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java @@ -92,7 +92,7 @@ public class LocalDataSet implements Region, QueryExecutor { } public Set<Region.Entry> entries(boolean recursive) { - return this.proxy.new PREntriesSet(getBucketSet()); + return this.proxy.entries(getBucketSet()); } public Collection values() { @@ -101,7 +101,7 @@ public class LocalDataSet implements Region, QueryExecutor { } public Set keys() { - return this.proxy.new KeysSet(getBucketSet()); + return this.proxy.keySet(getBucketSet()); } public Set keySet() { http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 00401e9..5d5c7e2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -1020,7 +1020,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, return newRegion; } - public final void create(Object key, Object value, Object aCallbackArgument) + public void create(Object key, Object value, Object aCallbackArgument) throws TimeoutException, EntryExistsException, 
CacheWriterException { long startPut = CachePerfStats.getStatTime(); @Released http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 44f8427..9374d4b 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -6099,6 +6099,10 @@ public class PartitionedRegion extends LocalRegion return Collections.unmodifiableSet(new PREntriesSet()); } + public Set<Region.Entry> entries(Set<Integer> bucketIds) { + return new PREntriesSet(bucketIds); + } + /** * Set view of entries. This currently extends the keySet iterator and performs individual * getEntry() operations using the keys @@ -6164,6 +6168,13 @@ public class PartitionedRegion extends LocalRegion return Collections.unmodifiableSet(new KeysSet(allowTombstones)); } + /** + * Get a keyset of the given buckets + */ + public Set keySet(Set<Integer> bucketSet) { + return new KeysSet(bucketSet); + } + public Set keysWithoutCreatesForTests() { checkReadiness(); Set<Integer> availableBuckets = new HashSet<Integer>(); http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java index 57dd0a5..b5ab942 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java +++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java @@ -15,8 +15,10 @@ package org.apache.geode.cache.lucene.internal; import java.io.IOException; +import java.util.Map; import org.apache.geode.cache.lucene.internal.directory.RegionDirectory; +import org.apache.geode.cache.lucene.internal.partition.BucketTargetingMap; import org.apache.geode.cache.lucene.internal.repository.IndexRepository; import org.apache.geode.cache.lucene.internal.repository.IndexRepositoryImpl; import org.apache.geode.cache.lucene.internal.repository.serializer.LuceneSerializer; @@ -41,8 +43,11 @@ public class IndexRepositoryFactory { LuceneIndexImpl index, PartitionedRegion userRegion, final IndexRepository oldRepository) throws IOException { LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index; - BucketRegion fileBucket = getMatchingBucket(indexForPR.getFileRegion(), bucketId); - BucketRegion chunkBucket = getMatchingBucket(indexForPR.getChunkRegion(), bucketId); + final PartitionedRegion fileRegion = indexForPR.getFileRegion(); + final PartitionedRegion chunkRegion = indexForPR.getChunkRegion(); + + BucketRegion fileBucket = getMatchingBucket(fileRegion, bucketId); + BucketRegion chunkBucket = getMatchingBucket(chunkRegion, bucketId); BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId); boolean success = false; if (fileBucket == null || chunkBucket == null) { @@ -52,7 +57,7 @@ public class IndexRepositoryFactory { return null; } if (!fileBucket.getBucketAdvisor().isPrimary()) { - if (oldRepository != null) { + if (oldRepository != null && !oldRepository.isClosed()) { oldRepository.cleanup(); } return null; @@ -75,8 +80,8 @@ public class IndexRepositoryFactory { final IndexRepository repo; try { - RegionDirectory dir = - new RegionDirectory(fileBucket, chunkBucket, indexForPR.getFileSystemStats()); + RegionDirectory dir = new RegionDirectory(getBucketTargetingMap(fileRegion, bucketId), + 
getBucketTargetingMap(chunkRegion, bucketId), indexForPR.getFileSystemStats()); IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer()); IndexWriter writer = new IndexWriter(dir, config); repo = new IndexRepositoryImpl(fileBucket, writer, serializer, indexForPR.getIndexStats(), @@ -95,6 +100,10 @@ public class IndexRepositoryFactory { } + private Map getBucketTargetingMap(PartitionedRegion region, int bucketId) { + return new BucketTargetingMap(region, bucketId); + } + private String getLockName(final Integer bucketId, final BucketRegion fileBucket) { return FILE_REGION_LOCK_FOR_BUCKET_ID + fileBucket.getFullPath() + bucketId; } http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java index 53b4e08..9a39b39 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java @@ -17,8 +17,10 @@ package org.apache.geode.cache.lucene.internal; import org.apache.geode.cache.AttributesFactory; import org.apache.geode.cache.Cache; +import org.apache.geode.cache.FixedPartitionResolver; import org.apache.geode.cache.PartitionAttributes; import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.PartitionResolver; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.RegionShortcut; @@ -28,6 +30,8 @@ import org.apache.geode.cache.lucene.internal.directory.DumpDirectoryFiles; import 
org.apache.geode.cache.lucene.internal.filesystem.ChunkKey; import org.apache.geode.cache.lucene.internal.filesystem.File; import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats; +import org.apache.geode.cache.lucene.internal.partition.BucketTargetingFixedResolver; +import org.apache.geode.cache.lucene.internal.partition.BucketTargetingResolver; import org.apache.geode.cache.lucene.internal.repository.RepositoryManager; import org.apache.geode.cache.lucene.internal.repository.serializer.HeterogeneousLuceneSerializer; import org.apache.geode.cache.partition.PartitionListener; @@ -143,9 +147,18 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { PartitionAttributes<?, ?> dataRegionAttributes) { attributesFactory.setTotalNumBuckets(dataRegionAttributes.getTotalNumBuckets()); attributesFactory.setRedundantCopies(dataRegionAttributes.getRedundantCopies()); + attributesFactory.setPartitionResolver(getPartitionResolver(dataRegionAttributes)); return attributesFactory; } + private PartitionResolver getPartitionResolver(PartitionAttributes dataRegionAttributes) { + if (dataRegionAttributes.getPartitionResolver() instanceof FixedPartitionResolver) { + return new BucketTargetingFixedResolver(); + } else { + return new BucketTargetingResolver(); + } + } + protected <K, V> Region<K, V> createRegion(final String regionName, final RegionShortcut regionShortCut, final String colocatedWithRegionName, final PartitionAttributes partitionAttributes, final RegionAttributes regionAttributes, http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/directory/RegionDirectory.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/directory/RegionDirectory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/directory/RegionDirectory.java index 362cf93..18428ec 
100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/directory/RegionDirectory.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/directory/RegionDirectory.java @@ -15,14 +15,10 @@ package org.apache.geode.cache.lucene.internal.directory; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.Collection; -import java.util.UUID; -import java.util.concurrent.ConcurrentMap; - +import org.apache.geode.cache.lucene.internal.filesystem.ChunkKey; +import org.apache.geode.cache.lucene.internal.filesystem.File; +import org.apache.geode.cache.lucene.internal.filesystem.FileSystem; +import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats; import org.apache.lucene.store.BaseDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; @@ -30,10 +26,12 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.OutputStreamIndexOutput; import org.apache.lucene.store.SingleInstanceLockFactory; -import org.apache.geode.cache.lucene.internal.filesystem.ChunkKey; -import org.apache.geode.cache.lucene.internal.filesystem.File; -import org.apache.geode.cache.lucene.internal.filesystem.FileSystem; -import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; /** * An implementation of Directory that stores data in geode regions. @@ -49,8 +47,8 @@ public class RegionDirectory extends BaseDirectory { * Create a region directory with a given file and chunk region. These regions may be bucket * regions or they may be replicated regions. 
*/ - public RegionDirectory(ConcurrentMap<String, File> fileRegion, - ConcurrentMap<ChunkKey, byte[]> chunkRegion, FileSystemStats stats) { + public RegionDirectory(Map<String, File> fileRegion, Map<ChunkKey, byte[]> chunkRegion, + FileSystemStats stats) { super(new SingleInstanceLockFactory()); fs = new FileSystem(fileRegion, chunkRegion, stats); } http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java index f3975bf..660816d 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.Logger; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collection; +import java.util.Map; import java.util.concurrent.ConcurrentMap; /** @@ -38,8 +39,8 @@ import java.util.concurrent.ConcurrentMap; public class FileSystem { private static final Logger logger = LogService.getLogger(); - private final ConcurrentMap<String, File> fileRegion; - private final ConcurrentMap<ChunkKey, byte[]> chunkRegion; + private final Map<String, File> fileRegion; + private final Map<ChunkKey, byte[]> chunkRegion; static final int CHUNK_SIZE = 1024 * 1024; // 1 MB private final FileSystemStats stats; @@ -54,8 +55,8 @@ public class FileSystem { * @param fileRegion the region to store metadata about the files * @param chunkRegion the region to store actual file data. 
*/ - public FileSystem(ConcurrentMap<String, File> fileRegion, - ConcurrentMap<ChunkKey, byte[]> chunkRegion, FileSystemStats stats) { + public FileSystem(Map<String, File> fileRegion, Map<ChunkKey, byte[]> chunkRegion, + FileSystemStats stats) { this.fileRegion = fileRegion; this.chunkRegion = chunkRegion; this.stats = stats; @@ -188,11 +189,11 @@ public class FileSystem { fileRegion.put(file.getName(), file); } - public ConcurrentMap<String, File> getFileRegion() { + public Map<String, File> getFileRegion() { return fileRegion; } - public ConcurrentMap<ChunkKey, byte[]> getChunkRegion() { + public Map<ChunkKey, byte[]> getChunkRegion() { return chunkRegion; } http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingFixedResolver.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingFixedResolver.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingFixedResolver.java new file mode 100644 index 0000000..66035c4 --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingFixedResolver.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.lucene.internal.partition; + +import org.apache.geode.cache.EntryOperation; +import org.apache.geode.cache.FixedPartitionAttributes; +import org.apache.geode.cache.FixedPartitionResolver; +import org.apache.geode.cache.PartitionResolver; +import org.apache.geode.internal.cache.FixedPartitionAttributesImpl; +import org.apache.geode.internal.cache.PartitionedRegion; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; + +/** + * A partition resolver that expects the actual bucket id to be the callback argument of all + * operations. The partition resolver reverse engineers the fixed partition name and bucket number + * from the target partitioning. + * + * This is a bit messy, mostly because there's no good way to get the FixedPartition from the actual + * bucket id without iterating over all of the fixed partitions. 
+ */ +public class BucketTargetingFixedResolver implements FixedPartitionResolver { + + @Override + public Object getRoutingObject(final EntryOperation opDetails) { + int targetBucketId = (Integer) opDetails.getCallbackArgument(); + final Map.Entry<String, Integer[]> targetPartition = getFixedPartition(opDetails); + + return targetBucketId - targetPartition.getValue()[0]; + } + + @Override + public String getName() { + return getClass().getName(); + } + + @Override + public void close() { + + } + + @Override + public String getPartitionName(final EntryOperation opDetails, + @Deprecated final Set targetPartitions) { + final Map.Entry<String, Integer[]> targetPartition = getFixedPartition(opDetails); + return targetPartition.getKey(); + } + + protected Map.Entry<String, Integer[]> getFixedPartition(final EntryOperation opDetails) { + PartitionedRegion region = (PartitionedRegion) opDetails.getRegion(); + int targetBucketId = (Integer) opDetails.getCallbackArgument(); + Map<String, Integer[]> partitions = region.getPartitionsMap(); + + return partitions.entrySet().stream().filter(entry -> withinPartition(entry, targetBucketId)) + .findFirst().get(); + } + + private boolean withinPartition(Map.Entry<String, Integer[]> entry, int bucketId) { + int startingBucket = entry.getValue()[0]; + int endingBucket = startingBucket + entry.getValue()[1]; + return startingBucket <= bucketId && bucketId < endingBucket; + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMap.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMap.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMap.java new file mode 100644 index 0000000..e2467ac --- /dev/null +++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMap.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.lucene.internal.partition; + +import org.apache.geode.cache.EntryExistsException; +import org.apache.geode.cache.EntryNotFoundException; +import org.apache.geode.cache.Region; +import org.apache.geode.internal.cache.LocalDataSet; +import org.apache.geode.internal.cache.PartitionedRegion; + +import java.util.AbstractMap; +import java.util.Collections; +import java.util.Set; + +public class BucketTargetingMap<K, V> extends AbstractMap<K, V> { + + private final Region<K, V> region; + public Object callbackArg; + + public BucketTargetingMap(PartitionedRegion region, int bucketId) { + this.callbackArg = bucketId; + this.region = new LocalDataSet(region, Collections.singleton(bucketId)); + } + + @Override + public Set<K> keySet() { + return region.keySet(); + } + + @Override + public V putIfAbsent(final K key, final V value) { + try { + region.create(key, value, callbackArg); + } catch (EntryExistsException e) { + return (V) e.getOldValue(); + } + return null; + } + + @Override + public V get(final Object key) { + return 
region.get(key, callbackArg); + } + + @Override + public V remove(final Object key) { + try { + V oldValue = region.get(key, callbackArg); + region.destroy(key, callbackArg); + return oldValue; + } catch (EntryNotFoundException e) { + return null; + } + } + + @Override + public boolean containsKey(final Object key) { + return region.get(key, callbackArg) != null; + } + + @Override + public V put(final K key, final V value) { + return region.put(key, value, callbackArg); + } + + @Override + public Set<Entry<K, V>> entrySet() { + return region.entrySet(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + + final BucketTargetingMap<?, ?> that = (BucketTargetingMap<?, ?>) o; + + if (!region.getFullPath().equals(that.region.getFullPath())) { + return false; + } + return callbackArg.equals(that.callbackArg); + + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + region.hashCode(); + result = 31 * result + callbackArg.hashCode(); + return result; + } + + @Override + public String toString() { + return "BucketTargetingMap{" + "region=" + region.getFullPath() + ", callbackArg=" + callbackArg + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingResolver.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingResolver.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingResolver.java new file mode 100644 index 0000000..a6b5c10 --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingResolver.java @@ -0,0 +1,39 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.lucene.internal.partition; + +import org.apache.geode.cache.EntryOperation; +import org.apache.geode.cache.PartitionResolver; + +/** + * A partition resolver that expects all operations to be performed with a callback argument that + * indicates the actual bucket to target. 
+ */ +public class BucketTargetingResolver implements PartitionResolver { + @Override + public Object getRoutingObject(final EntryOperation opDetails) { + return opDetails.getCallbackArgument(); + } + + @Override + public String getName() { + return getClass().getName(); + } + + @Override + public void close() { + + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java index 78edb5c..9318b0e 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java @@ -27,7 +27,7 @@ public abstract class LuceneDUnitTest extends JUnit4CacheTestCase { public void postSetUp() throws Exception { Host host = Host.getHost(0); dataStore1 = host.getVM(0); - dataStore2 = host.getVM(-1); + dataStore2 = host.getVM(1); } protected abstract void initDataStore(SerializableRunnableIF createIndex) throws Exception; http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java index be2a754..25a4927 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java @@ -70,45 +70,6 @@ public abstract class LuceneQueriesPRBase extends LuceneQueriesBase { } @Test - public void 
returnCorrectResultsWhenIndexUpdateHappensIntheMiddleofGII() - throws InterruptedException { - SerializableRunnableIF createIndex = () -> { - LuceneService luceneService = LuceneServiceProvider.get(getCache()); - luceneService.createIndex(INDEX_NAME, REGION_NAME, "text"); - }; - dataStore1.invoke(() -> initDataStore(createIndex)); - accessor.invoke(() -> initAccessor(createIndex)); - dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache())); - putEntryInEachBucket(); - - dataStore2.invoke(() -> { - InitialImageOperation.setGIITestHook( - new GIITestHook(GIITestHookType.AfterSentRequestImage, "Do puts during request") { - @Override - public void reset() { - - } - - @Override - public String getRegionName() { - return "_B__index#__region.files_0"; - } - - @Override - public void run() { - dataStore1.invoke(() -> LuceneTestUtilities.resumeSender(getCache())); - waitForFlushBeforeExecuteTextSearch(dataStore1, 30000); - } - }); - }); - - dataStore2.invoke(() -> initDataStore(createIndex)); - - assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore1, 30000)); - executeTextSearch(accessor, "world", "text", NUM_BUCKETS); - } - - @Test public void returnCorrectResultsWhenMoveBucketHappensOnIndexUpdate() throws InterruptedException { final DistributedMember member2 = dataStore2.invoke(() -> getCache().getDistributedSystem().getDistributedMember()); @@ -232,7 +193,10 @@ public abstract class LuceneQueriesPRBase extends LuceneQueriesBase { } private void removeCallback(VM vm) { - vm.invoke(IndexRepositorySpy::remove); + vm.invoke(() -> { + IndexRepositorySpy.remove(); + InitialImageOperation.resetAllGIITestHooks(); + }); } private void rebalanceRegion(VM vm) { http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerFixedPRDUnitTest.java ---------------------------------------------------------------------- diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerFixedPRDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerFixedPRDUnitTest.java index dd9fa6b..2622063 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerFixedPRDUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerFixedPRDUnitTest.java @@ -22,7 +22,7 @@ import org.apache.geode.test.junit.categories.DistributedTest; import org.junit.experimental.categories.Category; @Category(DistributedTest.class) -public class LuceneQueriesPeerFixedPRDUnitTest extends LuceneQueriesPRBase { +public class LuceneQueriesPeerFixedPRDUnitTest extends LuceneQueriesPeerPRRedundancyDUnitTest { @Override protected void initAccessor(SerializableRunnableIF createIndex) throws Exception { http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java index 496721a..1b4303a 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java @@ -28,6 +28,9 @@ import org.apache.geode.cache.lucene.test.IndexRepositorySpy; import org.apache.geode.cache.lucene.test.LuceneTestUtilities; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.InitialImageOperation; +import org.apache.geode.internal.cache.InitialImageOperation.GIITestHook; +import 
org.apache.geode.internal.cache.InitialImageOperation.GIITestHookType; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage; import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse; @@ -101,6 +104,45 @@ public class LuceneQueriesPeerPRRedundancyDUnitTest extends LuceneQueriesPRBase () -> Awaitility.await().atMost(60, TimeUnit.SECONDS).until(basicGetCache()::isClosed)); } + @Test + public void returnCorrectResultsWhenIndexUpdateHappensIntheMiddleofGII() + throws InterruptedException { + SerializableRunnableIF createIndex = () -> { + LuceneService luceneService = LuceneServiceProvider.get(getCache()); + luceneService.createIndex(INDEX_NAME, REGION_NAME, "text"); + }; + dataStore1.invoke(() -> initDataStore(createIndex)); + accessor.invoke(() -> initAccessor(createIndex)); + dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache())); + putEntryInEachBucket(); + + dataStore2.invoke(() -> { + InitialImageOperation.setGIITestHook( + new GIITestHook(GIITestHookType.AfterSentRequestImage, "Do puts during request") { + @Override + public void reset() { + + } + + @Override + public String getRegionName() { + return "_B__index#__region.files_0"; + } + + @Override + public void run() { + dataStore1.invoke(() -> LuceneTestUtilities.resumeSender(getCache())); + waitForFlushBeforeExecuteTextSearch(dataStore1, 30000); + } + }); + }); + + dataStore2.invoke(() -> initDataStore(createIndex)); + + assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore1, 30000)); + executeTextSearch(accessor, "world", "text", NUM_BUCKETS); + } + private void putEntriesAndValidateResultsWithRedundancy() { SerializableRunnableIF createIndex = () -> { LuceneService luceneService = LuceneServiceProvider.get(getCache()); 
http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java index 7890903..bb8e0b4 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java @@ -28,6 +28,7 @@ import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; +import org.apache.geode.cache.lucene.internal.partition.BucketTargetingMap; import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.internal.locks.DLockService; import org.apache.geode.internal.cache.BucketAdvisor; @@ -102,8 +103,11 @@ public class PartitionedRepositoryManagerJUnitTest { fileRegion = Mockito.mock(PartitionedRegion.class); fileDataStore = Mockito.mock(PartitionedRegionDataStore.class); when(fileRegion.getDataStore()).thenReturn(fileDataStore); + when(fileRegion.getTotalNumberOfBuckets()).thenReturn(113); + when(fileRegion.getFullPath()).thenReturn("FileRegion"); chunkRegion = Mockito.mock(PartitionedRegion.class); chunkDataStore = Mockito.mock(PartitionedRegionDataStore.class); + when(chunkRegion.getFullPath()).thenReturn("ChunkRegion"); when(chunkRegion.getDataStore()).thenReturn(chunkDataStore); indexStats = Mockito.mock(LuceneIndexStats.class); fileSystemStats = Mockito.mock(FileSystemStats.class); @@ -237,8 +241,10 @@ public class PartitionedRepositoryManagerJUnitTest { protected void checkRepository(IndexRepositoryImpl repo0, int bucketId) { IndexWriter writer0 = repo0.getWriter(); 
RegionDirectory dir0 = (RegionDirectory) writer0.getDirectory(); - assertEquals(fileBuckets.get(bucketId), dir0.getFileSystem().getFileRegion()); - assertEquals(chunkBuckets.get(bucketId), dir0.getFileSystem().getChunkRegion()); + assertEquals(new BucketTargetingMap(fileRegion, bucketId), + dir0.getFileSystem().getFileRegion()); + assertEquals(new BucketTargetingMap(chunkRegion, bucketId), + dir0.getFileSystem().getChunkRegion()); assertEquals(serializer, repo0.getSerializer()); } http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMapTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMapTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMapTest.java new file mode 100644 index 0000000..468fd04 --- /dev/null +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMapTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.cache.lucene.internal.partition; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.test.junit.categories.IntegrationTest; +import org.apache.geode.test.junit.categories.UnitTest; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mock; + +@Category(UnitTest.class) +public class BucketTargetingMapTest { + + private PartitionedRegion region; + + @Before + public void initMocks() { + region = mock(PartitionedRegion.class); + final BucketTargetingResolver resolver = new BucketTargetingResolver(); + when(region.getPartitionResolver()).thenReturn(resolver); + when(region.getTotalNumberOfBuckets()).thenReturn(5); + } + + @Test + public void getUsesCallbackArg() { + final BucketTargetingMap map = new BucketTargetingMap(region, 1); + when(region.get(eq("key"), eq(1))).thenReturn("value"); + assertEquals("value", map.get("key")); + } + + @Test + public void putIfAbsentUsesCallbackArg() { + final BucketTargetingMap map = new BucketTargetingMap(region, 1); + map.putIfAbsent("key", "value"); + verify(region).create(eq("key"), eq("value"), eq(1)); + } + + @Test + public void containsKeyUsesCallbackArg() { + final BucketTargetingMap map = new BucketTargetingMap(region, 1); + when(region.get(eq("key"), eq(1))).thenReturn("value"); + assertEquals(true, map.containsKey("key")); + assertEquals(false, map.containsKey("none")); + verify(region).get(eq("none"), eq(1)); + } +}
