This is an automated email from the ASF dual-hosted git repository.
jasonhuynh pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 3c9ddc5 GEODE-6412: Improve concurrency for getBucketIndex (#3198)
3c9ddc5 is described below
commit 3c9ddc56dcead7fcf868a6ed64c115293b4bf931
Author: Jason Huynh <[email protected]>
AuthorDate: Tue Mar 12 17:22:27 2019 +0000
GEODE-6412: Improve concurrency for getBucketIndex (#3198)
* Caching an arbitrary index instead of finding one for every
getBucketIndex call
* Refactored to use jmh asych group threads
---
.../PartitionedIndexGetBucketIndexBenchmark.java | 88 ++++++++++++++++++++++
.../internal/util/ComputeIfAbsentBenchmark.java | 55 +++-----------
.../query/internal/index/PartitionedIndex.java | 28 +++++--
3 files changed, 120 insertions(+), 51 deletions(-)
diff --git
a/geode-core/src/jmh/java/org/apache/geode/cache/query/internal/PartitionedIndexGetBucketIndexBenchmark.java
b/geode-core/src/jmh/java/org/apache/geode/cache/query/internal/PartitionedIndexGetBucketIndexBenchmark.java
new file mode 100644
index 0000000..c2be0e5
--- /dev/null
+++
b/geode-core/src/jmh/java/org/apache/geode/cache/query/internal/PartitionedIndexGetBucketIndexBenchmark.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.internal;
+
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.TimeUnit;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Group;
+import org.openjdk.jmh.annotations.GroupThreads;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.query.Index;
+import org.apache.geode.cache.query.internal.index.PartitionedIndex;
+import org.apache.geode.distributed.DistributedSystem;
+
+/**
+ * Test spins up threads that constantly do getBucketIndex
+ * The tests will measure throughput
+ * The benchmark tests getBucketIndex in the presence of other threads
attempting the same operation
+ */
+
+@State(Scope.Thread)
+@Fork(1)
+public class PartitionedIndexGetBucketIndexBenchmark {
+
+ private PartitionedIndex index;
+ /*
+ * After load is established, how many measurements shall we take?
+ */
+ private static final double BENCHMARK_ITERATIONS = 10;
+
+
+ @Setup(Level.Trial)
+ public void trialSetup() throws InterruptedException {
+ DistributedSystem mockDS = mock(DistributedSystem.class);
+ Cache mockCache = mock(Cache.class);
+ Region mockRegion = mock(Region.class);
+ when(mockRegion.getCache()).thenReturn(mockCache);
+ when(mockCache.getDistributedSystem()).thenReturn(mockDS);
+ index = new PartitionedIndex(null, null, "", mockRegion, "", "", "");
+ index.addToBucketIndexes(mockRegion, mock(Index.class));
+ }
+
+ @Group("getBucketIndexThroughput")
+ @GroupThreads(10)
+ @Benchmark
+ public void getBucketIndexLoad() {
+ index.getBucketIndex();
+ }
+
+ @Benchmark
+ @Group("getBucketIndexThroughput")
+ @GroupThreads(1)
+ @Measurement(iterations = (int) BENCHMARK_ITERATIONS)
+ @BenchmarkMode(Mode.Throughput)
+ @OutputTimeUnit(TimeUnit.SECONDS)
+ // @Warmup we don't warm up because our @Setup warms us up
+ public Object getBucketIndex() {
+ return index.getBucketIndex();
+ }
+
+}
diff --git
a/geode-core/src/jmh/java/org/apache/geode/internal/util/ComputeIfAbsentBenchmark.java
b/geode-core/src/jmh/java/org/apache/geode/internal/util/ComputeIfAbsentBenchmark.java
index 88af9d6..3c82c71 100644
---
a/geode-core/src/jmh/java/org/apache/geode/internal/util/ComputeIfAbsentBenchmark.java
+++
b/geode-core/src/jmh/java/org/apache/geode/internal/util/ComputeIfAbsentBenchmark.java
@@ -17,14 +17,13 @@ package org.apache.geode.internal.util;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Group;
+import org.openjdk.jmh.annotations.GroupThreads;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
@@ -32,7 +31,7 @@ import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
+
/**
* Test spins up threads that constantly do computeIfAbsent
@@ -50,42 +49,20 @@ public class ComputeIfAbsentBenchmark {
*/
private static final double BENCHMARK_ITERATIONS = 10;
- private static final int TIME_TO_QUIESCE_BEFORE_SAMPLING = 1;
-
- private static final int THREAD_POOL_PROCESSOR_MULTIPLE = 2;
-
- private ScheduledThreadPoolExecutor loadGenerationExecutorService;
-
- private static boolean testRunning = true;
-
@Setup(Level.Trial)
- public void trialSetup() throws InterruptedException {
-
- final int numberOfThreads =
- THREAD_POOL_PROCESSOR_MULTIPLE *
Runtime.getRuntime().availableProcessors();
-
- loadGenerationExecutorService =
- (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(
- numberOfThreads);
+ public void trialSetup() throws InterruptedException {}
- System.out.println(String.format("Pool has %d threads", numberOfThreads));
- loadGenerationExecutorService.setRemoveOnCancelPolicy(true);
-
- generateLoad(
- loadGenerationExecutorService, numberOfThreads);
-
- // allow system to quiesce
- Thread.sleep(TIME_TO_QUIESCE_BEFORE_SAMPLING);
- }
-
- @TearDown(Level.Trial)
- public void trialTeardown() {
- testRunning = false;
- loadGenerationExecutorService.shutdownNow();
+ @Group("getBucketIndexThroughput")
+ @GroupThreads(10)
+ @Benchmark
+ public void getBucketIndexLoad() {
+ JavaWorkarounds.computeIfAbsent(map, 1, k -> k);
}
@Benchmark
+ @Group("computeIfAbsentThroughput")
+ @GroupThreads(1)
@Measurement(iterations = (int) BENCHMARK_ITERATIONS)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@@ -94,14 +71,4 @@ public class ComputeIfAbsentBenchmark {
return JavaWorkarounds.computeIfAbsent(map, 1, k -> k);
}
- private void generateLoad(final ScheduledExecutorService executorService,
int numThreads) {
- for (int i = 0; i < numThreads; i++) {
- executorService.schedule(() -> {
- while (testRunning) {
- JavaWorkarounds.computeIfAbsent(map, 1, k -> k);
- }
- }, 0, TimeUnit.MILLISECONDS);
- }
- }
-
}
diff --git
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PartitionedIndex.java
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PartitionedIndex.java
index 9dfceee..916e323 100755
---
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PartitionedIndex.java
+++
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PartitionedIndex.java
@@ -58,6 +58,10 @@ public class PartitionedIndex extends AbstractIndex {
private Map<Region, List<Index>> bucketIndexes =
Collections.synchronizedMap(new HashMap<Region, List<Index>>());
+ // An arbitrary bucket index from this PartiionedIndex that is used as a
representative
+ // index for the entire PartitionIndex. Usually used for scoring/sizing of
an index when
+ // selecting which index to use
+ private volatile Index arbitraryBucketIndex;
/**
* Type on index represented by this partitioned index.
*
@@ -108,6 +112,7 @@ public class PartitionedIndex extends AbstractIndex {
*/
public void addToBucketIndexes(Region r, Index index) {
synchronized (this.bucketIndexes) {
+ setArbitraryBucketIndex(index);
List<Index> indexes = this.bucketIndexes.get(r);
if (indexes == null) {
indexes = new ArrayList<Index>();
@@ -126,6 +131,9 @@ public class PartitionedIndex extends AbstractIndex {
this.bucketIndexes.remove(r);
}
}
+ if (index == arbitraryBucketIndex) {
+ setArbitraryBucketIndex(retrieveArbitraryBucketIndex());
+ }
}
}
@@ -170,10 +178,13 @@ public class PartitionedIndex extends AbstractIndex {
}
}
- /**
- * Returns one of the bucket index. To get all bucket index use
getBucketIndexes()
- */
- public Index getBucketIndex() {
+ public void setArbitraryBucketIndex(Index index) {
+ if (arbitraryBucketIndex == null) {
+ arbitraryBucketIndex = index;
+ }
+ }
+
+ public Index retrieveArbitraryBucketIndex() {
Index index = null;
synchronized (this.bucketIndexes) {
if (this.bucketIndexes.size() > 0) {
@@ -186,6 +197,10 @@ public class PartitionedIndex extends AbstractIndex {
return index;
}
+ public Index getBucketIndex() {
+ return arbitraryBucketIndex;
+ }
+
protected Map.Entry<Region, List<Index>> getFirstBucketIndex() {
Map.Entry<Region, List<Index>> firstIndexEntry = null;
synchronized (this.bucketIndexes) {
@@ -275,7 +290,6 @@ public class PartitionedIndex extends AbstractIndex {
}
-
@Override
protected boolean isCompactRangeIndex() {
return false;
@@ -423,7 +437,6 @@ public class PartitionedIndex extends AbstractIndex {
/**
* Internal class for partitioned index statistics. Statistics are not
supported right now.
- *
*/
class PartitionedIndexStatistics extends InternalIndexStatistics {
private IndexStats vsdStats;
@@ -590,7 +603,8 @@ public class PartitionedIndex extends AbstractIndex {
*/
@Override
void lockedQuery(Object lowerBoundKey, int lowerBoundOperator, Object
upperBoundKey,
- int upperBoundOperator, Collection results, Set keysToRemove,
ExecutionContext context)
+ int upperBoundOperator, Collection results, Set keysToRemove,
+ ExecutionContext context)
throws TypeMismatchException {
throw new RuntimeException(
"Not supported on partitioned index");