GEODE-3049: Add warning on loss of all buckets in region
* Add statistics for count of buckets that have no copies remaining
* Refactor logic for keeping track of redundancy statistics into
separate classes
* Remove dependency of tracking on the PartitionedRegionStats
(since this can be turned off), but continue recording statistics
to that location
This closes #577
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/b7db727a
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/b7db727a
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/b7db727a
Branch: refs/heads/feature/GEM-1483
Commit: b7db727ad8477806e7884d16624cd72bd6ba26e0
Parents: 0131292
Author: Nick Reich <[email protected]>
Authored: Mon Jun 12 17:29:16 2017 -0700
Committer: Darrel Schneider <[email protected]>
Committed: Mon Jul 24 17:52:43 2017 -0700
----------------------------------------------------------------------
.../geode/internal/cache/BucketAdvisor.java | 118 ++--------------
.../internal/cache/BucketRedundancyTracker.java | 122 ++++++++++++++++
.../geode/internal/cache/PartitionedRegion.java | 8 ++
.../PartitionedRegionRedundancyTracker.java | 139 +++++++++++++++++++
.../internal/cache/PartitionedRegionStats.java | 13 ++
.../cache/partitioned/RegionAdvisor.java | 18 ---
.../cache/BucketRedundancyTrackerTest.java | 124 +++++++++++++++++
.../PartitionedRegionRedundancyTrackerTest.java | 120 ++++++++++++++++
8 files changed, 537 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/b7db727a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index 0c58963..e8fabb7 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -89,6 +89,8 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
*/
protected final RegionAdvisor regionAdvisor;
+ private final BucketRedundancyTracker redundancyTracker;
+
/**
* The bucket primary will be holding this distributed lock. Protected by
synchronized(this).
*/
@@ -128,25 +130,6 @@ public class BucketAdvisor extends
CacheDistributionAdvisor {
static private final Random myRand = new Random();
/**
- * Used by {@link #updateRedundancy()} to determine if stat change is
required. Access and
- * mutation are done while synchronized on this advisor.
- *
- * @guarded.By this
- */
- private boolean redundancySatisfied = true;
-
- /**
- * Used by {@link #incLowRedundancyBucketCount(int)} to determine if
redundancy for this bucket
- * has ever been satisfied. Only buckets which lose redundancy after having
redundancy will
- * generate a redundancy loss alert.
- * <p>
- * Access and mutation are done while synchronized on this advisor.
- *
- * @guarded.By this
- */
- private boolean redundancyEverSatisfied = false;
-
- /**
* A read/write lock to prevent making this bucket not primary while a write
is in progress on the
* bucket.
*/
@@ -160,8 +143,6 @@ public class BucketAdvisor extends CacheDistributionAdvisor
{
*/
private BucketAdvisor parentAdvisor;
- private volatile int redundancy = -1;
-
/**
* The member that is responsible for choosing the primary for this bucket.
While this field is
* set and this member exists, this bucket won't try to become primary.
@@ -188,6 +169,8 @@ public class BucketAdvisor extends CacheDistributionAdvisor
{
super(bucket);
this.regionAdvisor = regionAdvisor;
this.pRegion = this.regionAdvisor.getPartitionedRegion();
+ this.redundancyTracker =
+ new BucketRedundancyTracker(pRegion.getRedundantCopies(),
pRegion.getRedundancyTracker());
resetParentAdvisor(bucket.getId());
}
@@ -365,64 +348,6 @@ public class BucketAdvisor extends
CacheDistributionAdvisor {
}
/**
- * Increment or decrement lowRedundancyBucketCount stat and generate alert
only once per loss of
- * redundancy for PR but only if redundancy has ever been satisfied.
- * <p>
- * Caller must synchronize on this BucketAdvisor.
- *
- * @param val the value to increment or decrement by
- * @guarded.By this
- */
- private void incLowRedundancyBucketCount(int val) {
- final int HAS_LOW_REDUNDANCY = 0;
- final int ALREADY_GENERATED_WARNING = 1;
-
- final PartitionedRegionStats stats = getPartitionedRegionStats();
- final boolean[] lowRedundancyFlags =
this.regionAdvisor.getLowRedundancyFlags();
- final int configuredRedundancy = this.pRegion.getRedundantCopies();
-
- synchronized (lowRedundancyFlags) {
- stats.incLowRedundancyBucketCount(val);
-
- if (stats.getLowRedundancyBucketCount() == 0) {
- // all buckets are fully redundant
- lowRedundancyFlags[HAS_LOW_REDUNDANCY] = false; // reset
- lowRedundancyFlags[ALREADY_GENERATED_WARNING] = false; // reset
- stats.setActualRedundantCopies(configuredRedundancy);
- }
-
- else {
- // one or more buckets are not fully redundant
- int numBucketHosts = getBucketRedundancy() + 1;
- int actualRedundancy = Math.max(numBucketHosts - 1, 0); // zero or more
-
- if (actualRedundancy < stats.getActualRedundantCopies()) {
- // need to generate an alert for this lower actual redundancy
- lowRedundancyFlags[ALREADY_GENERATED_WARNING] = false;
- }
-
- if (!lowRedundancyFlags[HAS_LOW_REDUNDANCY]
- || !lowRedundancyFlags[ALREADY_GENERATED_WARNING]) {
- // either we have lower redundancy or we never generated an alert
-
- lowRedundancyFlags[HAS_LOW_REDUNDANCY] = true;
- stats.setActualRedundantCopies(actualRedundancy);
-
- // this bucket will only generate alert if redundancyEverSatisfied
- if (!lowRedundancyFlags[ALREADY_GENERATED_WARNING] &&
this.redundancyEverSatisfied) {
-
- lowRedundancyFlags[ALREADY_GENERATED_WARNING] = true;
- logger.warn(LocalizedMessage.create(
-
LocalizedStrings.BucketAdvisor_REDUNDANCY_HAS_DROPPED_BELOW_0_CONFIGURED_COPIES_TO_1_ACTUAL_COPIES_FOR_2,
- new Object[] {Integer.valueOf(configuredRedundancy),
- Integer.valueOf(actualRedundancy),
this.pRegion.getFullPath()}));
- }
- }
- }
- }
- }
-
- /**
* Return (and possibly choose) a thread-sticky member from whose data store
this bucket's values
* should be read
*
@@ -1007,10 +932,7 @@ public class BucketAdvisor extends
CacheDistributionAdvisor {
wasPrimary = isPrimary();
super.close();
this.requestPrimaryState(CLOSED);
- if (!this.redundancySatisfied) {
- incLowRedundancyBucketCount(-1);
- this.redundancySatisfied = true;
- }
+ this.redundancyTracker.closeBucket();
this.localProfile = null;
}
if (wasPrimary) {
@@ -1797,32 +1719,14 @@ public class BucketAdvisor extends
CacheDistributionAdvisor {
}
/**
- * Determine if there has been a change in redundancy and alter the
lowRedundancyBucketCount stat
- * as needed.
- *
- * Also updates a counter used to track the redundancy of this member
- *
- * @return current number of hosts for this bucket
- * @see #redundancySatisfied
- * @see PartitionedRegionStats#incLowRedundancyBucketCount(int)
- * @guarded.By this
+ * Get the current number of bucket hosts and update the redundancy
statistics for the region
+ *
+ * @return number of current bucket hosts
*/
private int updateRedundancy() {
- int desiredRedundancy = this.pRegion.getRedundantCopies();
int numBucketHosts = getNumInitializedBuckets();
- if (isClosed()) {
- return numBucketHosts;
- }
- int actualRedundancy = numBucketHosts - 1;
- this.redundancy = actualRedundancy;
- if (this.redundancySatisfied && numBucketHosts > 0 && actualRedundancy <
desiredRedundancy) {
- incLowRedundancyBucketCount(1);
- this.redundancySatisfied = false;
- } else if (!this.redundancySatisfied && numBucketHosts > 0
- && actualRedundancy >= desiredRedundancy) {
- incLowRedundancyBucketCount(-1);
- this.redundancySatisfied = true;
- this.redundancyEverSatisfied = true;
+ if (!isClosed()) {
+ redundancyTracker.updateStatistics(numBucketHosts);
}
return numBucketHosts;
}
@@ -1882,7 +1786,7 @@ public class BucketAdvisor extends
CacheDistributionAdvisor {
* @return current number of hosts of this bucket ; -1 if there are no hosts
*/
public int getBucketRedundancy() {
- return redundancy;
+ return redundancyTracker.getCurrentRedundancy();
}
public Set<InternalDistributedMember> adviseInitialized() {
http://git-wip-us.apache.org/repos/asf/geode/blob/b7db727a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRedundancyTracker.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRedundancyTracker.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRedundancyTracker.java
new file mode 100644
index 0000000..2883744
--- /dev/null
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRedundancyTracker.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+/**
+ * Keeps track of redundancy status for a bucket in a PartitionedRegion and
updates the region's
+ * {@link PartitionedRegionRedundancyTracker} with the bucket's status for the
region.
+ */
+class BucketRedundancyTracker {
+ private boolean redundancySatisfied = false;
+ private boolean hasAnyCopies = false;
+ private boolean redundancyEverSatisfied = false;
+ private boolean hasEverHadCopies = false;
+ private volatile int currentRedundancy = -1;
+ private final int targetRedundancy;
+ private final PartitionedRegionRedundancyTracker regionRedundancyTracker;
+
+ /**
+ * Creates a new BucketRedundancyTracker
+ *
+ * @param redundantCopies the number of redundant copies specified for the
+ * {@link PartitionedRegion} of this bucket
+ * @param regionRedundancyTracker the redundancy tracker for the {@link
PartitionedRegion} of this
+ * bucket
+ */
+ BucketRedundancyTracker(int redundantCopies,
+ PartitionedRegionRedundancyTracker regionRedundancyTracker) {
+ this.targetRedundancy = redundantCopies;
+ this.regionRedundancyTracker = regionRedundancyTracker;
+ }
+
+ /**
+ * Adjust statistics based on closing a bucket
+ */
+ synchronized void closeBucket() {
+ if (!redundancySatisfied) {
+ regionRedundancyTracker.decrementLowRedundancyBucketCount();
+ redundancySatisfied = true;
+ }
+ if (hasEverHadCopies && !hasAnyCopies) {
+ regionRedundancyTracker.decrementNoCopiesBucketCount();
+ hasAnyCopies = true;
+ }
+ }
+
+ /**
+ * Determines if there has been a change in current redundancy and updates
statistics on
+ * redundancy for the region of the bucket for this tracker
+ *
+ * @param currentBucketHosts number of current hosts for the bucket
+ */
+ synchronized void updateStatistics(int currentBucketHosts) {
+ updateRedundancyStatistics(currentBucketHosts);
+ updateNoCopiesStatistics(currentBucketHosts);
+ }
+
+ /**
+ * Provides the current redundancy of the bucket for this tracker
+ *
+ * @return number of redundant copies of the bucket for this tracker
+ */
+ int getCurrentRedundancy() {
+ return currentRedundancy;
+ }
+
+ private void updateNoCopiesStatistics(int currentBucketHosts) {
+ if (hasAnyCopies && currentBucketHosts == 0) {
+ hasAnyCopies = false;
+ regionRedundancyTracker.incrementNoCopiesBucketCount();
+ } else if (!hasAnyCopies && currentBucketHosts > 0) {
+ if (hasEverHadCopies) {
+ regionRedundancyTracker.decrementNoCopiesBucketCount();
+ }
+ hasEverHadCopies = true;
+ hasAnyCopies = true;
+ }
+ }
+
+ private void updateRedundancyStatistics(int updatedBucketHosts) {
+ int updatedRedundancy = updatedBucketHosts - 1;
+ updateCurrentRedundancy(updatedRedundancy);
+
+ if (updatedRedundancy < targetRedundancy) {
+ reportUpdatedBucketCount(updatedBucketHosts);
+ if (redundancySatisfied) {
+ regionRedundancyTracker.incrementLowRedundancyBucketCount();
+ redundancySatisfied = false;
+ } else if (!hasAnyCopies && updatedRedundancy >= 0) {
+ regionRedundancyTracker.incrementLowRedundancyBucketCount();
+ }
+ } else if (!redundancySatisfied && updatedRedundancy == targetRedundancy) {
+ regionRedundancyTracker.decrementLowRedundancyBucketCount();
+ redundancySatisfied = true;
+ redundancyEverSatisfied = true;
+ }
+ }
+
+ private void updateCurrentRedundancy(int updatedRedundancy) {
+ if (updatedRedundancy != currentRedundancy) {
+ regionRedundancyTracker.setActualRedundancy(updatedRedundancy);
+ currentRedundancy = updatedRedundancy;
+ }
+ }
+
+ private void reportUpdatedBucketCount(int updatedBucketHosts) {
+ if (redundancyEverSatisfied) {
+ regionRedundancyTracker.reportBucketCount(updatedBucketHosts);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/b7db727a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 27b442d..8410c00 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -569,6 +569,10 @@ public class PartitionedRegion extends LocalRegion
}
};
+ PartitionedRegionRedundancyTracker getRedundancyTracker() {
+ return redundancyTracker;
+ }
+
public static class PRIdMap extends HashMap {
private static final long serialVersionUID = 3667357372967498179L;
@@ -713,6 +717,8 @@ public class PartitionedRegion extends LocalRegion
private AbstractGatewaySender parallelGatewaySender = null;
+ private final PartitionedRegionRedundancyTracker redundancyTracker;
+
/**
* Constructor for a PartitionedRegion. This has an accessor (Region API)
functionality and
* contains a datastore for actual storage. An accessor can act as a local
cache by having a local
@@ -754,6 +760,8 @@ public class PartitionedRegion extends LocalRegion
// getScope is overridden to return the correct scope.
// this.scope = Scope.LOCAL;
this.redundantCopies =
regionAttributes.getPartitionAttributes().getRedundantCopies();
+ this.redundancyTracker = new
PartitionedRegionRedundancyTracker(this.totalNumberOfBuckets,
+ this.redundantCopies, this.prStats, getFullPath());
this.prStats.setConfiguredRedundantCopies(
regionAttributes.getPartitionAttributes().getRedundantCopies());
this.prStats.setLocalMaxMemory(
http://git-wip-us.apache.org/repos/asf/geode/blob/b7db727a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionRedundancyTracker.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionRedundancyTracker.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionRedundancyTracker.java
new file mode 100644
index 0000000..38ef61b
--- /dev/null
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionRedundancyTracker.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache;
+
+import org.apache.geode.internal.i18n.LocalizedStrings;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.log4j.LocalizedMessage;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Keeps track of redundancy statistics across the buckets of a given {@link
PartitionedRegion}
+ */
+class PartitionedRegionRedundancyTracker {
+ private static final Logger logger = LogService.getLogger();
+
+ private final PartitionedRegionStats stats;
+ private final String regionPath;
+ private final int totalBuckets;
+ private final int targetRedundancy;
+
+ private int lowRedundancyBuckets;
+ private int noCopiesBuckets;
+ private int lowestBucketCopies;
+
+ /**
+ * Creates a new PartitionedRegionRedundancyTracker
+ *
+ * @param totalBuckets number of buckets in the region to track
+ * @param redundantCopies number of redundant copies specified on the region
to track
+ * @param stats the statistics container for the region to track
+ * @param regionPath the full path of the region to track
+ */
+ PartitionedRegionRedundancyTracker(int totalBuckets, int redundantCopies,
+ PartitionedRegionStats stats, String regionPath) {
+ this.stats = stats;
+ this.regionPath = regionPath;
+ this.totalBuckets = totalBuckets;
+ this.targetRedundancy = redundantCopies;
+ this.lowestBucketCopies = redundantCopies + 1;
+ }
+
+ /**
+ * Since consistency was last reached, provides the lowest number of copies
of a bucket that have
+ * been remaining across all the buckets in the region
+ *
+ * @return the number of copies of the bucket with the least copies available
+ */
+ int getLowestBucketCopies() {
+ return lowestBucketCopies;
+ }
+
+ /**
+ * Increments the count of buckets that do not meet redundancy
+ */
+ synchronized void incrementLowRedundancyBucketCount() {
+ if (lowRedundancyBuckets == totalBuckets) {
+ return;
+ }
+ lowRedundancyBuckets++;
+ stats.incLowRedundancyBucketCount(1);
+ }
+
+ /**
+ * Updates the count of copies for the bucket with the least copies if a new
low has been reached
+ *
+ * @param bucketCopies number of copies of a bucket remaining
+ */
+ synchronized void reportBucketCount(int bucketCopies) {
+ if (bucketCopies < lowestBucketCopies) {
+ lowestBucketCopies = bucketCopies;
+ logger.warn(LocalizedMessage.create(
+
LocalizedStrings.BucketAdvisor_REDUNDANCY_HAS_DROPPED_BELOW_0_CONFIGURED_COPIES_TO_1_ACTUAL_COPIES_FOR_2,
+ new Object[] {targetRedundancy + 1, bucketCopies, regionPath}));
+ }
+ }
+
+ /**
+ * Increments the count of buckets that no longer have any copies remaining
+ */
+ synchronized void incrementNoCopiesBucketCount() {
+ if (noCopiesBuckets == totalBuckets) {
+ return;
+ }
+ noCopiesBuckets++;
+ stats.incNoCopiesBucketCount(1);
+ if (noCopiesBuckets == 1) {
+ logger.warn("All in memory copies of some data have been lost for " +
regionPath);
+ }
+ }
+
+ /**
+ * Decrements the count of buckets that do not meet redundancy
+ */
+ synchronized void decrementLowRedundancyBucketCount() {
+ if (lowRedundancyBuckets == 0) {
+ return;
+ }
+ lowRedundancyBuckets--;
+ stats.incLowRedundancyBucketCount(-1);
+ if (lowRedundancyBuckets == 0) {
+ lowestBucketCopies = targetRedundancy + 1;
+ logger.info("Configured redundancy of " + (targetRedundancy + 1)
+ + " copies has been restored to " + regionPath);
+ }
+ }
+
+ /**
+ * Decrements the count of buckets that no longer have any copies remaining
+ */
+ synchronized void decrementNoCopiesBucketCount() {
+ if (noCopiesBuckets == 0) {
+ return;
+ }
+ noCopiesBuckets--;
+ stats.incNoCopiesBucketCount(-1);
+ // if the last bucket with no copies has gained a copy, the bucket with
the lowest
+ // number of copies (that bucket) has one copy
+ if (noCopiesBuckets == 0) {
+ lowestBucketCopies = 1;
+ }
+ }
+
+ void setActualRedundancy(int actualRedundancy) {
+ stats.setActualRedundantCopies(actualRedundancy);
+ }
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/b7db727a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStats.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStats.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStats.java
index d1dc1be..535e877 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStats.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStats.java
@@ -138,6 +138,8 @@ public class PartitionedRegionStats {
private final static int volunteeringThreadsId; // number of threads
actively volunteering
private final static int lowRedundancyBucketCountId; // number of buckets
currently without full
// redundancy
+ private final static int noCopiesBucketCountId; // number of buckets
currently without any
+ // copies remaining
private final static int configuredRedundantCopiesId;
private final static int actualRedundantCopiesId;
@@ -308,6 +310,8 @@ public class PartitionedRegionStats {
"Current number of threads volunteering for primary.",
"threads"),
f.createIntGauge("lowRedundancyBucketCount",
"Current number of buckets without full redundancy.",
"buckets"),
+ f.createIntGauge("noCopiesBucketCount",
+ "Current number of buckets without any copies remaining.",
"buckets"),
f.createIntGauge("configuredRedundantCopies",
"Configured number of redundant copies for this partitioned
region.", "copies"),
f.createIntGauge("actualRedundantCopies",
@@ -477,6 +481,7 @@ public class PartitionedRegionStats {
primaryBucketCountId = type.nameToId("primaryBucketCount");
volunteeringThreadsId = type.nameToId("volunteeringThreads");
lowRedundancyBucketCountId = type.nameToId("lowRedundancyBucketCount");
+ noCopiesBucketCountId = type.nameToId("noCopiesBucketCount");
getEntriesCompletedId = type.nameToId("getEntryCompleted");
getEntryTimeId = type.nameToId("getEntryTime");
@@ -899,10 +904,18 @@ public class PartitionedRegionStats {
return this.stats.getInt(lowRedundancyBucketCountId);
}
+ public int getNoCopiesBucketCount() {
+ return this.stats.getInt(noCopiesBucketCountId);
+ }
+
public void incLowRedundancyBucketCount(int val) {
this.stats.incInt(lowRedundancyBucketCountId, val);
}
+ public void incNoCopiesBucketCount(int val) {
+ this.stats.incInt(noCopiesBucketCountId, val);
+ }
+
public int getConfiguredRedundantCopies() {
return this.stats.getInt(configuredRedundantCopiesId);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/b7db727a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
index c473d2d..63a694a 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
@@ -80,26 +80,8 @@ public class RegionAdvisor extends CacheDistributionAdvisor {
private Queue preInitQueue;
private final Object preInitQueueMonitor = new Object();
- /**
- * Used by to generate redundancy loss alert only once even if more than one
bucket or PR has lost
- * redundancy. lowRedundancyFlags[0] is true if any bucket in this
partitioned region has lower
- * than configured redundancy. lowRedundancyFlags[1] is true if a warning
has been generated for
- * the current actual redundancy of this partitioned region. The caller must
synchronize on
- * lowRedundancyFlags in order to maintain atomicity of overall redundancy
status and alert.
- */
- private final boolean[] lowRedundancyFlags = new boolean[2];
-
private ConcurrentHashMap<Integer, Set<ServerBucketProfile>>
clientBucketProfilesMap;
- /**
- * Caller must synchronize on the return value.
- *
- * @return the low redundancy flags for this partitioned region
- */
- public boolean[] getLowRedundancyFlags() {
- return lowRedundancyFlags;
- }
-
private RegionAdvisor(PartitionedRegion region) {
super(region);
synchronized (this.preInitQueueMonitor) {
http://git-wip-us.apache.org/repos/asf/geode/blob/b7db727a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRedundancyTrackerTest.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRedundancyTrackerTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRedundancyTrackerTest.java
new file mode 100644
index 0000000..080cec1
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRedundancyTrackerTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(UnitTest.class)
+public class BucketRedundancyTrackerTest {
+ private static final int TARGET_COPIES = 2;
+
+ private PartitionedRegionRedundancyTracker regionRedundancyTracker;
+ private BucketRedundancyTracker bucketRedundancyTracker;
+
+ @Before
+ public void setup() {
+ regionRedundancyTracker = mock(PartitionedRegionRedundancyTracker.class);
+ bucketRedundancyTracker =
+ new BucketRedundancyTracker(TARGET_COPIES - 1,
regionRedundancyTracker);
+ }
+
+ @Test
+ public void whenRedundancyNeverMetDoesNotWarnOnLowRedundancy() {
+ bucketRedundancyTracker.updateStatistics(TARGET_COPIES - 1);
+ verify(regionRedundancyTracker, never()).reportBucketCount(anyInt());
+ }
+
+ @Test
+ public void incrementsBucketCountOnLowRedundancyForBucket() {
+ bucketRedundancyTracker.updateStatistics(TARGET_COPIES);
+ bucketRedundancyTracker.updateStatistics(TARGET_COPIES - 1);
+ verify(regionRedundancyTracker,
times(1)).incrementLowRedundancyBucketCount();
+ assertEquals(0, bucketRedundancyTracker.getCurrentRedundancy());
+ }
+
+ @Test
+ public void decrementsBucketCountOnRegainingRedundancyForBucket() {
+ bucketRedundancyTracker.updateStatistics(TARGET_COPIES);
+ bucketRedundancyTracker.updateStatistics(TARGET_COPIES - 1);
+ bucketRedundancyTracker.updateStatistics(TARGET_COPIES);
+ verify(regionRedundancyTracker,
times(2)).decrementLowRedundancyBucketCount();
+ assertEquals(TARGET_COPIES - 1,
bucketRedundancyTracker.getCurrentRedundancy());
+ }
+
+ @Test
+ public void decrementsBucketCountOnClosingBucketBelowRedundancy() {
+ bucketRedundancyTracker.updateStatistics(TARGET_COPIES);
+ bucketRedundancyTracker.updateStatistics(TARGET_COPIES - 1);
+ bucketRedundancyTracker.closeBucket();
+ verify(regionRedundancyTracker,
times(2)).decrementLowRedundancyBucketCount();
+ assertEquals(0, bucketRedundancyTracker.getCurrentRedundancy());
+ }
+
+ @Test
+ public void decrementsBucketCountOnClosingABucketWithNoCopies() {
+ bucketRedundancyTracker.updateStatistics(TARGET_COPIES);
+ bucketRedundancyTracker.updateStatistics(TARGET_COPIES - 1);
+ bucketRedundancyTracker.updateStatistics(0);
+ bucketRedundancyTracker.closeBucket();
+ verify(regionRedundancyTracker,
times(2)).decrementLowRedundancyBucketCount();
+ assertEquals(-1, bucketRedundancyTracker.getCurrentRedundancy());
+ }
+
+ @Test
+ public void bucketCountNotDecrementedOnClosingBucketThatNeverHadCopies() {
+ verify(regionRedundancyTracker,
never()).decrementLowRedundancyBucketCount();
+ assertEquals(-1, bucketRedundancyTracker.getCurrentRedundancy());
+ }
+
+ @Test
+ public void doesNotWarnWhenNeverHadAnyCopies() {
+ bucketRedundancyTracker.updateStatistics(0);
+ verify(regionRedundancyTracker, never()).reportBucketCount(anyInt());
+ assertEquals(-1, bucketRedundancyTracker.getCurrentRedundancy());
+ }
+
+ @Test
+ public void incrementsBucketCountOnHavingNoCopiesForBucket() {
+ bucketRedundancyTracker.updateStatistics(1);
+ bucketRedundancyTracker.updateStatistics(0);
+ verify(regionRedundancyTracker, times(1)).incrementNoCopiesBucketCount();
+ assertEquals(-1, bucketRedundancyTracker.getCurrentRedundancy());
+ }
+
+ @Test
+ public void decrementsBucketCountOnHavingAtLeastOneCopyOfBucket() {
+ bucketRedundancyTracker.updateStatistics(1);
+ bucketRedundancyTracker.updateStatistics(0);
+ bucketRedundancyTracker.updateStatistics(1);
+ verify(regionRedundancyTracker, times(1)).decrementNoCopiesBucketCount();
+ assertEquals(0, bucketRedundancyTracker.getCurrentRedundancy());
+ }
+
+ @Test
+ public void updatesRedundancyOnlyIfChanged() {
+ bucketRedundancyTracker.updateStatistics(TARGET_COPIES - 1);
+ verify(regionRedundancyTracker,
times(1)).setActualRedundancy(TARGET_COPIES - 2);
+ bucketRedundancyTracker.updateStatistics(TARGET_COPIES);
+ verify(regionRedundancyTracker,
times(1)).setActualRedundancy(TARGET_COPIES - 1);
+ bucketRedundancyTracker.updateStatistics(TARGET_COPIES);
+ verify(regionRedundancyTracker, times(2)).setActualRedundancy(anyInt());
+ }
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/b7db727a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionRedundancyTrackerTest.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionRedundancyTrackerTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionRedundancyTrackerTest.java
new file mode 100644
index 0000000..0917835
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionRedundancyTrackerTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(UnitTest.class)
+public class PartitionedRegionRedundancyTrackerTest {
+ private static final int TARGET_COPIES = 2;
+ private static final int TOTAL_BUCKETS = 3;
+
+ private PartitionedRegionStats stats;
+ private PartitionedRegionRedundancyTracker redundancyTracker;
+
+ @Before
+ public void setup() {
+ stats = mock(PartitionedRegionStats.class);
+ redundancyTracker = new PartitionedRegionRedundancyTracker(TOTAL_BUCKETS,
TARGET_COPIES - 1,
+ stats, "testRegion");
+ }
+
+ @Test
+ public void incrementsAndDecrementsLowRedundancyBucketCount() {
+ redundancyTracker.incrementLowRedundancyBucketCount();
+ verify(stats, times(1)).incLowRedundancyBucketCount(1);
+ redundancyTracker.decrementLowRedundancyBucketCount();
+ verify(stats, times(1)).incLowRedundancyBucketCount(-1);
+ }
+
+ @Test
+ public void willNotIncrementLowRedundancyBucketCountBeyondTotalsBuckets() {
+
+ for (int i = 0; i < TOTAL_BUCKETS; i++) {
+ redundancyTracker.incrementLowRedundancyBucketCount();
+ }
+ verify(stats, times(TOTAL_BUCKETS)).incLowRedundancyBucketCount(1);
+ redundancyTracker.incrementLowRedundancyBucketCount();
+ verifyNoMoreInteractions(stats);
+ }
+
+ @Test
+ public void willNotDecrementLowRedundancyBucketCountBelowZero() {
+ redundancyTracker.decrementLowRedundancyBucketCount();
+ verifyZeroInteractions(stats);
+ }
+
+ @Test
+ public void incrementsAndDecrementsNoCopiesBucketCount() {
+ redundancyTracker.incrementNoCopiesBucketCount();
+ verify(stats, times(1)).incNoCopiesBucketCount(1);
+ redundancyTracker.decrementNoCopiesBucketCount();
+ verify(stats, times(1)).incNoCopiesBucketCount(-1);
+ }
+
+ @Test
+ public void willNotIncrementNoCopiesBucketCountBeyondTotalsBuckets() {
+
+ for (int i = 0; i < TOTAL_BUCKETS; i++) {
+ redundancyTracker.incrementNoCopiesBucketCount();
+ }
+ verify(stats, times(TOTAL_BUCKETS)).incNoCopiesBucketCount(1);
+ redundancyTracker.incrementNoCopiesBucketCount();
+ verifyNoMoreInteractions(stats);
+ }
+
+ @Test
+ public void willNotDecrementNoCopiesBucketCountBelowZero() {
+ redundancyTracker.decrementNoCopiesBucketCount();
+ verify(stats, times(0)).incNoCopiesBucketCount(-1);
+ }
+
+ @Test
+ public void reportsCorrectLowestBucketCopies() {
+ redundancyTracker.reportBucketCount(1);
+ assertEquals(1, redundancyTracker.getLowestBucketCopies());
+ redundancyTracker.reportBucketCount(0);
+ assertEquals(0, redundancyTracker.getLowestBucketCopies());
+ redundancyTracker.reportBucketCount(1);
+ assertEquals(0, redundancyTracker.getLowestBucketCopies());
+ }
+
+ @Test
+ public void lowestBucketCopiesResetsOnRedundancyRegained() {
+ redundancyTracker.incrementLowRedundancyBucketCount();
+ redundancyTracker.reportBucketCount(1);
+ redundancyTracker.decrementLowRedundancyBucketCount();
+ assertEquals(2, redundancyTracker.getLowestBucketCopies());
+ }
+
+ @Test
+ public void lowestBucketCopiesSetToOneOnHavingABucketAgain() {
+ redundancyTracker.incrementNoCopiesBucketCount();
+ redundancyTracker.reportBucketCount(0);
+ redundancyTracker.decrementNoCopiesBucketCount();
+ assertEquals(1, redundancyTracker.getLowestBucketCopies());
+ }
+}