GEODE-2402: Write to the lucene region buckets using a callback argument

We were writing directly to the bucket regions of the file region and
chunk region in the lucene code. With this change, we will instead write
to the top level PR and pass a callback argument indicating the target
bucket. The file and chunk regions now have a partition resolver to
route the put to the correct bucket.

The reason for all of this is that in some cases, the core code can
send a message that only includes the PR id and the key. We need
the core to be able to resolve the correct bucket from just those
things. When we were putting directly into the bucket regions, if the
core code tried to compute the bucket id for a key it was getting the
wrong bucket id, or in the case of a fixed PR, throwing an exception.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/372ddeea
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/372ddeea
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/372ddeea

Branch: refs/heads/feature/GEODE-2402
Commit: 372ddeea950a22e0da762464e02b6a2031339683
Parents: a17c855
Author: Dan Smith <[email protected]>
Authored: Fri Feb 10 09:45:09 2017 -0800
Committer: Dan Smith <[email protected]>
Committed: Tue Feb 14 12:34:03 2017 -0800

----------------------------------------------------------------------
 .../geode/internal/cache/LocalDataSet.java      |   4 +-
 .../geode/internal/cache/LocalRegion.java       |   2 +-
 .../geode/internal/cache/PartitionedRegion.java |  11 ++
 .../lucene/internal/IndexRepositoryFactory.java |  19 ++-
 .../LuceneIndexForPartitionedRegion.java        |  13 +++
 .../internal/directory/RegionDirectory.java     |  26 ++---
 .../lucene/internal/filesystem/FileSystem.java  |  13 ++-
 .../partition/BucketTargetingFixedResolver.java |  79 +++++++++++++
 .../internal/partition/BucketTargetingMap.java  | 117 +++++++++++++++++++
 .../partition/BucketTargetingResolver.java      |  39 +++++++
 .../geode/cache/lucene/LuceneDUnitTest.java     |   2 +-
 .../geode/cache/lucene/LuceneQueriesPRBase.java |  44 +------
 .../LuceneQueriesPeerFixedPRDUnitTest.java      |   2 +-
 .../LuceneQueriesPeerPRRedundancyDUnitTest.java |  42 +++++++
 .../PartitionedRepositoryManagerJUnitTest.java  |  10 +-
 .../partition/BucketTargetingMapTest.java       |  67 +++++++++++
 16 files changed, 418 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java
index c4858dc..248d655 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java
@@ -92,7 +92,7 @@ public class LocalDataSet implements Region, QueryExecutor {
   }
 
   public Set<Region.Entry> entries(boolean recursive) {
-    return this.proxy.new PREntriesSet(getBucketSet());
+    return this.proxy.entries(getBucketSet());
   }
 
   public Collection values() {
@@ -101,7 +101,7 @@ public class LocalDataSet implements Region, QueryExecutor {
   }
 
   public Set keys() {
-    return this.proxy.new KeysSet(getBucketSet());
+    return this.proxy.keySet(getBucketSet());
   }
 
   public Set keySet() {

http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 00401e9..5d5c7e2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -1020,7 +1020,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
     return newRegion;
   }
 
-  public final void create(Object key, Object value, Object aCallbackArgument)
+  public void create(Object key, Object value, Object aCallbackArgument)
       throws TimeoutException, EntryExistsException, CacheWriterException {
     long startPut = CachePerfStats.getStatTime();
     @Released

http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 44f8427..9374d4b 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -6099,6 +6099,10 @@ public class PartitionedRegion extends LocalRegion
     return Collections.unmodifiableSet(new PREntriesSet());
   }
 
+  public Set<Region.Entry> entries(Set<Integer> bucketIds) {
+    return new PREntriesSet(bucketIds);
+  }
+
   /**
    * Set view of entries. This currently extends the keySet iterator and 
performs individual
    * getEntry() operations using the keys
@@ -6164,6 +6168,13 @@ public class PartitionedRegion extends LocalRegion
     return Collections.unmodifiableSet(new KeysSet(allowTombstones));
   }
 
+  /**
+   * Get a keyset of the given buckets
+   */
+  public Set keySet(Set<Integer> bucketSet) {
+    return new KeysSet(bucketSet);
+  }
+
   public Set keysWithoutCreatesForTests() {
     checkReadiness();
     Set<Integer> availableBuckets = new HashSet<Integer>();

http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
index 57dd0a5..b5ab942 100644
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
@@ -15,8 +15,10 @@
 package org.apache.geode.cache.lucene.internal;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.geode.cache.lucene.internal.directory.RegionDirectory;
+import org.apache.geode.cache.lucene.internal.partition.BucketTargetingMap;
 import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
 import org.apache.geode.cache.lucene.internal.repository.IndexRepositoryImpl;
 import 
org.apache.geode.cache.lucene.internal.repository.serializer.LuceneSerializer;
@@ -41,8 +43,11 @@ public class IndexRepositoryFactory {
       LuceneIndexImpl index, PartitionedRegion userRegion, final 
IndexRepository oldRepository)
       throws IOException {
     LuceneIndexForPartitionedRegion indexForPR = 
(LuceneIndexForPartitionedRegion) index;
-    BucketRegion fileBucket = getMatchingBucket(indexForPR.getFileRegion(), 
bucketId);
-    BucketRegion chunkBucket = getMatchingBucket(indexForPR.getChunkRegion(), 
bucketId);
+    final PartitionedRegion fileRegion = indexForPR.getFileRegion();
+    final PartitionedRegion chunkRegion = indexForPR.getChunkRegion();
+
+    BucketRegion fileBucket = getMatchingBucket(fileRegion, bucketId);
+    BucketRegion chunkBucket = getMatchingBucket(chunkRegion, bucketId);
     BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId);
     boolean success = false;
     if (fileBucket == null || chunkBucket == null) {
@@ -52,7 +57,7 @@ public class IndexRepositoryFactory {
       return null;
     }
     if (!fileBucket.getBucketAdvisor().isPrimary()) {
-      if (oldRepository != null) {
+      if (oldRepository != null && !oldRepository.isClosed()) {
         oldRepository.cleanup();
       }
       return null;
@@ -75,8 +80,8 @@ public class IndexRepositoryFactory {
 
     final IndexRepository repo;
     try {
-      RegionDirectory dir =
-          new RegionDirectory(fileBucket, chunkBucket, 
indexForPR.getFileSystemStats());
+      RegionDirectory dir = new 
RegionDirectory(getBucketTargetingMap(fileRegion, bucketId),
+          getBucketTargetingMap(chunkRegion, bucketId), 
indexForPR.getFileSystemStats());
       IndexWriterConfig config = new 
IndexWriterConfig(indexForPR.getAnalyzer());
       IndexWriter writer = new IndexWriter(dir, config);
       repo = new IndexRepositoryImpl(fileBucket, writer, serializer, 
indexForPR.getIndexStats(),
@@ -95,6 +100,10 @@ public class IndexRepositoryFactory {
 
   }
 
+  private Map getBucketTargetingMap(PartitionedRegion region, int bucketId) {
+    return new BucketTargetingMap(region, bucketId);
+  }
+
   private String getLockName(final Integer bucketId, final BucketRegion 
fileBucket) {
     return FILE_REGION_LOCK_FOR_BUCKET_ID + fileBucket.getFullPath() + 
bucketId;
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index 53b4e08..9a39b39 100644
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -17,8 +17,10 @@ package org.apache.geode.cache.lucene.internal;
 
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.FixedPartitionResolver;
 import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.PartitionResolver;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.RegionShortcut;
@@ -28,6 +30,8 @@ import 
org.apache.geode.cache.lucene.internal.directory.DumpDirectoryFiles;
 import org.apache.geode.cache.lucene.internal.filesystem.ChunkKey;
 import org.apache.geode.cache.lucene.internal.filesystem.File;
 import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats;
+import 
org.apache.geode.cache.lucene.internal.partition.BucketTargetingFixedResolver;
+import 
org.apache.geode.cache.lucene.internal.partition.BucketTargetingResolver;
 import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
 import 
org.apache.geode.cache.lucene.internal.repository.serializer.HeterogeneousLuceneSerializer;
 import org.apache.geode.cache.partition.PartitionListener;
@@ -143,9 +147,18 @@ public class LuceneIndexForPartitionedRegion extends 
LuceneIndexImpl {
       PartitionAttributes<?, ?> dataRegionAttributes) {
     
attributesFactory.setTotalNumBuckets(dataRegionAttributes.getTotalNumBuckets());
     
attributesFactory.setRedundantCopies(dataRegionAttributes.getRedundantCopies());
+    
attributesFactory.setPartitionResolver(getPartitionResolver(dataRegionAttributes));
     return attributesFactory;
   }
 
+  private PartitionResolver getPartitionResolver(PartitionAttributes 
dataRegionAttributes) {
+    if (dataRegionAttributes.getPartitionResolver() instanceof 
FixedPartitionResolver) {
+      return new BucketTargetingFixedResolver();
+    } else {
+      return new BucketTargetingResolver();
+    }
+  }
+
   protected <K, V> Region<K, V> createRegion(final String regionName,
       final RegionShortcut regionShortCut, final String 
colocatedWithRegionName,
       final PartitionAttributes partitionAttributes, final RegionAttributes 
regionAttributes,

http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/directory/RegionDirectory.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/directory/RegionDirectory.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/directory/RegionDirectory.java
index 362cf93..18428ec 100644
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/directory/RegionDirectory.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/directory/RegionDirectory.java
@@ -15,14 +15,10 @@
 
 package org.apache.geode.cache.lucene.internal.directory;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-
+import org.apache.geode.cache.lucene.internal.filesystem.ChunkKey;
+import org.apache.geode.cache.lucene.internal.filesystem.File;
+import org.apache.geode.cache.lucene.internal.filesystem.FileSystem;
+import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats;
 import org.apache.lucene.store.BaseDirectory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
@@ -30,10 +26,12 @@ import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.OutputStreamIndexOutput;
 import org.apache.lucene.store.SingleInstanceLockFactory;
 
-import org.apache.geode.cache.lucene.internal.filesystem.ChunkKey;
-import org.apache.geode.cache.lucene.internal.filesystem.File;
-import org.apache.geode.cache.lucene.internal.filesystem.FileSystem;
-import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
 
 /**
  * An implementation of Directory that stores data in geode regions.
@@ -49,8 +47,8 @@ public class RegionDirectory extends BaseDirectory {
    * Create a region directory with a given file and chunk region. These 
regions may be bucket
    * regions or they may be replicated regions.
    */
-  public RegionDirectory(ConcurrentMap<String, File> fileRegion,
-      ConcurrentMap<ChunkKey, byte[]> chunkRegion, FileSystemStats stats) {
+  public RegionDirectory(Map<String, File> fileRegion, Map<ChunkKey, byte[]> 
chunkRegion,
+      FileSystemStats stats) {
     super(new SingleInstanceLockFactory());
     fs = new FileSystem(fileRegion, chunkRegion, stats);
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java
index f3975bf..660816d 100644
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java
@@ -22,6 +22,7 @@ import org.apache.logging.log4j.Logger;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
 /**
@@ -38,8 +39,8 @@ import java.util.concurrent.ConcurrentMap;
 public class FileSystem {
   private static final Logger logger = LogService.getLogger();
 
-  private final ConcurrentMap<String, File> fileRegion;
-  private final ConcurrentMap<ChunkKey, byte[]> chunkRegion;
+  private final Map<String, File> fileRegion;
+  private final Map<ChunkKey, byte[]> chunkRegion;
 
   static final int CHUNK_SIZE = 1024 * 1024; // 1 MB
   private final FileSystemStats stats;
@@ -54,8 +55,8 @@ public class FileSystem {
    * @param fileRegion the region to store metadata about the files
    * @param chunkRegion the region to store actual file data.
    */
-  public FileSystem(ConcurrentMap<String, File> fileRegion,
-      ConcurrentMap<ChunkKey, byte[]> chunkRegion, FileSystemStats stats) {
+  public FileSystem(Map<String, File> fileRegion, Map<ChunkKey, byte[]> 
chunkRegion,
+      FileSystemStats stats) {
     this.fileRegion = fileRegion;
     this.chunkRegion = chunkRegion;
     this.stats = stats;
@@ -188,11 +189,11 @@ public class FileSystem {
     fileRegion.put(file.getName(), file);
   }
 
-  public ConcurrentMap<String, File> getFileRegion() {
+  public Map<String, File> getFileRegion() {
     return fileRegion;
   }
 
-  public ConcurrentMap<ChunkKey, byte[]> getChunkRegion() {
+  public Map<ChunkKey, byte[]> getChunkRegion() {
     return chunkRegion;
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingFixedResolver.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingFixedResolver.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingFixedResolver.java
new file mode 100644
index 0000000..66035c4
--- /dev/null
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingFixedResolver.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene.internal.partition;
+
+import org.apache.geode.cache.EntryOperation;
+import org.apache.geode.cache.FixedPartitionAttributes;
+import org.apache.geode.cache.FixedPartitionResolver;
+import org.apache.geode.cache.PartitionResolver;
+import org.apache.geode.internal.cache.FixedPartitionAttributesImpl;
+import org.apache.geode.internal.cache.PartitionedRegion;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A partition resolver that expects the actual bucket id to be the callback 
argument of all
+ * operations. The partition resolver reverse engineers the fixed partition 
name and bucket number
+ * from the target partitioning.
+ *
+ * This is a bit messy, mostly because there's no good way to get the 
FixedPartition from the actual
+ * bucket id without iterating over all of the fixed partitions.
+ */
+public class BucketTargetingFixedResolver implements FixedPartitionResolver {
+
+  @Override
+  public Object getRoutingObject(final EntryOperation opDetails) {
+    int targetBucketId = (Integer) opDetails.getCallbackArgument();
+    final Map.Entry<String, Integer[]> targetPartition = 
getFixedPartition(opDetails);
+
+    return targetBucketId - targetPartition.getValue()[0];
+  }
+
+  @Override
+  public String getName() {
+    return getClass().getName();
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public String getPartitionName(final EntryOperation opDetails,
+      @Deprecated final Set targetPartitions) {
+    final Map.Entry<String, Integer[]> targetPartition = 
getFixedPartition(opDetails);
+    return targetPartition.getKey();
+  }
+
+  protected Map.Entry<String, Integer[]> getFixedPartition(final 
EntryOperation opDetails) {
+    PartitionedRegion region = (PartitionedRegion) opDetails.getRegion();
+    int targetBucketId = (Integer) opDetails.getCallbackArgument();
+    Map<String, Integer[]> partitions = region.getPartitionsMap();
+
+    return partitions.entrySet().stream().filter(entry -> 
withinPartition(entry, targetBucketId))
+        .findFirst().get();
+  }
+
+  private boolean withinPartition(Map.Entry<String, Integer[]> entry, int 
bucketId) {
+    int startingBucket = entry.getValue()[0];
+    int endingBucket = startingBucket + entry.getValue()[1];
+    return startingBucket <= bucketId && bucketId < endingBucket;
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMap.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMap.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMap.java
new file mode 100644
index 0000000..e2467ac
--- /dev/null
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMap.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene.internal.partition;
+
+import org.apache.geode.cache.EntryExistsException;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.LocalDataSet;
+import org.apache.geode.internal.cache.PartitionedRegion;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.Set;
+
+public class BucketTargetingMap<K, V> extends AbstractMap<K, V> {
+
+  private final Region<K, V> region;
+  public Object callbackArg;
+
+  public BucketTargetingMap(PartitionedRegion region, int bucketId) {
+    this.callbackArg = bucketId;
+    this.region = new LocalDataSet(region, Collections.singleton(bucketId));
+  }
+
+  @Override
+  public Set<K> keySet() {
+    return region.keySet();
+  }
+
+  @Override
+  public V putIfAbsent(final K key, final V value) {
+    try {
+      region.create(key, value, callbackArg);
+    } catch (EntryExistsException e) {
+      return (V) e.getOldValue();
+    }
+    return null;
+  }
+
+  @Override
+  public V get(final Object key) {
+    return region.get(key, callbackArg);
+  }
+
+  @Override
+  public V remove(final Object key) {
+    try {
+      V oldValue = region.get(key, callbackArg);
+      region.destroy(key, callbackArg);
+      return oldValue;
+    } catch (EntryNotFoundException e) {
+      return null;
+    }
+  }
+
+  @Override
+  public boolean containsKey(final Object key) {
+    return region.get(key, callbackArg) != null;
+  }
+
+  @Override
+  public V put(final K key, final V value) {
+    return region.put(key, value, callbackArg);
+  }
+
+  @Override
+  public Set<Entry<K, V>> entrySet() {
+    return region.entrySet();
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+
+    final BucketTargetingMap<?, ?> that = (BucketTargetingMap<?, ?>) o;
+
+    if (!region.getFullPath().equals(that.region.getFullPath())) {
+      return false;
+    }
+    return callbackArg.equals(that.callbackArg);
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = super.hashCode();
+    result = 31 * result + region.hashCode();
+    result = 31 * result + callbackArg.hashCode();
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "BucketTargetingMap{" + "region=" + region.getFullPath() + ", 
callbackArg=" + callbackArg
+        + '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingResolver.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingResolver.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingResolver.java
new file mode 100644
index 0000000..a6b5c10
--- /dev/null
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingResolver.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene.internal.partition;
+
+import org.apache.geode.cache.EntryOperation;
+import org.apache.geode.cache.PartitionResolver;
+
+/**
+ * A partition resolver that expects all operations to be performed with a 
callback argument that
+ * indicates the actual bucket to target.
+ */
+public class BucketTargetingResolver implements PartitionResolver {
+  @Override
+  public Object getRoutingObject(final EntryOperation opDetails) {
+    return opDetails.getCallbackArgument();
+  }
+
+  @Override
+  public String getName() {
+    return getClass().getName();
+  }
+
+  @Override
+  public void close() {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java
index 78edb5c..9318b0e 100644
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java
@@ -27,7 +27,7 @@ public abstract class LuceneDUnitTest extends 
JUnit4CacheTestCase {
   public void postSetUp() throws Exception {
     Host host = Host.getHost(0);
     dataStore1 = host.getVM(0);
-    dataStore2 = host.getVM(-1);
+    dataStore2 = host.getVM(1);
   }
 
   protected abstract void initDataStore(SerializableRunnableIF createIndex) 
throws Exception;

http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java
 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java
index be2a754..25a4927 100644
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java
@@ -70,45 +70,6 @@ public abstract class LuceneQueriesPRBase extends 
LuceneQueriesBase {
   }
 
   @Test
-  public void returnCorrectResultsWhenIndexUpdateHappensIntheMiddleofGII()
-      throws InterruptedException {
-    SerializableRunnableIF createIndex = () -> {
-      LuceneService luceneService = LuceneServiceProvider.get(getCache());
-      luceneService.createIndex(INDEX_NAME, REGION_NAME, "text");
-    };
-    dataStore1.invoke(() -> initDataStore(createIndex));
-    accessor.invoke(() -> initAccessor(createIndex));
-    dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache()));
-    putEntryInEachBucket();
-
-    dataStore2.invoke(() -> {
-      InitialImageOperation.setGIITestHook(
-          new GIITestHook(GIITestHookType.AfterSentRequestImage, "Do puts 
during request") {
-            @Override
-            public void reset() {
-
-          }
-
-            @Override
-            public String getRegionName() {
-              return "_B__index#__region.files_0";
-            }
-
-            @Override
-            public void run() {
-              dataStore1.invoke(() -> 
LuceneTestUtilities.resumeSender(getCache()));
-              waitForFlushBeforeExecuteTextSearch(dataStore1, 30000);
-            }
-          });
-    });
-
-    dataStore2.invoke(() -> initDataStore(createIndex));
-
-    assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore1, 30000));
-    executeTextSearch(accessor, "world", "text", NUM_BUCKETS);
-  }
-
-  @Test
   public void returnCorrectResultsWhenMoveBucketHappensOnIndexUpdate() throws 
InterruptedException {
     final DistributedMember member2 =
         dataStore2.invoke(() -> 
getCache().getDistributedSystem().getDistributedMember());
@@ -232,7 +193,10 @@ public abstract class LuceneQueriesPRBase extends 
LuceneQueriesBase {
   }
 
   private void removeCallback(VM vm) {
-    vm.invoke(IndexRepositorySpy::remove);
+    vm.invoke(() -> {
+      IndexRepositorySpy.remove();
+      InitialImageOperation.resetAllGIITestHooks();
+    });
   }
 
   private void rebalanceRegion(VM vm) {

http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerFixedPRDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerFixedPRDUnitTest.java
 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerFixedPRDUnitTest.java
index dd9fa6b..2622063 100644
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerFixedPRDUnitTest.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerFixedPRDUnitTest.java
@@ -22,7 +22,7 @@ import org.apache.geode.test.junit.categories.DistributedTest;
 import org.junit.experimental.categories.Category;
 
 @Category(DistributedTest.class)
-public class LuceneQueriesPeerFixedPRDUnitTest extends LuceneQueriesPRBase {
+public class LuceneQueriesPeerFixedPRDUnitTest extends LuceneQueriesPeerPRRedundancyDUnitTest {
 
   @Override
   protected void initAccessor(SerializableRunnableIF createIndex) throws Exception {

http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java
index 496721a..1b4303a 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java
@@ -28,6 +28,9 @@ import org.apache.geode.cache.lucene.test.IndexRepositorySpy;
 import org.apache.geode.cache.lucene.test.LuceneTestUtilities;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InitialImageOperation;
+import org.apache.geode.internal.cache.InitialImageOperation.GIITestHook;
+import org.apache.geode.internal.cache.InitialImageOperation.GIITestHookType;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage;
 import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse;
@@ -101,6 +104,45 @@ public class LuceneQueriesPeerPRRedundancyDUnitTest extends LuceneQueriesPRBase
         () -> Awaitility.await().atMost(60, TimeUnit.SECONDS).until(basicGetCache()::isClosed));
   }
 
+  @Test
+  public void returnCorrectResultsWhenIndexUpdateHappensIntheMiddleofGII()
+      throws InterruptedException {
+    SerializableRunnableIF createIndex = () -> {
+      LuceneService luceneService = LuceneServiceProvider.get(getCache());
+      luceneService.createIndex(INDEX_NAME, REGION_NAME, "text");
+    };
+    dataStore1.invoke(() -> initDataStore(createIndex));
+    accessor.invoke(() -> initAccessor(createIndex));
+    dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache()));
+    putEntryInEachBucket();
+
+    dataStore2.invoke(() -> {
+      InitialImageOperation.setGIITestHook(
+          new GIITestHook(GIITestHookType.AfterSentRequestImage, "Do puts during request") {
+            @Override
+            public void reset() {
+
+          }
+
+            @Override
+            public String getRegionName() {
+              return "_B__index#__region.files_0";
+            }
+
+            @Override
+            public void run() {
+              dataStore1.invoke(() -> LuceneTestUtilities.resumeSender(getCache()));
+              waitForFlushBeforeExecuteTextSearch(dataStore1, 30000);
+            }
+          });
+    });
+
+    dataStore2.invoke(() -> initDataStore(createIndex));
+
+    assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore1, 30000));
+    executeTextSearch(accessor, "world", "text", NUM_BUCKETS);
+  }
+
   private void putEntriesAndValidateResultsWithRedundancy() {
     SerializableRunnableIF createIndex = () -> {
       LuceneService luceneService = LuceneServiceProvider.get(getCache());

http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
index 7890903..bb8e0b4 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
@@ -28,6 +28,7 @@ import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.geode.cache.lucene.internal.partition.BucketTargetingMap;
 import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.distributed.internal.locks.DLockService;
 import org.apache.geode.internal.cache.BucketAdvisor;
@@ -102,8 +103,11 @@ public class PartitionedRepositoryManagerJUnitTest {
     fileRegion = Mockito.mock(PartitionedRegion.class);
     fileDataStore = Mockito.mock(PartitionedRegionDataStore.class);
     when(fileRegion.getDataStore()).thenReturn(fileDataStore);
+    when(fileRegion.getTotalNumberOfBuckets()).thenReturn(113);
+    when(fileRegion.getFullPath()).thenReturn("FileRegion");
     chunkRegion = Mockito.mock(PartitionedRegion.class);
     chunkDataStore = Mockito.mock(PartitionedRegionDataStore.class);
+    when(chunkRegion.getFullPath()).thenReturn("ChunkRegion");
     when(chunkRegion.getDataStore()).thenReturn(chunkDataStore);
     indexStats = Mockito.mock(LuceneIndexStats.class);
     fileSystemStats = Mockito.mock(FileSystemStats.class);
@@ -237,8 +241,10 @@ public class PartitionedRepositoryManagerJUnitTest {
   protected void checkRepository(IndexRepositoryImpl repo0, int bucketId) {
     IndexWriter writer0 = repo0.getWriter();
     RegionDirectory dir0 = (RegionDirectory) writer0.getDirectory();
-    assertEquals(fileBuckets.get(bucketId), dir0.getFileSystem().getFileRegion());
-    assertEquals(chunkBuckets.get(bucketId), dir0.getFileSystem().getChunkRegion());
+    assertEquals(new BucketTargetingMap(fileRegion, bucketId),
+        dir0.getFileSystem().getFileRegion());
+    assertEquals(new BucketTargetingMap(chunkRegion, bucketId),
+        dir0.getFileSystem().getChunkRegion());
     assertEquals(serializer, repo0.getSerializer());
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/372ddeea/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMapTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMapTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMapTest.java
new file mode 100644
index 0000000..468fd04
--- /dev/null
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/partition/BucketTargetingMapTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene.internal.partition;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mock;
+
+@Category(UnitTest.class)
+public class BucketTargetingMapTest {
+
+  private PartitionedRegion region;
+
+  @Before
+  public void initMocks() {
+    region = mock(PartitionedRegion.class);
+    final BucketTargetingResolver resolver = new BucketTargetingResolver();
+    when(region.getPartitionResolver()).thenReturn(resolver);
+    when(region.getTotalNumberOfBuckets()).thenReturn(5);
+  }
+
+  @Test
+  public void getUsesCallbackArg() {
+    final BucketTargetingMap map = new BucketTargetingMap(region, 1);
+    when(region.get(eq("key"), eq(1))).thenReturn("value");
+    assertEquals("value", map.get("key"));
+  }
+
+  @Test
+  public void putIfAbsentUsesCallbackArg() {
+    final BucketTargetingMap map = new BucketTargetingMap(region, 1);
+    map.putIfAbsent("key", "value");
+    verify(region).create(eq("key"), eq("value"), eq(1));
+  }
+
+  @Test
+  public void containsKeyUsesCallbackArg() {
+    final BucketTargetingMap map = new BucketTargetingMap(region, 1);
+    when(region.get(eq("key"), eq(1))).thenReturn("value");
+    assertEquals(true, map.containsKey("key"));
+    assertEquals(false, map.containsKey("none"));
+    verify(region).get(eq("none"), eq(1));
+  }
+}

Reply via email to