This is an automated email from the ASF dual-hosted git repository.
nreich pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new a6092e9 Feature/geode 4810: Move rebalance model classes to own
package (#1623)
a6092e9 is described below
commit a6092e92b985da250c9005964c650f39e0ea579a
Author: Nick Reich <[email protected]>
AuthorDate: Tue Mar 20 13:22:30 2018 -0700
Feature/geode 4810: Move rebalance model classes to own package (#1623)
---
.../partitioned/PartitionedRegionRebalanceOp.java | 4 +-
.../partitioned/rebalance/CompositeDirector.java | 2 +
.../rebalance/ExplicitMoveDirector.java | 11 +-
.../cache/partitioned/rebalance/FPRDirector.java | 2 +
.../cache/partitioned/rebalance/MoveBuckets.java | 3 +-
.../cache/partitioned/rebalance/MovePrimaries.java | 3 +-
.../partitioned/rebalance/MovePrimariesFPR.java | 7 +-
.../rebalance/PercentageMoveDirector.java | 7 +-
.../partitioned/rebalance/RebalanceDirector.java | 1 +
.../rebalance/RebalanceDirectorAdapter.java | 2 +
.../rebalance/RemoveOverRedundancy.java | 7 +-
.../partitioned/rebalance/SatisfyRedundancy.java | 5 +-
.../rebalance/SatisfyRedundancyFPR.java | 7 +-
.../AddressComparor.java} | 24 +-
.../cache/partitioned/rebalance/model/Bucket.java | 160 ++++
.../partitioned/rebalance/model/BucketRollup.java | 104 +++
.../cache/partitioned/rebalance/model/Member.java | 239 ++++++
.../partitioned/rebalance/model/MemberRollup.java | 159 ++++
.../cache/partitioned/rebalance/model/Move.java | 74 ++
.../{ => model}/PartitionedRegionLoadModel.java | 812 +--------------------
.../partitioned/rebalance/model/RefusalReason.java | 49 ++
.../PartitionedRegionLoadModelJUnitTest.java | 11 +-
22 files changed, 874 insertions(+), 819 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
index 98b8cb7..21630da 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
@@ -49,10 +49,10 @@ import
org.apache.geode.internal.cache.partitioned.rebalance.BucketOperator;
import
org.apache.geode.internal.cache.partitioned.rebalance.BucketOperatorImpl;
import
org.apache.geode.internal.cache.partitioned.rebalance.BucketOperatorWrapper;
import
org.apache.geode.internal.cache.partitioned.rebalance.ParallelBucketOperator;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.AddressComparor;
import org.apache.geode.internal.cache.partitioned.rebalance.RebalanceDirector;
import
org.apache.geode.internal.cache.partitioned.rebalance.SimulatedBucketOperator;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.AddressComparor;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.PartitionedRegionLoadModel;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/CompositeDirector.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/CompositeDirector.java
index f9bdc67..8c66775 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/CompositeDirector.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/CompositeDirector.java
@@ -14,6 +14,8 @@
*/
package org.apache.geode.internal.cache.partitioned.rebalance;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.PartitionedRegionLoadModel;
+
/**
* The composite director performs a complete rebalance, which can remove over
redundant buckets,
* satisfy redundancy, move buckets, and move primaries.
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/ExplicitMoveDirector.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/ExplicitMoveDirector.java
index 2a22289..dea0285 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/ExplicitMoveDirector.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/ExplicitMoveDirector.java
@@ -20,10 +20,11 @@ import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.Bucket;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.Member;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.Move;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.RefusalReason;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Bucket;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Member;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Move;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.PartitionedRegionLoadModel;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.RefusalReason;
import org.apache.geode.internal.i18n.LocalizedStrings;
public class ExplicitMoveDirector extends RebalanceDirectorAdapter {
@@ -103,7 +104,7 @@ public class ExplicitMoveDirector extends
RebalanceDirectorAdapter {
}
} else {
throw new IllegalStateException("Unable to move bucket for " +
model.getName() + ". "
- + reason.formatMessage(sourceMember, targetMember, bucket));
+ + reason.formatMessage(targetMember, bucket));
}
return false;
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/FPRDirector.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/FPRDirector.java
index 040f75c..0958915 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/FPRDirector.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/FPRDirector.java
@@ -14,6 +14,8 @@
*/
package org.apache.geode.internal.cache.partitioned.rebalance;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.PartitionedRegionLoadModel;
+
/**
* The FPR director performs rebalancing operations for a fixed partitioned
region. There two things
* a fixed partitioned region does during rebalancing: - Create redundant
buckets in a known
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/MoveBuckets.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/MoveBuckets.java
index 9ce63fb..0a0ea5a 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/MoveBuckets.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/MoveBuckets.java
@@ -14,7 +14,8 @@
*/
package org.apache.geode.internal.cache.partitioned.rebalance;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.Move;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Move;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.PartitionedRegionLoadModel;
/**
* A director to move buckets to improve the load balance of a PR. This is
most commonly used as an
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/MovePrimaries.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/MovePrimaries.java
index 119de08..4dfcffa 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/MovePrimaries.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/MovePrimaries.java
@@ -14,7 +14,8 @@
*/
package org.apache.geode.internal.cache.partitioned.rebalance;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.Move;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Move;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.PartitionedRegionLoadModel;
/**
* A director to move primaries to improve the load balance of a PR. This is
most commonly used as
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/MovePrimariesFPR.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/MovePrimariesFPR.java
index fb1d9f4..c4c6365 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/MovePrimariesFPR.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/MovePrimariesFPR.java
@@ -22,9 +22,10 @@ import
org.apache.geode.distributed.internal.membership.InternalDistributedMembe
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.FixedPartitionAttributesImpl;
import org.apache.geode.internal.cache.PartitionedRegion;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.Bucket;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.Member;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.Move;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Bucket;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Member;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Move;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.PartitionedRegionLoadModel;
import org.apache.geode.internal.logging.LogService;
/**
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/PercentageMoveDirector.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/PercentageMoveDirector.java
index 020e2d4..102fad3 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/PercentageMoveDirector.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/PercentageMoveDirector.java
@@ -21,9 +21,10 @@ import java.util.TreeSet;
import org.apache.geode.distributed.DistributedMember;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.Bucket;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.Member;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.Move;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Bucket;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Member;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Move;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.PartitionedRegionLoadModel;
import org.apache.geode.internal.i18n.LocalizedStrings;
/**
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/RebalanceDirector.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/RebalanceDirector.java
index 10cbe87..924b7f3 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/RebalanceDirector.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/RebalanceDirector.java
@@ -15,6 +15,7 @@
package org.apache.geode.internal.cache.partitioned.rebalance;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.PartitionedRegionLoadModel;
/**
* A class that is responsible for directing all or part of a rebalance
operation. The director
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/RebalanceDirectorAdapter.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/RebalanceDirectorAdapter.java
index 0e654ac..5f76784 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/RebalanceDirectorAdapter.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/RebalanceDirectorAdapter.java
@@ -14,6 +14,8 @@
*/
package org.apache.geode.internal.cache.partitioned.rebalance;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.PartitionedRegionLoadModel;
+
/**
* A base class for rebalance directors that provides some default
implementations of methods on
* rebalance director.
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/RemoveOverRedundancy.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/RemoveOverRedundancy.java
index a84b0ac..c3a8bfa 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/RemoveOverRedundancy.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/RemoveOverRedundancy.java
@@ -16,9 +16,10 @@ package
org.apache.geode.internal.cache.partitioned.rebalance;
import org.apache.logging.log4j.Logger;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.BucketRollup;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.Member;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.Move;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.BucketRollup;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Member;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Move;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.PartitionedRegionLoadModel;
import org.apache.geode.internal.logging.LogService;
/**
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/SatisfyRedundancy.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/SatisfyRedundancy.java
index 3465abb..671100a 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/SatisfyRedundancy.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/SatisfyRedundancy.java
@@ -16,8 +16,9 @@ package org.apache.geode.internal.cache.partitioned.rebalance;
import org.apache.logging.log4j.Logger;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.BucketRollup;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.Move;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.BucketRollup;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Move;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.PartitionedRegionLoadModel;
import org.apache.geode.internal.logging.LogService;
/**
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/SatisfyRedundancyFPR.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/SatisfyRedundancyFPR.java
index 254defb..eb68276 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/SatisfyRedundancyFPR.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/SatisfyRedundancyFPR.java
@@ -19,9 +19,10 @@ import java.util.Map;
import org.apache.logging.log4j.Logger;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.BucketRollup;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.Member;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.Move;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.BucketRollup;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Member;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Move;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.PartitionedRegionLoadModel;
import org.apache.geode.internal.logging.LogService;
/**
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/RebalanceDirectorAdapter.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/AddressComparor.java
similarity index 61%
copy from
geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/RebalanceDirectorAdapter.java
copy to
geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/AddressComparor.java
index 0e654ac..2acbba5 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/RebalanceDirectorAdapter.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/AddressComparor.java
@@ -12,24 +12,16 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.apache.geode.internal.cache.partitioned.rebalance;
+package org.apache.geode.internal.cache.partitioned.rebalance.model;
-/**
- * A base class for rebalance directors that provides some default
implementations of methods on
- * rebalance director.
- *
- *
- */
-public abstract class RebalanceDirectorAdapter implements RebalanceDirector {
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
- @Override
- public boolean isRebalanceNecessary(boolean redundancyImpaired, boolean
withPersistence) {
- return true;
- }
+public interface AddressComparor {
- @Override
- public void initialize(PartitionedRegionLoadModel model) {
- membershipChanged(model);
+ boolean enforceUniqueZones();
- }
+ /**
+ * Return true if the two addresses are equivalent
+ */
+ boolean areSameZone(InternalDistributedMember member1,
InternalDistributedMember member2);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/Bucket.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/Bucket.java
new file mode 100644
index 0000000..458acc6
--- /dev/null
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/Bucket.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned.rebalance.model;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.geode.internal.cache.persistence.PersistentMemberID;
+
+/**
+ * Represents a single bucket.
+ */
+public class Bucket implements Comparable<Bucket> {
+ private final int id;
+ private final Set<Member> membersHosting = new TreeSet<>();
+ private final Set<PersistentMemberID> offlineMembers;
+ private float load;
+ private long bytes;
+ private float primaryLoad;
+ private int redundancy = -1;
+ private Member primary;
+
+ public Bucket(int id) {
+ this(id, 0, 0, new HashSet<>());
+ }
+
+ public Bucket(int id, float load, long bytes, Set<PersistentMemberID>
offlineMembers) {
+ this.id = id;
+ this.load = load;
+ this.bytes = bytes;
+ this.offlineMembers = offlineMembers;
+ }
+
+ void changeLoad(float change) {
+ load += change;
+ }
+
+ void changePrimaryLoad(float change) {
+ primaryLoad += change;
+ }
+
+ void changeBytes(long change) {
+ bytes += change;
+ }
+
+ void addOfflineMembers(Collection<? extends PersistentMemberID> members) {
+ offlineMembers.addAll(members);
+ }
+
+ public void setPrimary(Member member, float primaryLoad) {
+ if (this.primary == PartitionedRegionLoadModel.INVALID_MEMBER) {
+ return;
+ }
+ if (this.primary != null) {
+ this.primary.removePrimary(this);
+ }
+ this.primary = member;
+ this.primaryLoad = primaryLoad;
+ if (primary != PartitionedRegionLoadModel.INVALID_MEMBER && primary !=
null) {
+ addMember(primary);
+ member.addPrimary(this);
+ }
+ }
+
+ public boolean addMember(Member targetMember) {
+ if (this.getMembersHosting().add(targetMember)) {
+ this.redundancy++;
+ targetMember.addBucket(this);
+ return true;
+ }
+
+ return false;
+ }
+
+ public boolean removeMember(Member targetMember) {
+ if (this.getMembersHosting().remove(targetMember)) {
+ if (targetMember == this.primary) {
+ setPrimary(null, 0);
+ }
+ this.redundancy--;
+ targetMember.removeBucket(this);
+ return true;
+ }
+ return false;
+ }
+
+ public int getRedundancy() {
+ return this.redundancy + offlineMembers.size();
+ }
+
+ public int getOnlineRedundancy() {
+ return this.redundancy;
+ }
+
+ public float getLoad() {
+ return this.load;
+ }
+
+ public int getId() {
+ return this.id;
+ }
+
+ public long getBytes() {
+ return this.bytes;
+ }
+
+ @Override
+ public String toString() {
+ return "Bucket(id=" + getId() + ",load=" + load + ")";
+ }
+
+ public float getPrimaryLoad() {
+ return this.primaryLoad;
+ }
+
+ public Set<Member> getMembersHosting() {
+ return this.membersHosting;
+ }
+
+ public Member getPrimary() {
+ return this.primary;
+ }
+
+ public Collection<? extends PersistentMemberID> getOfflineMembers() {
+ return offlineMembers;
+ }
+
+ @Override
+ public int hashCode() {
+ return this.id;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof Bucket)) {
+ return false;
+ }
+ Bucket o = (Bucket) other;
+ return this.id == o.id;
+ }
+
+ @Override
+ public int compareTo(Bucket other) {
+ return Integer.compare(this.id, other.id);
+ }
+}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/BucketRollup.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/BucketRollup.java
new file mode 100644
index 0000000..9201255
--- /dev/null
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/BucketRollup.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned.rebalance.model;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represents the sum of all of colocated buckets with a given bucket id.
+ */
+public class BucketRollup extends Bucket {
+ private final Map<String, Bucket> colocatedBuckets = new HashMap<>();
+
+ BucketRollup(int id) {
+ super(id);
+ }
+
+ void addColocatedBucket(String region, Bucket b) {
+ if (!this.getColocatedBuckets().containsKey(region)) {
+ this.getColocatedBuckets().put(region, b);
+ changeLoad(b.getLoad());
+ changePrimaryLoad(b.getPrimaryLoad());
+ changeBytes(b.getBytes());
+ addOfflineMembers(b.getOfflineMembers());
+
+ // Update the load on the members hosting this bucket
+ // to reflect the fact that the bucket is larger now.
+ for (Member member : getMembersHosting()) {
+ MemberRollup rollup = (MemberRollup) member;
+ float primaryLoad = 0;
+ if (this.getPrimary() == member) {
+ primaryLoad = b.getPrimaryLoad();
+ }
+ rollup.updateLoad(b.getLoad(), primaryLoad, b.getBytes());
+ }
+ }
+ }
+
+ @Override
+ public boolean addMember(Member targetMember) {
+ if (super.addMember(targetMember)) {
+ MemberRollup memberRollup = (MemberRollup) targetMember;
+ for (Map.Entry<String, Bucket> entry : getColocatedBuckets().entrySet())
{
+ String region = entry.getKey();
+ Bucket bucket = entry.getValue();
+ Member member = memberRollup.getColocatedMembers().get(region);
+ if (member != null) {
+ bucket.addMember(member);
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean removeMember(Member targetMember) {
+ if (super.removeMember(targetMember)) {
+ MemberRollup memberRollup = (MemberRollup) targetMember;
+ for (Map.Entry<String, Bucket> entry : getColocatedBuckets().entrySet())
{
+ String region = entry.getKey();
+ Bucket bucket = entry.getValue();
+ Member member = memberRollup.getColocatedMembers().get(region);
+ if (member != null) {
+ bucket.removeMember(member);
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void setPrimary(Member targetMember, float primaryLoad) {
+ super.setPrimary(targetMember, primaryLoad);
+ if (targetMember != null) {
+ MemberRollup memberRollup = (MemberRollup) targetMember;
+ for (Map.Entry<String, Bucket> entry : getColocatedBuckets().entrySet())
{
+ String region = entry.getKey();
+ Bucket bucket = entry.getValue();
+ Member member = memberRollup.getColocatedMembers().get(region);
+ if (member != null) {
+ bucket.setPrimary(member, primaryLoad);
+ }
+ }
+ }
+ }
+
+ Map<String, Bucket> getColocatedBuckets() {
+ return this.colocatedBuckets;
+ }
+}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/Member.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/Member.java
new file mode 100644
index 0000000..f8f8be0
--- /dev/null
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/Member.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned.rebalance.model;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.logging.log4j.Logger;
+
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * Represents a single member of the distributed system inside the rebalance model.
+ *
+ * <p>
+ * A member keeps the set of buckets it hosts, the set of buckets it is primary for, and running
+ * totals (load, primary load, bytes) that are adjusted whenever a bucket or primary is added or
+ * removed. Identity, hashing and ordering are all derived from the member id.
+ */
+public class Member implements Comparable<Member> {
+  private static final Logger logger = LogService.getLogger();
+
+  private final AddressComparor addressComparor;
+  private final InternalDistributedMember memberId;
+  protected float weight;
+  private float totalLoad;
+  private float totalPrimaryLoad;
+  private long totalBytes;
+  private long localMaxMemory;
+  private final Set<Bucket> buckets = new TreeSet<>();
+  private final Set<Bucket> primaryBuckets = new TreeSet<>();
+  private final boolean isCritical;
+  private final boolean enforceLocalMaxMemory;
+
+  /**
+   * Creates a member with zero weight and zero configured max memory.
+   *
+   * @param addressComparor used to decide whether two members share a redundancy zone
+   * @param memberId the id of the member this object models
+   * @param isCritical true if the member's heap is critical; such a member refuses new buckets
+   * @param enforceLocalMaxMemory true if buckets that would exceed the configured max memory
+   *        must be refused
+   */
+  Member(AddressComparor addressComparor, InternalDistributedMember memberId, boolean isCritical,
+      boolean enforceLocalMaxMemory) {
+    this.addressComparor = addressComparor;
+    this.memberId = memberId;
+    this.isCritical = isCritical;
+    this.enforceLocalMaxMemory = enforceLocalMaxMemory;
+  }
+
+  /**
+   * Creates a member with an explicit weight and configured max memory.
+   *
+   * @param addressComparor used to decide whether two members share a redundancy zone
+   * @param memberId the id of the member this object models
+   * @param weight the weight of this member
+   * @param localMaxMemory the configured max memory, compared against the sum of bucket bytes
+   * @param isCritical true if the member's heap is critical; such a member refuses new buckets
+   * @param enforceLocalMaxMemory true if buckets that would exceed {@code localMaxMemory} must be
+   *        refused
+   */
+  Member(AddressComparor addressComparor, InternalDistributedMember memberId, float weight,
+      long localMaxMemory, boolean isCritical, boolean enforceLocalMaxMemory) {
+    this(addressComparor, memberId, isCritical, enforceLocalMaxMemory);
+    this.weight = weight;
+    this.localMaxMemory = localMaxMemory;
+  }
+
+  /**
+   * Decides whether this member can take on the given bucket.
+   *
+   * <p>
+   * The checks run in this order: already hosting, redundancy zone (only when {@code checkZone}
+   * is set), local max memory (only when enforcement is enabled), critical heap.
+   *
+   * @param bucket the bucket that would be placed on this member
+   * @param sourceMember the member we will be moving this bucket off of; may be null
+   * @param checkZone true if we should not put two copies of a bucket on two nodes with the same
+   *        IP address.
+   * @return the first reason for refusing the bucket, or {@link RefusalReason#NONE} if the bucket
+   *         is acceptable
+   */
+  public RefusalReason willAcceptBucket(Bucket bucket, Member sourceMember, boolean checkZone) {
+    // make sure this member is not already hosting this bucket
+    if (getBuckets().contains(bucket)) {
+      return RefusalReason.ALREADY_HOSTING;
+    }
+    // Check the ip address
+    if (checkZone) {
+      // If the source member is equivalent to the target member, go
+      // ahead and allow the bucket move (it's not making our redundancy worse).
+      // TODO we could have some logic to prefer moving to different ip addresses
+      // Probably that logic should be another stage after redundancy recovery, like
+      // improveRedundancy.
+      boolean sourceIsEquivalent = sourceMember != null
+          && addressComparor.areSameZone(getMemberId(), sourceMember.getDistributedMember());
+      // sourceIsEquivalent is already false for a null source, so one test covers both cases
+      if (!sourceIsEquivalent) {
+        for (Member hostingMember : bucket.getMembersHosting()) {
+          if ((!hostingMember.equals(sourceMember) || addressComparor.enforceUniqueZones())
+              && addressComparor.areSameZone(getMemberId(),
+                  hostingMember.getDistributedMember())) {
+            if (logger.isDebugEnabled()) {
+              logger.debug(
+                  "Member {} would prefer not to host {} because it is already on another member with the same redundancy zone",
+                  this, bucket);
+            }
+            return RefusalReason.SAME_ZONE;
+          }
+        }
+      }
+    }
+
+    // check the localMaxMemory
+    if (this.enforceLocalMaxMemory && this.totalBytes + bucket.getBytes() > this.localMaxMemory) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Member {} won't host bucket {} because it doesn't have enough space", this,
+            bucket);
+      }
+      return RefusalReason.LOCAL_MAX_MEMORY_FULL;
+    }
+
+    // check to see if the heap is critical
+    if (isCritical) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Member {} won't host bucket {} because it's heap is critical", this, bucket);
+      }
+      return RefusalReason.CRITICAL_HEAP;
+    }
+
+    return RefusalReason.NONE;
+  }
+
+  /**
+   * Starts hosting the bucket: registers this member on the bucket and adds the bucket's bytes
+   * and load to this member's totals.
+   *
+   * @return true if the bucket was added, false if this member already hosted it
+   */
+  public boolean addBucket(Bucket bucket) {
+    if (getBuckets().add(bucket)) {
+      bucket.addMember(this);
+      this.totalBytes += bucket.getBytes();
+      this.totalLoad += bucket.getLoad();
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Stops hosting the bucket: unregisters this member from the bucket and subtracts the bucket's
+   * bytes and load from this member's totals.
+   *
+   * @return true if the bucket was removed, false if this member did not host it
+   */
+  public boolean removeBucket(Bucket bucket) {
+    if (getBuckets().remove(bucket)) {
+      bucket.removeMember(this);
+      this.totalBytes -= bucket.getBytes();
+      this.totalLoad -= bucket.getLoad();
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Drops the bucket from the primary set and subtracts its primary load from the total.
+   *
+   * @return true if the bucket was in the primary set
+   */
+  public boolean removePrimary(Bucket bucket) {
+    if (getPrimaryBuckets().remove(bucket)) {
+      this.totalPrimaryLoad -= bucket.getPrimaryLoad();
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Adds the bucket to the primary set and adds its primary load to the total.
+   *
+   * @return true if the bucket was not already in the primary set
+   */
+  public boolean addPrimary(Bucket bucket) {
+    if (getPrimaryBuckets().add(bucket)) {
+      this.totalPrimaryLoad += bucket.getPrimaryLoad();
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * @return the number of buckets hosted by this member
+   */
+  public int getBucketCount() {
+    return getBuckets().size();
+  }
+
+  /**
+   * @return the configured max memory of this member
+   */
+  public long getConfiguredMaxMemory() {
+    return this.localMaxMemory;
+  }
+
+  /**
+   * @return the id of the member this object models
+   */
+  public InternalDistributedMember getDistributedMember() {
+    return getMemberId();
+  }
+
+  /**
+   * Counts the hosted buckets whose primary is this member. The count is computed from each
+   * bucket's own primary reference rather than from the primary set kept by this member.
+   *
+   * @return the number of hosted buckets that report this member as their primary
+   */
+  public int getPrimaryCount() {
+    int primaryCount = 0;
+    for (Bucket bucket : getBuckets()) {
+      if (this.equals(bucket.getPrimary())) {
+        primaryCount++;
+      }
+    }
+    return primaryCount;
+  }
+
+  /**
+   * @return the total bytes of the buckets hosted by this member
+   */
+  public long getSize() {
+    return this.totalBytes;
+  }
+
+  /**
+   * @return the total load of the buckets hosted by this member
+   */
+  public float getTotalLoad() {
+    return this.totalLoad;
+  }
+
+  /**
+   * @return the weight of this member
+   */
+  public float getWeight() {
+    return this.weight;
+  }
+
+  @Override
+  public String toString() {
+    return "Member(id=" + getMemberId() + ")";
+  }
+
+  /**
+   * @return the total primary load of the buckets this member is primary for
+   */
+  public float getPrimaryLoad() {
+    return this.totalPrimaryLoad;
+  }
+
+  /**
+   * @return the live (mutable) set of buckets hosted by this member
+   */
+  public Set<Bucket> getBuckets() {
+    return this.buckets;
+  }
+
+  InternalDistributedMember getMemberId() {
+    return this.memberId;
+  }
+
+  Set<Bucket> getPrimaryBuckets() {
+    return this.primaryBuckets;
+  }
+
+  void changeLocalMaxMemory(long change) {
+    localMaxMemory += change;
+  }
+
+  void changeTotalLoad(float change) {
+    totalLoad += change;
+  }
+
+  void changePrimaryLoad(float change) {
+    totalPrimaryLoad += change;
+  }
+
+  /**
+   * Adjusts the total byte count by the given delta.
+   *
+   * @param change the delta in bytes; any fractional part is discarded
+   */
+  void changeTotalBytes(float change) {
+    // A bare "long += float" widens the running total to float and narrows the sum back, which
+    // silently drops low-order bits once the total exceeds 2^24. Narrow the delta instead so the
+    // accumulated total stays exact.
+    totalBytes += (long) change;
+  }
+
+  @Override
+  public int hashCode() {
+    // the id may be null (PartitionedRegionLoadModel.INVALID_MEMBER is built with a null id)
+    return memberId == null ? 0 : memberId.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (!(other instanceof Member)) {
+      return false;
+    }
+    Member o = (Member) other;
+    // null-tolerant so that a member without an id can be compared instead of throwing
+    return this.memberId == null ? o.memberId == null : this.memberId.equals(o.memberId);
+  }
+
+  @Override
+  public int compareTo(Member other) {
+    // A member without an id sorts before every member that has one; this keeps the ordering
+    // consistent with equals() for the null-id case.
+    if (this.memberId == null) {
+      return other.memberId == null ? 0 : -1;
+    }
+    if (other.memberId == null) {
+      return 1;
+    }
+    // memberId is InternalDistributedMember which implements Comparable
+    return this.memberId.compareTo(other.memberId);
+  }
+}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/MemberRollup.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/MemberRollup.java
new file mode 100644
index 0000000..107c4cf
--- /dev/null
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/MemberRollup.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned.rebalance.model;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+
+/**
+ * The roll-up of one member across all colocated regions: its totals are the sum over the
+ * per-region {@link Member} objects, which are also kept here in a map keyed by region name.
+ * Every bucket or primary change applied to the roll-up is forwarded to the per-region members
+ * that have a matching colocated bucket.
+ */
+class MemberRollup extends Member {
+  private final Map<String, Member> colocatedMembers = new HashMap<>();
+
+  MemberRollup(AddressComparor addressComparor, InternalDistributedMember memberId,
+      boolean isCritical, boolean enforceLocalMaxMemory) {
+    super(addressComparor, memberId, isCritical, enforceLocalMaxMemory);
+  }
+
+  /**
+   * Tells whether this member is missing some of the colocated regions. This implementation
+   * always answers {@code false}.
+   */
+  public boolean isInvalid() {
+    return false;
+  }
+
+  /**
+   * Registers the per-region member for {@code region} and folds its weight and configured max
+   * memory into this roll-up. A region that is already registered is left untouched.
+   */
+  public void addColocatedMember(String region, Member member) {
+    Map<String, Member> registered = getColocatedMembers();
+    if (registered.containsKey(region)) {
+      return;
+    }
+    registered.put(region, member);
+    this.weight += member.weight;
+    changeLocalMaxMemory(member.getConfiguredMaxMemory());
+  }
+
+
+  /**
+   * @return the per-region member registered for {@code region}, or null if there is none
+   */
+  public Member getColocatedMember(String region) {
+    return getColocatedMembers().get(region);
+  }
+
+  /**
+   * Applies the deltas caused by a size change of one of the bucket roll-ups hosted here.
+   *
+   * @param load delta added to the total load
+   * @param primaryLoad delta added to the total primary load
+   * @param bytes delta added to the total bytes
+   */
+  public void updateLoad(float load, float primaryLoad, float bytes) {
+    changeTotalLoad(load);
+    changePrimaryLoad(primaryLoad);
+    changeTotalBytes(bytes);
+  }
+
+  /**
+   * Adds the bucket roll-up to this member and, on success, adds each colocated bucket to the
+   * per-region member of the same region.
+   */
+  @Override
+  public boolean addBucket(Bucket bucket) {
+    if (!super.addBucket(bucket)) {
+      return false;
+    }
+    BucketRollup rollup = (BucketRollup) bucket;
+    getColocatedMembers().forEach((region, regionMember) -> {
+      Bucket regionBucket = rollup.getColocatedBuckets().get(region);
+      if (regionBucket != null) {
+        regionMember.addBucket(regionBucket);
+      }
+    });
+    return true;
+  }
+
+  /**
+   * Removes the bucket roll-up from this member and, on success, removes each colocated bucket
+   * from the per-region member of the same region.
+   */
+  @Override
+  public boolean removeBucket(Bucket bucket) {
+    if (!super.removeBucket(bucket)) {
+      return false;
+    }
+    BucketRollup rollup = (BucketRollup) bucket;
+    getColocatedMembers().forEach((region, regionMember) -> {
+      Bucket regionBucket = rollup.getColocatedBuckets().get(region);
+      if (regionBucket != null) {
+        regionMember.removeBucket(regionBucket);
+      }
+    });
+    return true;
+  }
+
+  /**
+   * Marks this member primary for the bucket roll-up and, on success, marks each per-region
+   * member primary for the colocated bucket of the same region.
+   */
+  @Override
+  public boolean addPrimary(Bucket bucket) {
+    if (!super.addPrimary(bucket)) {
+      return false;
+    }
+    BucketRollup rollup = (BucketRollup) bucket;
+    getColocatedMembers().forEach((region, regionMember) -> {
+      Bucket regionBucket = rollup.getColocatedBuckets().get(region);
+      if (regionBucket != null) {
+        regionMember.addPrimary(regionBucket);
+      }
+    });
+    return true;
+  }
+
+  /**
+   * Clears this member's primary status for the bucket roll-up and, on success, does the same on
+   * each per-region member for the colocated bucket of the same region.
+   */
+  @Override
+  public boolean removePrimary(Bucket bucket) {
+    if (!super.removePrimary(bucket)) {
+      return false;
+    }
+    BucketRollup rollup = (BucketRollup) bucket;
+    getColocatedMembers().forEach((region, regionMember) -> {
+      Bucket regionBucket = rollup.getColocatedBuckets().get(region);
+      if (regionBucket != null) {
+        regionMember.removePrimary(regionBucket);
+      }
+    });
+    return true;
+  }
+
+  /**
+   * Accepts the bucket roll-up only if the roll-up level check passes and every per-region member
+   * accepts its colocated bucket.
+   *
+   * @return the first refusal encountered, or {@link RefusalReason#NONE} if nothing objects
+   */
+  @Override
+  public RefusalReason willAcceptBucket(Bucket bucket, Member source, boolean checkIPAddress) {
+    RefusalReason rollupVerdict = super.willAcceptBucket(bucket, source, checkIPAddress);
+    if (!rollupVerdict.willAccept()) {
+      return rollupVerdict;
+    }
+
+    BucketRollup rollup = (BucketRollup) bucket;
+    MemberRollup sourceRollup = (MemberRollup) source;
+    for (Map.Entry<String, Member> registration : getColocatedMembers().entrySet()) {
+      String region = registration.getKey();
+      Bucket regionBucket = rollup.getColocatedBuckets().get(region);
+      if (regionBucket == null) {
+        // nothing colocated for this region, so nothing to veto
+        continue;
+      }
+      Member regionSource = null;
+      if (sourceRollup != null) {
+        regionSource = sourceRollup.getColocatedMembers().get(region);
+      }
+      RefusalReason regionVerdict =
+          registration.getValue().willAcceptBucket(regionBucket, regionSource, checkIPAddress);
+      if (!regionVerdict.willAccept()) {
+        return regionVerdict;
+      }
+    }
+    return RefusalReason.NONE;
+  }
+
+  Map<String, Member> getColocatedMembers() {
+    return this.colocatedMembers;
+  }
+}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/Move.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/Move.java
new file mode 100644
index 0000000..b01eb1b
--- /dev/null
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/Move.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned.rebalance.model;
+
+import java.util.Objects;
+
+/**
+ * An immutable (source, target, bucket) triple describing the transfer of a bucket from one node
+ * to another. Instances are value objects, which lets the model remember moves it has already
+ * attempted that have failed.
+ */
+public class Move {
+  private final Member source;
+  private final Member target;
+  private final Bucket bucket;
+
+  /**
+   * @param source the member the bucket is taken from
+   * @param target the member the bucket is given to
+   * @param bucket the bucket being moved
+   */
+  public Move(Member source, Member target, Bucket bucket) {
+    this.source = source;
+    this.target = target;
+    this.bucket = bucket;
+  }
+
+  /**
+   * @return the member the bucket is taken from
+   */
+  public Member getSource() {
+    return this.source;
+  }
+
+  /**
+   * @return the member the bucket is given to
+   */
+  public Member getTarget() {
+    return this.target;
+  }
+
+  /**
+   * @return the bucket being moved
+   */
+  public Bucket getBucket() {
+    return this.bucket;
+  }
+
+  /**
+   * Two moves are equal when they are of the same class and their source, target and bucket are
+   * pairwise equal (nulls compare equal to each other).
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    Move that = (Move) o;
+    if (!Objects.equals(source, that.source)) {
+      return false;
+    }
+    if (!Objects.equals(target, that.target)) {
+      return false;
+    }
+    return Objects.equals(bucket, that.bucket);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(source, target, bucket);
+  }
+}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/PartitionedRegionLoadModel.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/PartitionedRegionLoadModel.java
similarity index 55%
rename from
geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/PartitionedRegionLoadModel.java
rename to
geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/PartitionedRegionLoadModel.java
index bb561eb..c86200a 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/PartitionedRegionLoadModel.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/PartitionedRegionLoadModel.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.apache.geode.internal.cache.partitioned.rebalance;
+package org.apache.geode.internal.cache.partitioned.rebalance.model;
import java.util.Collection;
import java.util.Comparator;
@@ -37,6 +37,7 @@ import
org.apache.geode.internal.cache.partitioned.InternalPartitionDetails;
import org.apache.geode.internal.cache.partitioned.OfflineMemberDetails;
import org.apache.geode.internal.cache.partitioned.PRLoad;
import org.apache.geode.internal.cache.partitioned.PartitionMemberInfoImpl;
+import org.apache.geode.internal.cache.partitioned.rebalance.BucketOperator;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
@@ -73,23 +74,21 @@ public class PartitionedRegionLoadModel {
* A comparator that is used to sort buckets in the order that we should
satisfy redundancy - most
* needy buckets first.
*/
- private static final Comparator<Bucket> REDUNDANCY_COMPARATOR = new
Comparator<Bucket>() {
- public int compare(Bucket o1, Bucket o2) {
- // put the buckets with the lowest redundancy first
- int result = o1.getRedundancy() - o2.getRedundancy();
- if (result == 0) {
- // put the bucket with the largest load first. This should give us a
- // better chance of finding a place to put it
- result = Float.compare(o2.getLoad(), o1.getLoad());
- }
- if (result == 0) {
- // finally, just use the id so the comparator doesn't swallow buckets
- // with the same load
- result = o1.getId() - o2.getId();
- }
-
- return result;
+ private static final Comparator<Bucket> REDUNDANCY_COMPARATOR = (o1, o2) -> {
+ // put the buckets with the lowest redundancy first
+ int result = o1.getRedundancy() - o2.getRedundancy();
+ if (result == 0) {
+ // put the bucket with the largest load first. This should give us a
+ // better chance of finding a place to put it
+ result = Float.compare(o2.getLoad(), o1.getLoad());
+ }
+ if (result == 0) {
+ // finally, just use the id so the comparator doesn't swallow buckets
+ // with the same load
+ result = o1.getId() - o2.getId();
}
+
+ return result;
};
private static final long MEGABYTES = 1024 * 1024;
@@ -98,30 +97,29 @@ public class PartitionedRegionLoadModel {
* A member to represent inconsistent data. For example, if two members
think they are the primary
* for a bucket, we will set the primary to invalid, so it won't be a
candidate for rebalancing.
*/
- final MemberRollup INVALID_MEMBER = new MemberRollup(null, false, false);
+ public static final MemberRollup INVALID_MEMBER = new MemberRollup(null,
null, false, false);
private final BucketRollup[] buckets;
/**
* A map of all members that host this partitioned region
*/
- private final Map<InternalDistributedMember, MemberRollup> members =
- new HashMap<InternalDistributedMember, MemberRollup>();
+ private final Map<InternalDistributedMember, MemberRollup> members = new
HashMap<>();
/**
* The set of all regions that are colocated in this model.
*/
- private final Set<String> allColocatedRegions = new HashSet<String>();
+ private final Set<String> allColocatedRegions = new HashSet<>();
/**
* The list of buckets that have low redundancy
*/
private SortedSet<BucketRollup> lowRedundancyBuckets = null;
private SortedSet<BucketRollup> overRedundancyBuckets = null;
- private final Collection<Move> attemptedPrimaryMoves = new HashSet<Move>();
- private final Collection<Move> attemptedBucketMoves = new HashSet<Move>();
- private final Collection<Move> attemptedBucketCreations = new
HashSet<Move>();
- private final Collection<Move> attemptedBucketRemoves = new HashSet<Move>();
+ private final Collection<Move> attemptedPrimaryMoves = new HashSet<>();
+ private final Collection<Move> attemptedBucketMoves = new HashSet<>();
+ private final Collection<Move> attemptedBucketCreations = new HashSet<>();
+ private final Collection<Move> attemptedBucketRemoves = new HashSet<>();
private final BucketOperator operator;
private final int requiredRedundancy;
@@ -180,15 +178,14 @@ public class PartitionedRegionLoadModel {
// region. Each bucket has a reference to all of the members
// that host it and each member has a reference to all of the buckets
// it hosts
- Map<InternalDistributedMember, Member> regionMember =
- new HashMap<InternalDistributedMember, Member>();
+ Map<InternalDistributedMember, Member> regionMember = new HashMap<>();
Bucket[] regionBuckets = new Bucket[this.buckets.length];
for (InternalPartitionDetails memberDetails : memberDetailSet) {
InternalDistributedMember memberId =
(InternalDistributedMember) memberDetails.getDistributedMember();
boolean isCritical = criticalMembers.contains(memberId);
- Member member = new Member(memberId,
memberDetails.getPRLoad().getWeight(),
+ Member member = new Member(addressComparor, memberId,
memberDetails.getPRLoad().getWeight(),
memberDetails.getConfiguredMaxMemory(), isCritical,
enforceLocalMaxMemory);
regionMember.put(memberId, member);
@@ -221,7 +218,7 @@ public class PartitionedRegionLoadModel {
MemberRollup memberSum = this.members.get(memberId);
boolean isCritical = criticalMembers.contains(memberId);
if (memberSum == null) {
- memberSum = new MemberRollup(memberId, isCritical,
enforceLocalMaxMemory);
+ memberSum = new MemberRollup(addressComparor, memberId, isCritical,
enforceLocalMaxMemory);
this.members.put(memberId, memberSum);
}
@@ -317,10 +314,6 @@ public class PartitionedRegionLoadModel {
return overRedundancyBuckets;
}
- public void setOverRedundancyBuckets(SortedSet<BucketRollup>
overRedundancyBuckets) {
- this.overRedundancyBuckets = overRedundancyBuckets;
- }
-
public boolean enforceUniqueZones() {
return addressComparor.enforceUniqueZones();
}
@@ -355,10 +348,10 @@ public class PartitionedRegionLoadModel {
}
private Map<String, Long> getColocatedRegionSizes(BucketRollup bucket) {
- Map<String, Long> colocatedRegionSizes = new HashMap<String, Long>();
+ Map<String, Long> colocatedRegionSizes = new HashMap<>();
for (Map.Entry<String, Bucket> entry :
bucket.getColocatedBuckets().entrySet()) {
- colocatedRegionSizes.put(entry.getKey(),
Long.valueOf(entry.getValue().getBytes()));
+ colocatedRegionSizes.put(entry.getKey(), entry.getValue().getBytes());
}
return colocatedRegionSizes;
}
@@ -405,7 +398,7 @@ public class PartitionedRegionLoadModel {
});
}
- protected void remoteOverRedundancyBucket(BucketRollup bucket, Member
targetMember) {
+ public void remoteOverRedundancyBucket(BucketRollup bucket, Member
targetMember) {
Move bestMove = new Move(null, targetMember, bucket);
Map<String, Long> colocatedRegionSizes = getColocatedRegionSizes(bucket);
@@ -425,7 +418,7 @@ public class PartitionedRegionLoadModel {
}
private void initLowRedundancyBuckets() {
- this.lowRedundancyBuckets = new
TreeSet<BucketRollup>(REDUNDANCY_COMPARATOR);
+ this.lowRedundancyBuckets = new TreeSet<>(REDUNDANCY_COMPARATOR);
for (BucketRollup b : this.buckets) {
if (b != null && b.getRedundancy() >= 0 && b.getRedundancy() <
this.requiredRedundancy) {
this.lowRedundancyBuckets.add(b);
@@ -434,7 +427,7 @@ public class PartitionedRegionLoadModel {
}
private void initOverRedundancyBuckets() {
- this.overRedundancyBuckets = new
TreeSet<BucketRollup>(REDUNDANCY_COMPARATOR);
+ this.overRedundancyBuckets = new TreeSet<>(REDUNDANCY_COMPARATOR);
for (BucketRollup b : this.buckets) {
if (b != null && b.getOnlineRedundancy() > this.requiredRedundancy) {
this.overRedundancyBuckets.add(b);
@@ -491,7 +484,6 @@ public class PartitionedRegionLoadModel {
}
public Move findBestTargetForFPR(Bucket bucket, boolean checkIPAddress) {
- Move noMove = null;
InternalDistributedMember targetMemberID = null;
Member targetMember = null;
List<FixedPartitionAttributesImpl> fpas =
@@ -514,10 +506,10 @@ public class PartitionedRegionLoadModel {
}
}
- return noMove;
+ return null;
}
- protected boolean movePrimary(Move bestMove) {
+ public boolean movePrimary(Move bestMove) {
Member bestSource = bestMove.getSource();
Member bestTarget = bestMove.getTarget();
Bucket bestBucket = bestMove.getBucket();
@@ -561,54 +553,6 @@ public class PartitionedRegionLoadModel {
}
/**
- * Move all primary from other to this
- */
- private void makeFPRPrimaryForThisNode() {
- List<FixedPartitionAttributesImpl> FPAs =
- this.partitionedRegion.getFixedPartitionAttributesImpl();
- InternalDistributedMember targetId =
this.partitionedRegion.getDistributionManager().getId();
- Member target = this.members.get(targetId);
- Set<Bucket> unsuccessfulAttempts = new HashSet<Bucket>();
- for (Bucket bucket : this.buckets) {
- if (bucket != null) {
- for (FixedPartitionAttributesImpl fpa : FPAs) {
- if (fpa.hasBucket(bucket.id) && fpa.isPrimary()) {
- Member source = bucket.primary;
- bucket.getPrimary();
- if (source != target) {
- // HACK: In case we don't know who is Primary at this time
- // we just set source as target too for stat purposes
-
- InternalDistributedMember srcDM = (source == null || source ==
INVALID_MEMBER)
- ? target.getDistributedMember() :
source.getDistributedMember();
- if (logger.isDebugEnabled()) {
- logger.debug(
- "PRLM#movePrimariesForFPR: For Bucket#{}, moving primary
from source {} to target {}",
- bucket.getId(), bucket.primary, target);
- }
- boolean successfulMove =
- this.operator.movePrimary(srcDM,
target.getDistributedMember(), bucket.getId());
- unsuccessfulAttempts.add(bucket);
- // We have to move the primary otherwise there is some problem!
- Assert.assertTrue(successfulMove,
- " Fixed partitioned region not able to move the primary!");
- if (successfulMove) {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "PRLM#movePrimariesForFPR: For Bucket#{}, moving primary
source {} to target {}",
- bucket.getId(), source, target);
- }
-
- bucket.setPrimary(target, bucket.getPrimaryLoad());
- }
- }
- }
- }
- }
- }
- }
-
- /**
* Calculate the target weighted number of primaries on each node.
*/
private float getPrimaryAverage() {
@@ -764,7 +708,7 @@ public class PartitionedRegionLoadModel {
return bestMove;
}
- protected boolean moveBucket(Move bestMove) {
+ public boolean moveBucket(Move bestMove) {
Member bestSource = bestMove.getSource();
Member bestTarget = bestMove.getTarget();
BucketRollup bestBucket = (BucketRollup) bestMove.getBucket();
@@ -795,7 +739,7 @@ public class PartitionedRegionLoadModel {
* @return a set of partitioned member details.
*/
public Set<PartitionMemberInfo> getPartitionedMemberDetails(String region) {
- TreeSet<PartitionMemberInfo> result = new TreeSet<PartitionMemberInfo>();
+ TreeSet<PartitionMemberInfo> result = new TreeSet<>();
for (MemberRollup member : this.members.values()) {
Member colocatedMember = member.getColocatedMember(region);
if (colocatedMember != null) {
@@ -843,11 +787,7 @@ public class PartitionedRegionLoadModel {
@Override
public String toString() {
StringBuilder result = new StringBuilder();
- TreeSet<Bucket> allBucketIds = new TreeSet<Bucket>(new
Comparator<Bucket>() {
- public int compare(Bucket o1, Bucket o2) {
- return o1.getId() - o2.getId();
- }
- });
+ TreeSet<Bucket> allBucketIds = new
TreeSet<>(Comparator.comparingInt(Bucket::getId));
if (this.members.isEmpty()) {
return "";
}
@@ -891,684 +831,4 @@ public class PartitionedRegionLoadModel {
return result.toString();
}
- /**
- * Represents the sum of all of the colocated regions on a given member.
Also, holds a map of all
- * of the colocated regions hosted on this member.
- */
- private class MemberRollup extends Member {
- private final Map<String, Member> colocatedMembers = new HashMap<String,
Member>();
- private final boolean invalid = false;
-
- public MemberRollup(InternalDistributedMember memberId, boolean isCritical,
- boolean enforceLocalMaxMemory) {
- super(memberId, isCritical, enforceLocalMaxMemory);
- }
-
- /**
- * Indicates that this member doesn't have all of the colocated regions
- */
- public boolean isInvalid() {
- return invalid;
- }
-
- public boolean addColocatedMember(String region, Member member) {
- if (!getColocatedMembers().containsKey(region)) {
- this.getColocatedMembers().put(region, member);
- this.weight += member.weight;
- this.localMaxMemory += member.localMaxMemory;
- return true;
- }
- return false;
- }
-
-
- public Member getColocatedMember(String region) {
- return getColocatedMembers().get(region);
- }
-
- /**
- * Update the load on this member rollup with a change in size of one of
the bucket rollups
- * hosted by this member
- */
- public void updateLoad(float load, float primaryLoad, float bytes) {
- this.totalLoad += load;
- this.totalPrimaryLoad += primaryLoad;
- this.totalBytes += bytes;
- }
-
- @Override
- public boolean addBucket(Bucket bucket) {
- if (super.addBucket(bucket)) {
- BucketRollup bucketRollup = (BucketRollup) bucket;
- for (Map.Entry<String, Member> entry :
getColocatedMembers().entrySet()) {
- String region = entry.getKey();
- Member member = entry.getValue();
- Bucket colocatedBucket =
bucketRollup.getColocatedBuckets().get(region);
- if (colocatedBucket != null) {
- member.addBucket(colocatedBucket);
- }
- }
- return true;
- }
- return false;
- }
-
- @Override
- public boolean removeBucket(Bucket bucket) {
- if (super.removeBucket(bucket)) {
- BucketRollup bucketRollup = (BucketRollup) bucket;
- for (Map.Entry<String, Member> entry :
getColocatedMembers().entrySet()) {
- String region = entry.getKey();
- Member member = entry.getValue();
- Bucket colocatedBucket =
bucketRollup.getColocatedBuckets().get(region);
- if (colocatedBucket != null) {
- member.removeBucket(colocatedBucket);
- }
- }
- return true;
- }
- return false;
- }
-
- @Override
- public boolean addPrimary(Bucket bucket) {
- if (super.addPrimary(bucket)) {
- BucketRollup bucketRollup = (BucketRollup) bucket;
- for (Map.Entry<String, Member> entry :
getColocatedMembers().entrySet()) {
- String region = entry.getKey();
- Member member = entry.getValue();
- Bucket colocatedBucket =
bucketRollup.getColocatedBuckets().get(region);
- if (colocatedBucket != null) {
- member.addPrimary(colocatedBucket);
- }
- }
- return true;
- }
- return false;
- }
-
- @Override
- public boolean removePrimary(Bucket bucket) {
- if (super.removePrimary(bucket)) {
- BucketRollup bucketRollup = (BucketRollup) bucket;
- for (Map.Entry<String, Member> entry :
getColocatedMembers().entrySet()) {
- String region = entry.getKey();
- Member member = entry.getValue();
- Bucket colocatedBucket =
bucketRollup.getColocatedBuckets().get(region);
- if (colocatedBucket != null) {
- member.removePrimary(colocatedBucket);
- }
- }
- return true;
- }
- return false;
- }
-
- @Override
- public RefusalReason willAcceptBucket(Bucket bucket, Member source,
boolean checkIPAddress) {
- RefusalReason reason = super.willAcceptBucket(bucket, source,
checkIPAddress);
- if (reason.willAccept()) {
- BucketRollup bucketRollup = (BucketRollup) bucket;
- MemberRollup sourceRollup = (MemberRollup) source;
- for (Map.Entry<String, Member> entry :
getColocatedMembers().entrySet()) {
- String region = entry.getKey();
- Member member = entry.getValue();
- Bucket colocatedBucket =
bucketRollup.getColocatedBuckets().get(region);
- Member colocatedSource =
- sourceRollup == null ? null :
sourceRollup.getColocatedMembers().get(region);
- if (colocatedBucket != null) {
- reason = member.willAcceptBucket(colocatedBucket, colocatedSource,
checkIPAddress);
- if (!reason.willAccept()) {
- return reason;
- }
- }
- }
- return RefusalReason.NONE;
- }
- return reason;
- }
-
- Map<String, Member> getColocatedMembers() {
- return this.colocatedMembers;
- }
- }
-
- /**
- * Represents the sum of all of colocated buckets with a given bucket id.
- */
- protected class BucketRollup extends Bucket {
- private final Map<String, Bucket> colocatedBuckets = new HashMap<String,
Bucket>();
-
- public BucketRollup(int id) {
- super(id);
- }
-
- /**
- * @param region
- * @param b
- */
- public boolean addColocatedBucket(String region, Bucket b) {
- if (!this.getColocatedBuckets().containsKey(region)) {
- this.getColocatedBuckets().put(region, b);
- this.load += b.getLoad();
- this.primaryLoad += b.getPrimaryLoad();
- this.bytes += b.getBytes();
- this.offlineMembers.addAll(b.getOfflineMembers());
-
- // Update the load on the members hosting this bucket
- // to reflect the fact that the bucket is larger now.
- for (Member member : getMembersHosting()) {
- MemberRollup rollup = (MemberRollup) member;
- float primaryLoad = 0;
- if (this.getPrimary() == member) {
- primaryLoad = b.getPrimaryLoad();
- }
- rollup.updateLoad(b.getLoad(), primaryLoad, b.getBytes());
- }
- return true;
- }
-
- return false;
- }
-
- @Override
- public boolean addMember(Member targetMember) {
- if (super.addMember(targetMember)) {
- MemberRollup memberRollup = (MemberRollup) targetMember;
- for (Map.Entry<String, Bucket> entry :
getColocatedBuckets().entrySet()) {
- String region = entry.getKey();
- Bucket bucket = entry.getValue();
- Member member = memberRollup.getColocatedMembers().get(region);
- if (member != null) {
- bucket.addMember(member);
- }
- }
- return true;
- }
- return false;
- }
-
- @Override
- public boolean removeMember(Member targetMember) {
- if (super.removeMember(targetMember)) {
- MemberRollup memberRollup = (MemberRollup) targetMember;
- for (Map.Entry<String, Bucket> entry :
getColocatedBuckets().entrySet()) {
- String region = entry.getKey();
- Bucket bucket = entry.getValue();
- Member member = memberRollup.getColocatedMembers().get(region);
- if (member != null) {
- bucket.removeMember(member);
- }
- }
- return true;
- }
- return false;
- }
-
- @Override
- public void setPrimary(Member targetMember, float primaryLoad) {
- super.setPrimary(targetMember, primaryLoad);
- if (targetMember != null) {
- MemberRollup memberRollup = (MemberRollup) targetMember;
- for (Map.Entry<String, Bucket> entry :
getColocatedBuckets().entrySet()) {
- String region = entry.getKey();
- Bucket bucket = entry.getValue();
- Member member = memberRollup.getColocatedMembers().get(region);
- if (member != null) {
- bucket.setPrimary(member, primaryLoad);
- }
- }
- }
- }
-
- Map<String, Bucket> getColocatedBuckets() {
- return this.colocatedBuckets;
- }
- }
-
- /**
- * Represents a single member of the distributed system.
- */
- protected class Member implements Comparable<Member> {
- private final InternalDistributedMember memberId;
- protected float weight;
- protected float totalLoad;
- protected float totalPrimaryLoad;
- protected long totalBytes;
- protected long localMaxMemory;
- private final Set<Bucket> buckets = new TreeSet<Bucket>();
- private final Set<Bucket> primaryBuckets = new TreeSet<Bucket>();
- private final boolean isCritical;
- private final boolean enforceLocalMaxMemory;
-
- public Member(InternalDistributedMember memberId, boolean isCritical,
- boolean enforceLocalMaxMemory) {
- this.memberId = memberId;
- this.isCritical = isCritical;
- this.enforceLocalMaxMemory = enforceLocalMaxMemory;
- }
-
- public Member(InternalDistributedMember memberId, float weight, long
localMaxMemory,
- boolean isCritical, boolean enforceLocalMaxMemory) {
- this(memberId, isCritical, enforceLocalMaxMemory);
- this.weight = weight;
- this.localMaxMemory = localMaxMemory;
- }
-
- /**
- * @param bucket
- * @param sourceMember the member we will be moving this bucket off of
- * @param checkZone true if we should not put two copies of a bucket on
two nodes with the same
- * IP address.
- */
- public RefusalReason willAcceptBucket(Bucket bucket, Member sourceMember,
boolean checkZone) {
- // make sure this member is not already hosting this bucket
- if (getBuckets().contains(bucket)) {
- return RefusalReason.ALREADY_HOSTING;
- }
- // Check the ip address
- if (checkZone) {
- // If the source member is equivalent to the target member, go
- // ahead and allow the bucket move (it's not making our redundancy
worse).
- // TODO we could have some logic to prefer moving to different ip
addresses
- // Probably that logic should be another stage after redundancy
recovery, like
- // improveRedundancy.
- boolean sourceIsEquivalent = sourceMember != null
- && addressComparor.areSameZone(getMemberId(),
sourceMember.getDistributedMember());
- if (sourceMember == null || !sourceIsEquivalent) {
- for (Member hostingMember : bucket.getMembersHosting()) {
- if ((!hostingMember.equals(sourceMember) ||
addressComparor.enforceUniqueZones())
- && addressComparor.areSameZone(getMemberId(),
- hostingMember.getDistributedMember())) {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "Member {} would prefer not to host {} because it is
already on another member with the same redundancy zone",
- this, bucket);
- }
- return RefusalReason.SAME_ZONE;
- }
- }
- }
- }
-
- // check the localMaxMemory
- if (this.enforceLocalMaxMemory && this.totalBytes + bucket.getBytes() >
this.localMaxMemory) {
- if (logger.isDebugEnabled()) {
- logger.debug("Member {} won't host bucket {} because it doesn't have
enough space", this,
- bucket);
- }
- return RefusalReason.LOCAL_MAX_MEMORY_FULL;
- }
-
- // check to see if the heap is critical
- if (isCritical) {
- if (logger.isDebugEnabled()) {
- logger.debug("Member {} won't host bucket {} because it's heap is
critical", this,
- bucket);
- }
- return RefusalReason.CRITICAL_HEAP;
- }
-
- return RefusalReason.NONE;
- }
-
- public boolean addBucket(Bucket bucket) {
- if (getBuckets().add(bucket)) {
- bucket.addMember(this);
- this.totalBytes += bucket.getBytes();
- this.totalLoad += bucket.getLoad();
- return true;
- }
- return false;
- }
-
- public boolean removeBucket(Bucket bucket) {
- if (getBuckets().remove(bucket)) {
- bucket.removeMember(this);
- this.totalBytes -= bucket.getBytes();
- this.totalLoad -= bucket.getLoad();
- return true;
- }
- return false;
- }
-
- public boolean removePrimary(Bucket bucket) {
- if (getPrimaryBuckets().remove(bucket)) {
- this.totalPrimaryLoad -= bucket.getPrimaryLoad();
- return true;
- }
- return false;
- }
-
- public boolean addPrimary(Bucket bucket) {
- if (getPrimaryBuckets().add(bucket)) {
- this.totalPrimaryLoad += bucket.getPrimaryLoad();
- return true;
- }
- return false;
- }
-
- public int getBucketCount() {
- return getBuckets().size();
- }
-
- public long getConfiguredMaxMemory() {
- return this.localMaxMemory;
- }
-
- public InternalDistributedMember getDistributedMember() {
- return getMemberId();
- }
-
- public int getPrimaryCount() {
- int primaryCount = 0;
- for (Bucket bucket : getBuckets()) {
- if (this.equals(bucket.primary)) {
- primaryCount++;
- }
- }
- return primaryCount;
- }
-
- public long getSize() {
- return this.totalBytes;
- }
-
- public float getTotalLoad() {
- return this.totalLoad;
- }
-
- public float getWeight() {
- return this.weight;
- }
-
- @Override
- public String toString() {
- return "Member(id=" + getMemberId() + ")";
- }
-
- public float getPrimaryLoad() {
- return this.totalPrimaryLoad;
- }
-
- protected Set<Bucket> getBuckets() {
- return this.buckets;
- }
-
- private InternalDistributedMember getMemberId() {
- return this.memberId;
- }
-
- private Set<Bucket> getPrimaryBuckets() {
- return this.primaryBuckets;
- }
-
- @Override
- public int hashCode() {
- return memberId.hashCode();
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof Member)) {
- return false;
- }
- Member o = (Member) other;
- return this.memberId.equals(o.memberId);
- }
-
- public int compareTo(Member other) {
- // memberId is InternalDistributedMember which implements Comparable
- return this.memberId.compareTo(other.memberId);
- }
- }
-
- /**
- * Represents a single bucket.
- */
- protected class Bucket implements Comparable<Bucket> {
- protected long bytes;
- private final int id;
- protected float load;
- protected float primaryLoad;
- private int redundancy = -1;
- private final Set<Member> membersHosting = new TreeSet<Member>();
- private Member primary;
- protected Set<PersistentMemberID> offlineMembers = new
HashSet<PersistentMemberID>();
-
- public Bucket(int id) {
- this.id = id;
- }
-
- public Bucket(int id, float load, long bytes, Set<PersistentMemberID>
offlineMembers) {
- this(id);
- this.load = load;
- this.bytes = bytes;
- this.offlineMembers = offlineMembers;
- }
-
- public void setPrimary(Member member, float primaryLoad) {
- if (this.primary == INVALID_MEMBER) {
- return;
- }
- if (this.primary != null) {
- this.primary.removePrimary(this);
- }
- this.primary = member;
- this.primaryLoad = primaryLoad;
- if (primary != INVALID_MEMBER && primary != null) {
- addMember(primary);
- member.addPrimary(this);
- }
- }
-
- /**
- * @param targetMember
- */
- public boolean addMember(Member targetMember) {
- if (this.getMembersHosting().add(targetMember)) {
- this.redundancy++;
- targetMember.addBucket(this);
- return true;
- }
-
- return false;
- }
-
- public boolean removeMember(Member targetMember) {
- if (this.getMembersHosting().remove(targetMember)) {
- if (targetMember == this.primary) {
- setPrimary(null, 0);
- }
- this.redundancy--;
- targetMember.removeBucket(this);
- return true;
- }
- return false;
- }
-
- public int getRedundancy() {
- return this.redundancy + offlineMembers.size();
- }
-
- public int getOnlineRedundancy() {
- return this.redundancy;
- }
-
- public float getLoad() {
- return this.load;
- }
-
- public int getId() {
- return this.id;
- }
-
- public long getBytes() {
- return this.bytes;
- }
-
- @Override
- public String toString() {
- return "Bucket(id=" + getId() + ",load=" + load + ")";
- }
-
- public float getPrimaryLoad() {
- return this.primaryLoad;
- }
-
- public Set<Member> getMembersHosting() {
- return this.membersHosting;
- }
-
- public Member getPrimary() {
- return this.primary;
- }
-
- public Collection<? extends PersistentMemberID> getOfflineMembers() {
- return offlineMembers;
- }
-
- @Override
- public int hashCode() {
- return this.id;
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof Bucket)) {
- return false;
- }
- Bucket o = (Bucket) other;
- return this.id == o.id;
- }
-
- public int compareTo(Bucket other) {
- if (this.id < other.id) {
- return -1;
- } else if (this.id > other.id) {
- return 1;
- } else {
- return 0;
- }
- }
- }
-
- /**
- * Represents a move from one node to another. Used to keep track of moves
that we have already
- * attempted that have failed.
- */
- protected static class Move {
- private final Member source;
- private final Member target;
- private final Bucket bucket;
-
- public Move(Member source, Member target, Bucket bucket) {
- super();
- this.source = source;
- this.target = target;
- this.bucket = bucket;
- }
-
- /**
- * @return the source
- */
- public Member getSource() {
- return this.source;
- }
-
- /**
- * @return the target
- */
- public Member getTarget() {
- return this.target;
- }
-
- /**
- * @return the bucket
- */
- public Bucket getBucket() {
- return this.bucket;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((this.bucket == null) ? 0 :
this.bucket.hashCode());
- result = prime * result + ((this.source == null) ? 0 :
this.source.hashCode());
- result = prime * result + ((this.target == null) ? 0 :
this.target.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- Move other = (Move) obj;
- if (this.bucket == null) {
- if (other.bucket != null)
- return false;
- } else if (!this.bucket.equals(other.bucket))
- return false;
- if (this.source == null) {
- if (other.source != null)
- return false;
- } else if (!this.source.equals(other.source))
- return false;
- if (this.target == null) {
- if (other.target != null)
- return false;
- } else if (!this.target.equals(other.target))
- return false;
- return true;
- }
- }
-
- public interface AddressComparor {
-
- boolean enforceUniqueZones();
-
- /**
- * Return true if the two addresses are equivalent
- */
- boolean areSameZone(InternalDistributedMember member1,
InternalDistributedMember member2);
- }
-
- public static enum RefusalReason {
- NONE, ALREADY_HOSTING, UNITIALIZED_MEMBER, SAME_ZONE,
LOCAL_MAX_MEMORY_FULL, CRITICAL_HEAP;
-
- public boolean willAccept() {
- return this == NONE;
- }
-
- public String formatMessage(Member source, Member target, Bucket bucket) {
- switch (this) {
- case NONE:
- return "No reason, the move should be allowed.";
- case ALREADY_HOSTING:
- return "Target member " + target.getMemberId() + " is already
hosting bucket "
- + bucket.getId();
- case UNITIALIZED_MEMBER:
- return "Target member " + target.getMemberId() + " is not fully
initialized";
- case SAME_ZONE:
- return "Target member " + target.getMemberId()
- + " is in the same redundancy zone as other members hosting
bucket " + bucket.getId()
- + ": " + bucket.getMembersHosting();
- case LOCAL_MAX_MEMORY_FULL:
- return "Target member " + target.getMemberId()
- + " does not have space within it's local max memory for bucket
" + bucket.getId()
- + ". Bucket Size " + bucket.getBytes() + " local max memory: " +
target.localMaxMemory
- + " remaining: " + target.totalBytes;
- case CRITICAL_HEAP:
- return "Target member " + target.getMemberId()
- + " has reached its critical heap percentage, and cannot accept
more data";
- default:
- return this.toString();
- }
- }
- }
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/RefusalReason.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/RefusalReason.java
new file mode 100644
index 0000000..bd71b40
--- /dev/null
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/RefusalReason.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned.rebalance.model;
+
+/**
+ * Reasons a target member may give for refusing to host a bucket.
+ * {@link #NONE} means the move is not refused; it is the only constant for
+ * which {@link #willAccept()} returns true.
+ */
+public enum RefusalReason {
+ // NOTE(review): UNITIALIZED_MEMBER looks like a misspelling of
+ // "UNINITIALIZED"; left as declared because it is a public constant name.
+ NONE, ALREADY_HOSTING, UNITIALIZED_MEMBER, SAME_ZONE, LOCAL_MAX_MEMORY_FULL,
CRITICAL_HEAP;
+
+ /**
+ * @return true only when this reason is {@link #NONE}
+ */
+ public boolean willAccept() {
+ return this == NONE;
+ }
+
+ /**
+ * Builds a human-readable explanation for this refusal reason.
+ *
+ * @param target the member that was asked to host the bucket; not read
+ *        for NONE
+ * @param bucket the bucket being placed; not read for NONE,
+ *        UNITIALIZED_MEMBER or CRITICAL_HEAP
+ * @return the explanation text; the default branch falls back to
+ *         toString() for any constant without its own case
+ */
+ public String formatMessage(Member target, Bucket bucket) {
+ switch (this) {
+ case NONE:
+ return "No reason, the move should be allowed.";
+ case ALREADY_HOSTING:
+ return "Target member " + target.getMemberId() + " is already hosting
bucket "
+ + bucket.getId();
+ case UNITIALIZED_MEMBER:
+ return "Target member " + target.getMemberId() + " is not fully
initialized";
+ case SAME_ZONE:
+ return "Target member " + target.getMemberId()
+ + " is in the same redundancy zone as other members hosting bucket
" + bucket.getId()
+ + ": " + bucket.getMembersHosting();
+ // NOTE(review): the value printed after "remaining: " is
+ // target.getSize(). Before this move, Member.getSize() returned
+ // totalBytes (bytes in use), not free space -- confirm the label.
+ case LOCAL_MAX_MEMORY_FULL:
+ return "Target member " + target.getMemberId()
+ + " does not have space within it's local max memory for bucket "
+ bucket.getId()
+ + ". Bucket Size " + bucket.getBytes() + " local max memory: "
+ + target.getConfiguredMaxMemory() + " remaining: " +
target.getSize();
+ case CRITICAL_HEAP:
+ return "Target member " + target.getMemberId()
+ + " has reached its critical heap percentage, and cannot accept
more data";
+ // Every declared constant has a case above; this is only a fallback.
+ default:
+ return this.toString();
+ }
+ }
+}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionLoadModelJUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/PartitionedRegionLoadModelJUnitTest.java
similarity index 99%
rename from
geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionLoadModelJUnitTest.java
rename to
geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/PartitionedRegionLoadModelJUnitTest.java
index a372a5c..21c7b53 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionLoadModelJUnitTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/PartitionedRegionLoadModelJUnitTest.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.apache.geode.internal.cache.partitioned;
+package org.apache.geode.internal.cache.partitioned.rebalance;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -32,7 +32,6 @@ import java.util.Random;
import java.util.Set;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -40,12 +39,16 @@ import org.junit.experimental.categories.Category;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.partition.PartitionMemberInfo;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.partitioned.OfflineMemberDetails;
+import org.apache.geode.internal.cache.partitioned.OfflineMemberDetailsImpl;
+import org.apache.geode.internal.cache.partitioned.PRLoad;
+import org.apache.geode.internal.cache.partitioned.PartitionMemberInfoImpl;
import
org.apache.geode.internal.cache.partitioned.rebalance.BucketOperator.Completion;
import org.apache.geode.internal.cache.partitioned.rebalance.CompositeDirector;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel;
-import
org.apache.geode.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.AddressComparor;
import org.apache.geode.internal.cache.partitioned.rebalance.RebalanceDirector;
import
org.apache.geode.internal.cache.partitioned.rebalance.SimulatedBucketOperator;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.AddressComparor;
+import
org.apache.geode.internal.cache.partitioned.rebalance.model.PartitionedRegionLoadModel;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.test.junit.categories.UnitTest;
--
To stop receiving notification emails like this one, please contact
[email protected].