This is an automated email from the ASF dual-hosted git repository. mhanson pushed a commit to branch rebalance_bug in repository https://gitbox.apache.org/repos/asf/geode.git
commit 120b6620d0cb1caaf9204be279d88fdc95a92a29 Author: Mark Hanson <[email protected]> AuthorDate: Tue Aug 24 01:24:07 2021 -0700 first cut. --- .../control/RebalanceOperationDistributedTest.java | 113 +++++++++++++++++++++ .../internal/cache/PartitionedRegionDataStore.java | 33 ++++++ .../control/PartitionRebalanceDetailsImpl.java | 21 ++++ .../partitioned/PartitionedRegionRebalanceOp.java | 4 + 4 files changed, 171 insertions(+) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java index 94bd61c..3a1bb5e 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java @@ -54,6 +54,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; import junitparams.JUnitParamsRunner; import junitparams.Parameters; @@ -76,6 +77,7 @@ import org.apache.geode.cache.LoaderHelper; import org.apache.geode.cache.PartitionAttributesFactory; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; import org.apache.geode.cache.asyncqueue.AsyncEvent; import org.apache.geode.cache.asyncqueue.AsyncEventListener; import org.apache.geode.cache.control.RebalanceOperation; @@ -108,6 +110,7 @@ import org.apache.geode.test.dunit.SerializableRunnableIF; import org.apache.geode.test.dunit.VM; import org.apache.geode.test.dunit.WaitCriterion; import org.apache.geode.test.dunit.cache.CacheTestCase; +import org.apache.geode.test.dunit.rules.DistributedDiskDirRule; import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; import org.apache.geode.test.junit.rules.ExecutorServiceRule; import org.apache.geode.util.internal.GeodeGlossary; @@ -131,6 +134,9 @@ public class RebalanceOperationDistributedTest extends CacheTestCase { @Rule public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule(); + @Rule + public DistributedDiskDirRule diskDirRule = new DistributedDiskDirRule(); + @After public void tearDown() { invokeInEveryVM(() -> { @@ -368,6 +374,89 @@ public class RebalanceOperationDistributedTest extends CacheTestCase { } } + /** + * Test that we correctly use the redundancy-zone property to determine where to place redundant + * copies of a buckets. + */ + @Test + public void testEnforceZoneWithSixServersAndTwoZones() throws InterruptedException { + VM vm0 = getVM(0); + VM vm1 = getVM(1); + VM vm2 = getVM(2); + VM vm3 = getVM(3); + VM vm4 = getVM(4); + VM vm5 = getVM(5); + Stream.of(vm0, vm1, vm2).forEach(vm -> vm.invoke(() -> setRedundancyZone("A"))); + Stream.of(vm3, vm4, vm5).forEach(vm -> vm.invoke(() -> setRedundancyZone("B"))); + + Stream.of(vm0, vm1, vm2, vm3, vm4, vm5).forEach(vm -> { + vm.invoke(() -> createPartitionedRegion("region1", 1, 113)); + vm.invoke(() -> createPartitionedRegion("region1Ancillary", 1, 113)); + + }); + + // Create some buckets + vm0.invoke(() -> { + Map<Integer, String> mapOfData = new HashMap<>(); + for (int i = 0; i < 100000; i++) { + mapOfData.put(i, "A"); + } + Region<Integer, String> region = getCache().getRegion("region1"); + region.putAll(mapOfData); + Region<Integer, String> region2 = getCache().getRegion("region1Ancillary"); + region2.putAll(mapOfData); + }); + + // make sure we can tell that the buckets have low redundancy + // vm0.invoke(() -> validateRedundancy("region1", 113, 0, 113)); + + // Make sure we still have low redundancy + vm0.invoke(() -> validateRedundancy("region1", 113, 1, 0)); + + vm0.invoke(() -> { + InternalResourceManager manager = getCache().getInternalResourceManager(); + RebalanceResults results = doRebalance(false, manager); + logger.info("Rebalance 1 Results = " + results); + validateStatistics(manager, results); + }); + + vm2.bounceForcibly(); + + vm0.invoke(() -> { + InternalResourceManager manager = getCache().getInternalResourceManager(); + RebalanceResults results = doRebalance(false, manager); + logger.info("Rebalance 2 Results = " + results); + }); + // Thread.sleep(2000); + + vm2.invoke(() -> { + setRedundancyZone("A"); + createPartitionedRegion("region1", 1, 113); + }); + + vm0.invoke(() -> { + InternalResourceManager manager = getCache().getInternalResourceManager(); + RebalanceResults results = doRebalance(false, manager); + logger.info("Rebalance 3 Results = " + results); + }); + + int zoneA = vm0.invoke(() -> getBucketCount("region1")); + zoneA += vm1.invoke(() -> getBucketCount("region1")); + zoneA += vm2.invoke(() -> getBucketCount("region1")); + + int zoneB = vm3.invoke(() -> getBucketCount("region1")); + zoneB += vm4.invoke(() -> getBucketCount("region1")); + zoneB += vm5.invoke(() -> getBucketCount("region1")); + + assertThat(zoneA).isEqualTo(zoneB).isEqualTo(113); + vm0.invoke(() -> validateBucketCountLessThan("region1", 38)); + vm1.invoke(() -> validateBucketCountLessThan("region1", 38)); + vm2.invoke(() -> validateBucketCountLessThan("region1", 38)); + vm3.invoke(() -> validateBucketCountLessThan("region1", 38)); + vm4.invoke(() -> validateBucketCountLessThan("region1", 38)); + vm5.invoke(() -> validateBucketCountLessThan("region1", 38)); + } + @Test public void testEnforceZoneWithMultipleRegions() { VM vm0 = getVM(0); @@ -2349,6 +2438,19 @@ public class RebalanceOperationDistributedTest extends CacheTestCase { regionFactory.create(regionName); } + private void createPartitionedRegion(String regionName, int redundantCopies, final int numBuckets, + final RegionShortcut regionShortcut) { + PartitionAttributesFactory partitionAttributesFactory = new PartitionAttributesFactory(); + partitionAttributesFactory.setRedundantCopies(redundantCopies); + partitionAttributesFactory.setRecoveryDelay(-1); + partitionAttributesFactory.setStartupRecoveryDelay(-1); + partitionAttributesFactory.setTotalNumBuckets(numBuckets); + RegionFactory regionFactory = getCache().createRegionFactory(regionShortcut); + regionFactory.setPartitionAttributes(partitionAttributesFactory.create()); + + regionFactory.create(regionName); + } + private void createPartitionedRegion(String regionName, int redundantCopies) { PartitionAttributesFactory partitionAttributesFactory = new PartitionAttributesFactory(); partitionAttributesFactory.setRedundantCopies(redundantCopies); @@ -2648,12 +2750,23 @@ public class RebalanceOperationDistributedTest extends CacheTestCase { assertThat(details.getLowRedundancyBucketCount()).isEqualTo(expectedLowRedundancyBucketCount); } + private int getBucketCount(String regionName) { + PartitionedRegion region = (PartitionedRegion) getCache().getRegion(regionName); + return region.getLocalBucketsListTestOnly().size(); + } + private void validateBucketCount(String regionName, int numLocalBuckets) { PartitionedRegion region = (PartitionedRegion) getCache().getRegion(regionName); assertThat(region.getLocalBucketsListTestOnly()).hasSize(numLocalBuckets); } + private void validateBucketCountLessThan(String regionName, int numLocalBuckets) { + PartitionedRegion region = (PartitionedRegion) getCache().getRegion(regionName); + + assertThat(region.getLocalBucketsListTestOnly().size()).isLessThanOrEqualTo(numLocalBuckets); + } + private void validateStatistics(InternalResourceManager manager, RebalanceResults results) { ResourceManagerStats stats = manager.getStats(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java index d687d4c..c7f2ae1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java @@ -215,6 +215,30 @@ public class PartitionedRegionDataStore implements HasCachePerfStats { "RegionStats-partition-" + pr.getName(), pr.getCachePerfStats(), pr, pr.getCache().getMeterRegistry(), statisticsClock); this.keysOfInterest = new ConcurrentHashMap(); + launchBucketIdsLogThread(); + } + + private void launchBucketIdsLogThread() { + new Thread(() -> logBucketIds()).start(); + } + + private void logBucketIds() { + List<Integer> previousBucketIdsList = Collections.emptyList(), currentBucketIdsList; + while (true) { + currentBucketIdsList = new ArrayList<>(getAllLocalBucketIds()); + Collections.sort(currentBucketIdsList); + if (!currentBucketIdsList.equals(previousBucketIdsList)) { + String message = "region=" + getName() + "; bucketIdsSize=" + currentBucketIdsList.size() + + "; bucketIds=" + currentBucketIdsList; + System.out.println(message); + logger.warn("XXX " + message); + previousBucketIdsList = currentBucketIdsList; + } + try { + Thread.sleep(2000); + } catch (Exception e) { + } + } } /** @@ -793,6 +817,11 @@ public class PartitionedRegionDataStore implements HasCachePerfStats { .setPartitionedRegion(this.partitionedRegion) .setIndexes(getIndexes(rootRegion.getFullPath(), bucketRegionName))); this.partitionedRegion.getPrStats().incBucketCount(1); + System.out.println(Thread.currentThread().getName() + + ": PartitionedRegionDataStore.createBucketRegion region=" + bucketRegion.getName()); + logger.warn( + "XXX PartitionedRegionDataStore.createBucketRegion region=" + bucketRegion.getName()); + // new Exception()); } catch (RegionExistsException ex) { // Bucket Region is already created, so do nothing. if (logger.isDebugEnabled()) { @@ -1644,6 +1673,10 @@ public class PartitionedRegionDataStore implements HasCachePerfStats { } this.localBucket2RegionMap.remove(Integer.valueOf(bucketId)); this.partitionedRegion.getPrStats().incBucketCount(-1); + System.out.println(Thread.currentThread().getName() + + ": PartitionedRegionDataStore.removeBucket region=" + bucketRegion.getName()); + logger.warn("XXX PartitionedRegionDataStore.removeBucket region=" + bucketRegion.getName()); + // new Exception()); return true; } finally { lock.unlock(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java index 8a6c4f9..b125ae4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java @@ -47,6 +47,27 @@ public class PartitionRebalanceDetailsImpl private final transient PartitionedRegion region; private long time; + @Override + public String toString() { + return "PartitionRebalanceDetailsImpl{" + + "bucketCreateBytes=" + bucketCreateBytes + + ", bucketCreateTime=" + bucketCreateTime + + ", bucketCreatesCompleted=" + bucketCreatesCompleted + + ", bucketRemoveBytes=" + bucketRemoveBytes + + ", bucketRemoveTime=" + bucketRemoveTime + + ", bucketRemovesCompleted=" + bucketRemovesCompleted + + ", bucketTransferBytes=" + bucketTransferBytes + + ", bucketTransferTime=" + bucketTransferTime + + ", bucketTransfersCompleted=" + bucketTransfersCompleted + + ", partitionMemberDetailsAfter=" + partitionMemberDetailsAfter + + ", partitionMemberDetailsBefore=" + partitionMemberDetailsBefore + + ", primaryTransferTime=" + primaryTransferTime + + ", primaryTransfersCompleted=" + primaryTransfersCompleted + + ", region=" + region + + ", time=" + time + + '}'; + } + public PartitionRebalanceDetailsImpl(PartitionedRegion region) { this.region = region; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java index e486e78..fc7b75b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java @@ -528,6 +528,10 @@ public class PartitionedRegionRebalanceOp { removed = getLeaderRegion().getDataStore().removeBucket(bucketId, false); } else { // send message to remote member... + String message = "XXX PartitionedRegionRebalanceOp.removeRedundantBucketForRegion target=" + + target + "; region=" + getLeaderRegion() + "; bucketId=" + bucketId; + System.out.println(Thread.currentThread().getName() + ": " + message); + logger.warn(message, new Exception()); RemoveBucketResponse response = RemoveBucketMessage.send(target, getLeaderRegion(), bucketId, false); if (response != null) {
