This is an automated email from the ASF dual-hosted git repository.

jasonhuynh pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3c9ddc5  GEODE-6412: Improve concurrency for getBucketIndex (#3198)
3c9ddc5 is described below

commit 3c9ddc56dcead7fcf868a6ed64c115293b4bf931
Author: Jason Huynh <[email protected]>
AuthorDate: Tue Mar 12 17:22:27 2019 +0000

    GEODE-6412: Improve concurrency for getBucketIndex (#3198)
    
      * Caching an arbitrary index instead of finding one for every 
getBucketIndex call
      * Refactored to use jmh asych group threads
---
 .../PartitionedIndexGetBucketIndexBenchmark.java   | 88 ++++++++++++++++++++++
 .../internal/util/ComputeIfAbsentBenchmark.java    | 55 +++-----------
 .../query/internal/index/PartitionedIndex.java     | 28 +++++--
 3 files changed, 120 insertions(+), 51 deletions(-)

diff --git 
a/geode-core/src/jmh/java/org/apache/geode/cache/query/internal/PartitionedIndexGetBucketIndexBenchmark.java
 
b/geode-core/src/jmh/java/org/apache/geode/cache/query/internal/PartitionedIndexGetBucketIndexBenchmark.java
new file mode 100644
index 0000000..c2be0e5
--- /dev/null
+++ 
b/geode-core/src/jmh/java/org/apache/geode/cache/query/internal/PartitionedIndexGetBucketIndexBenchmark.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.internal;
+
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.TimeUnit;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Group;
+import org.openjdk.jmh.annotations.GroupThreads;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.query.Index;
+import org.apache.geode.cache.query.internal.index.PartitionedIndex;
+import org.apache.geode.distributed.DistributedSystem;
+
+/**
+ * Test spins up threads that constantly do getBucketIndex
+ * The tests will measure throughput
+ * The benchmark tests getBucketIndex in the presence of other threads 
attempting the same operation
+ */
+
+@State(Scope.Thread)
+@Fork(1)
+public class PartitionedIndexGetBucketIndexBenchmark {
+
+  private PartitionedIndex index;
+  /*
+   * After load is established, how many measurements shall we take?
+   */
+  private static final double BENCHMARK_ITERATIONS = 10;
+
+
+  @Setup(Level.Trial)
+  public void trialSetup() throws InterruptedException {
+    DistributedSystem mockDS = mock(DistributedSystem.class);
+    Cache mockCache = mock(Cache.class);
+    Region mockRegion = mock(Region.class);
+    when(mockRegion.getCache()).thenReturn(mockCache);
+    when(mockCache.getDistributedSystem()).thenReturn(mockDS);
+    index = new PartitionedIndex(null, null, "", mockRegion, "", "", "");
+    index.addToBucketIndexes(mockRegion, mock(Index.class));
+  }
+
+  @Group("getBucketIndexThroughput")
+  @GroupThreads(10)
+  @Benchmark
+  public void getBucketIndexLoad() {
+    index.getBucketIndex();
+  }
+
+  @Benchmark
+  @Group("getBucketIndexThroughput")
+  @GroupThreads(1)
+  @Measurement(iterations = (int) BENCHMARK_ITERATIONS)
+  @BenchmarkMode(Mode.Throughput)
+  @OutputTimeUnit(TimeUnit.SECONDS)
+  // @Warmup we don't warm up because our @Setup warms us up
+  public Object getBucketIndex() {
+    return index.getBucketIndex();
+  }
+
+}
diff --git 
a/geode-core/src/jmh/java/org/apache/geode/internal/util/ComputeIfAbsentBenchmark.java
 
b/geode-core/src/jmh/java/org/apache/geode/internal/util/ComputeIfAbsentBenchmark.java
index 88af9d6..3c82c71 100644
--- 
a/geode-core/src/jmh/java/org/apache/geode/internal/util/ComputeIfAbsentBenchmark.java
+++ 
b/geode-core/src/jmh/java/org/apache/geode/internal/util/ComputeIfAbsentBenchmark.java
@@ -17,14 +17,13 @@ package org.apache.geode.internal.util;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Group;
+import org.openjdk.jmh.annotations.GroupThreads;
 import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.Mode;
@@ -32,7 +31,7 @@ import org.openjdk.jmh.annotations.OutputTimeUnit;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
+
 
 /**
  * Test spins up threads that constantly do computeIfAbsent
@@ -50,42 +49,20 @@ public class ComputeIfAbsentBenchmark {
    */
   private static final double BENCHMARK_ITERATIONS = 10;
 
-  private static final int TIME_TO_QUIESCE_BEFORE_SAMPLING = 1;
-
-  private static final int THREAD_POOL_PROCESSOR_MULTIPLE = 2;
-
-  private ScheduledThreadPoolExecutor loadGenerationExecutorService;
-
-  private static boolean testRunning = true;
-
   @Setup(Level.Trial)
-  public void trialSetup() throws InterruptedException {
-
-    final int numberOfThreads =
-        THREAD_POOL_PROCESSOR_MULTIPLE * 
Runtime.getRuntime().availableProcessors();
-
-    loadGenerationExecutorService =
-        (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(
-            numberOfThreads);
+  public void trialSetup() throws InterruptedException {}
 
-    System.out.println(String.format("Pool has %d threads", numberOfThreads));
 
-    loadGenerationExecutorService.setRemoveOnCancelPolicy(true);
-
-    generateLoad(
-        loadGenerationExecutorService, numberOfThreads);
-
-    // allow system to quiesce
-    Thread.sleep(TIME_TO_QUIESCE_BEFORE_SAMPLING);
-  }
-
-  @TearDown(Level.Trial)
-  public void trialTeardown() {
-    testRunning = false;
-    loadGenerationExecutorService.shutdownNow();
+  @Group("getBucketIndexThroughput")
+  @GroupThreads(10)
+  @Benchmark
+  public void getBucketIndexLoad() {
+    JavaWorkarounds.computeIfAbsent(map, 1, k -> k);
   }
 
   @Benchmark
+  @Group("computeIfAbsentThroughput")
+  @GroupThreads(1)
   @Measurement(iterations = (int) BENCHMARK_ITERATIONS)
   @BenchmarkMode(Mode.Throughput)
   @OutputTimeUnit(TimeUnit.SECONDS)
@@ -94,14 +71,4 @@ public class ComputeIfAbsentBenchmark {
     return JavaWorkarounds.computeIfAbsent(map, 1, k -> k);
   }
 
-  private void generateLoad(final ScheduledExecutorService executorService, 
int numThreads) {
-    for (int i = 0; i < numThreads; i++) {
-      executorService.schedule(() -> {
-        while (testRunning) {
-          JavaWorkarounds.computeIfAbsent(map, 1, k -> k);
-        }
-      }, 0, TimeUnit.MILLISECONDS);
-    }
-  }
-
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PartitionedIndex.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PartitionedIndex.java
index 9dfceee..916e323 100755
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PartitionedIndex.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PartitionedIndex.java
@@ -58,6 +58,10 @@ public class PartitionedIndex extends AbstractIndex {
   private Map<Region, List<Index>> bucketIndexes =
       Collections.synchronizedMap(new HashMap<Region, List<Index>>());
 
+  // An arbitrary bucket index from this PartiionedIndex that is used as a 
representative
+  // index for the entire PartitionIndex. Usually used for scoring/sizing of 
an index when
+  // selecting which index to use
+  private volatile Index arbitraryBucketIndex;
   /**
    * Type on index represented by this partitioned index.
    *
@@ -108,6 +112,7 @@ public class PartitionedIndex extends AbstractIndex {
    */
   public void addToBucketIndexes(Region r, Index index) {
     synchronized (this.bucketIndexes) {
+      setArbitraryBucketIndex(index);
       List<Index> indexes = this.bucketIndexes.get(r);
       if (indexes == null) {
         indexes = new ArrayList<Index>();
@@ -126,6 +131,9 @@ public class PartitionedIndex extends AbstractIndex {
           this.bucketIndexes.remove(r);
         }
       }
+      if (index == arbitraryBucketIndex) {
+        setArbitraryBucketIndex(retrieveArbitraryBucketIndex());
+      }
     }
   }
 
@@ -170,10 +178,13 @@ public class PartitionedIndex extends AbstractIndex {
     }
   }
 
-  /**
-   * Returns one of the bucket index. To get all bucket index use 
getBucketIndexes()
-   */
-  public Index getBucketIndex() {
+  public void setArbitraryBucketIndex(Index index) {
+    if (arbitraryBucketIndex == null) {
+      arbitraryBucketIndex = index;
+    }
+  }
+
+  public Index retrieveArbitraryBucketIndex() {
     Index index = null;
     synchronized (this.bucketIndexes) {
       if (this.bucketIndexes.size() > 0) {
@@ -186,6 +197,10 @@ public class PartitionedIndex extends AbstractIndex {
     return index;
   }
 
+  public Index getBucketIndex() {
+    return arbitraryBucketIndex;
+  }
+
   protected Map.Entry<Region, List<Index>> getFirstBucketIndex() {
     Map.Entry<Region, List<Index>> firstIndexEntry = null;
     synchronized (this.bucketIndexes) {
@@ -275,7 +290,6 @@ public class PartitionedIndex extends AbstractIndex {
   }
 
 
-
   @Override
   protected boolean isCompactRangeIndex() {
     return false;
@@ -423,7 +437,6 @@ public class PartitionedIndex extends AbstractIndex {
 
   /**
    * Internal class for partitioned index statistics. Statistics are not 
supported right now.
-   *
    */
   class PartitionedIndexStatistics extends InternalIndexStatistics {
     private IndexStats vsdStats;
@@ -590,7 +603,8 @@ public class PartitionedIndex extends AbstractIndex {
    */
   @Override
   void lockedQuery(Object lowerBoundKey, int lowerBoundOperator, Object 
upperBoundKey,
-      int upperBoundOperator, Collection results, Set keysToRemove, 
ExecutionContext context)
+      int upperBoundOperator, Collection results, Set keysToRemove,
+      ExecutionContext context)
       throws TypeMismatchException {
     throw new RuntimeException(
         "Not supported on partitioned index");

Reply via email to