This is an automated email from the ASF dual-hosted git repository. jinmeiliao pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 3f4474c GEODE-4712 GEODE-5943: shut down the bucketSorter when destroying the partitioned region (#2845) 3f4474c is described below commit 3f4474cfe2a6b7bb6b20ad31e8be1e2ebff55f45 Author: jinmeiliao <jil...@pivotal.io> AuthorDate: Thu Nov 15 12:48:51 2018 -0800 GEODE-4712 GEODE-5943: shut down the bucketSorter when destroying the partitioned region (#2845) --- .../internal/cache/eviction/EvictionDUnitTest.java | 8 ++-- .../cache/PartitionedRegionIntegrationTest.java | 43 ++++++++++++++++++++++ .../geode/internal/cache/PartitionedRegion.java | 15 ++++++-- .../apache/geode/test/junit/rules/VMProvider.java | 9 +++++ 4 files changed, 67 insertions(+), 8 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/eviction/EvictionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/eviction/EvictionDUnitTest.java index ec4e022..eb929a1 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/eviction/EvictionDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/eviction/EvictionDUnitTest.java @@ -89,7 +89,7 @@ public class EvictionDUnitTest { server1 = cluster.startServerVM(1, s -> s.withNoCacheServer() .withProperties(properties).withConnectionToLocator(locatorPort)); - VMProvider.invokeInEveryMember(() -> { + VMProvider.invokeInEveryMember("setup VM", () -> { HeapMemoryMonitor.setTestDisableMemoryUpdates(true); System.setProperty("gemfire.memoryEventTolerance", "0"); InternalCache cache = ClusterStartupRule.getCache(); @@ -104,7 +104,7 @@ public class EvictionDUnitTest { @Test public void testDummyInlineNCentralizedEviction() { - VMProvider.invokeInEveryMember(() -> { + VMProvider.invokeInEveryMember("create region", () -> { ServerStarterRule server = (ServerStarterRule) ClusterStartupRule.memberStarter; server.createPartitionRegion("PR1", f -> f.setOffHeap(offHeap).setEvictionAttributes( @@ -113,7 +113,7 @@ public class EvictionDUnitTest { }, server0, server1); - server0.invoke(() -> { + server0.invoke("put data", () -> { Region region = ClusterStartupRule.getCache().getRegion("PR1"); for (int counter = 1; counter <= 50; counter++) { region.put(counter, new byte[ENTRY_SIZE]); @@ -124,7 +124,7 @@ public class EvictionDUnitTest { int server1ExpectedEviction = server1.invoke(() -> sendEventAndWaitForExpectedEviction("PR1")); // do 4 puts again in PR1 - server0.invoke(() -> { + server0.invoke("put more data", () -> { Region region = ClusterStartupRule.getCache().getRegion("PR1"); for (int counter = 1; counter <= 4; counter++) { region.put(counter, new byte[ENTRY_SIZE]); diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionIntegrationTest.java new file mode 100644 index 0000000..4b20abc --- /dev/null +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionIntegrationTest.java @@ -0,0 +1,43 @@ +package org.apache.geode.internal.cache; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.ScheduledExecutorService; + +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.EvictionAction; +import org.apache.geode.cache.EvictionAttributes; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.test.junit.rules.ServerStarterRule; + +public class PartitionedRegionIntegrationTest { + + @Rule + public ServerStarterRule server = new ServerStarterRule().withNoCacheServer().withAutoStart(); + + @Test + public void bucketSorterShutdownAfterRegionDestroy() { + PartitionedRegion region = + (PartitionedRegion) server.createRegion(RegionShortcut.PARTITION, "PR1", + f -> f.setEvictionAttributes( + EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.LOCAL_DESTROY))); + + ScheduledExecutorService bucketSorter = region.getBucketSorter(); + assertThat(bucketSorter).isNotNull(); + + region.destroyRegion(); + + assertThat(bucketSorter.isShutdown()).isTrue(); + } + + @Test + public void bucketSorterIsNotCreatedIfNoEviction() { + PartitionedRegion region = + (PartitionedRegion) server.createRegion(RegionShortcut.PARTITION, "PR1", + rf -> rf.setOffHeap(false)); + ScheduledExecutorService bucketSorter = region.getBucketSorter(); + assertThat(bucketSorter).isNull(); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index f3e86d2..72c3659 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -467,6 +467,9 @@ public class PartitionedRegion extends LocalRegion colocationListeners.remove(colocationListener); } + ScheduledExecutorService getBucketSorter() { + return bucketSorter; + } static PRIdMap getPrIdToPR() { return prIdToPR; @@ -7628,6 +7631,10 @@ public class PartitionedRegion extends LocalRegion colocatedWithRegion.getColocatedByList().remove(this); } + if (bucketSorter != null) { + bucketSorter.shutdown(); + } + RegionLogger.logDestroy(getName(), this.cache.getInternalDistributedSystem().getDistributedMember(), null, op.isClose()); } @@ -9243,11 +9250,11 @@ public class PartitionedRegion extends LocalRegion public List<BucketRegion> getSortedBuckets() { if (!bucketSorterStarted.get()) { bucketSorterStarted.set(true); - this.bucketSorter.scheduleAtFixedRate(new BucketSorterThread(), 0, + this.bucketSorter.scheduleAtFixedRate(new BucketSorterRunnable(), 0, HeapEvictor.BUCKET_SORTING_INTERVAL, TimeUnit.MILLISECONDS); if (logger.isDebugEnabled()) { logger.debug( - "Started BucketSorter to sort the buckets according to numver of entries in each bucket for every {} milliseconds", + "Started BucketSorter to sort the buckets according to number of entries in each bucket for every {} milliseconds", HeapEvictor.BUCKET_SORTING_INTERVAL); } } @@ -9259,7 +9266,7 @@ public class PartitionedRegion extends LocalRegion return bucketList; } - class BucketSorterThread implements Runnable { + class BucketSorterRunnable implements Runnable { @Override public void run() { try { @@ -9290,7 +9297,7 @@ public class PartitionedRegion extends LocalRegion } } catch (Exception e) { if (logger.isDebugEnabled()) { - logger.debug("BucketSorterThread : encountered Exception ", e); + logger.debug("BucketSorterRunnable : encountered Exception ", e); } } } diff --git a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/VMProvider.java b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/VMProvider.java index b68699d..de52d9e 100644 --- a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/VMProvider.java +++ b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/VMProvider.java @@ -37,6 +37,15 @@ public abstract class VMProvider { Arrays.stream(members).forEach(member -> member.invoke(runnableIF)); } + public static void invokeInEveryMember(String name, SerializableRunnableIF runnableIF, + VMProvider... members) { + if (ArrayUtils.isEmpty(members)) { + throw new IllegalArgumentException("Array of members must not be null nor empty."); + } + + Arrays.stream(members).forEach(member -> member.invoke(name, runnableIF)); + } + public abstract VM getVM(); public void stop() {