This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 2deef9d92 Change partitionAssignment API to handle ANY_LIVEINSTANCE
(#2817)
2deef9d92 is described below
commit 2deef9d921dd12cde26c30fa2c7297c41c168ad2
Author: Grant Paláu Spencer <[email protected]>
AuthorDate: Wed Jul 17 16:31:50 2024 -0700
Change partitionAssignment API to handle ANY_LIVEINSTANCE (#2817)
Handle ANY_LIVEINSTANCE by calling getReplicaCount
---
.../helix/controller/stages/ClusterDataCache.java | 21 +++++---------------
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 6 +++++-
.../java/org/apache/helix/model/IdealState.java | 5 +++--
.../main/java/org/apache/helix/util/HelixUtil.java | 2 +-
.../rest/server/TestPartitionAssignmentAPI.java | 23 ++++++++++++++++++++++
5 files changed, 37 insertions(+), 20 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index cc0dd6629..a11cf2b07 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -159,22 +159,11 @@ public class ClusterDataCache extends
ResourceControllerDataProvider {
Map<String, IdealState> idealStateMap =
_idealStateCache.getIdealStateMap();
if (idealStateMap.containsKey(resourceName)) {
- String replicasStr = idealStateMap.get(resourceName).getReplicas();
-
- if (replicasStr != null) {
- if
(replicasStr.equals(IdealState.IdealStateConstants.ANY_LIVEINSTANCE.toString()))
{
- replicas = _liveInstanceMap.size();
- } else {
- try {
- replicas = Integer.parseInt(replicasStr);
- } catch (Exception e) {
- LogUtil.logError(LOG, _eventId, "invalid replicas string: " +
replicasStr + " for "
- + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
- }
- }
- } else {
- LogUtil.logError(LOG, _eventId, "idealState for resource: " +
resourceName
- + " does NOT have replicas for " + (_isTaskCache ? "TASK" :
"DEFAULT") + "pipeline");
+ // Assign to replicas, the variable returned below; a separate local would be discarded.
+ replicas =
idealStateMap.get(resourceName).getReplicaCount(_liveInstanceMap.size());
+
+ if (replicas == 0) {
+ LogUtil.logError(LOG, _eventId,
+ "idealState for resource: " + resourceName + " does NOT have
replicas for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
}
}
return replicas;
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 39ae9ae67..36c9a6a5e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -2484,7 +2484,11 @@ public class ZKHelixAdmin implements HelixAdmin {
setResourceIdealState(clusterName, idealState.getResourceName(),
idealState);
// 4. rebalance the resource
- rebalance(clusterName, idealState.getResourceName(),
Integer.parseInt(idealState.getReplicas()),
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ List<String> liveNodes =
accessor.getChildNames(keyBuilder.liveInstances());
+
+ rebalance(clusterName, idealState.getResourceName(),
idealState.getReplicaCount(liveNodes.size()),
idealState.getResourceName(), idealState.getInstanceGroupTag());
return true;
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index aafcca89b..44a52d09e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -554,8 +554,9 @@ public class IdealState extends HelixProperty {
}
/**
- * Get the number of replicas for each partition of this resource
- * @return number of replicas (as a string)
+ * Get the number of replicas for each partition of this resource. The return
value can be "ANY_LIVEINSTANCE"; use
+ * {@link #getReplicaCount(int)} to avoid a NumberFormatException when
parsing the string as an int.
+ * @return String value of the replica count
*/
public String getReplicas() {
// HACK: if replica doesn't exists, use the length of the first list field
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 834b84678..70269a746 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -394,7 +394,7 @@ public final class HelixUtil {
RebalanceStrategy.class.cast(loadClass(HelixUtil.class,
strategyClassName).newInstance());
strategy.init(idealState.getResourceName(), partitions,
stateModelDefinition
- .getStateCountMap(liveInstances.size(),
Integer.parseInt(idealState.getReplicas())),
+ .getStateCountMap(liveInstances.size(),
idealState.getReplicaCount(liveInstances.size())),
idealState.getMaxPartitionsPerInstance());
// Remove all disabled instances so that Helix will not consider them live.
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
index e00c392b0..f3a4bbdd9 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
@@ -38,6 +38,7 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.integration.manager.ClusterControllerManager;
@@ -379,6 +380,26 @@ public class TestPartitionAssignmentAPI extends
AbstractTestClass {
Assert.assertTrue(_clusterVerifier.verifyByPolling());
}
+ /**
+ * Creates a FULL_AUTO resource whose replica count is the ANY_LIVEINSTANCE
+ * placeholder instead of a number, then asserts that the cluster verifier
+ * reports success. The resource is configured with DelayedAutoRebalancer as
+ * rebalancer class and AutoRebalanceStrategy as rebalance strategy.
+ *
+ * @param db name of the resource to create; it is also added to _resources
+ */
+ private void createAutoRebalanceResource(String db) {
+ // NOTE(review): the literal 1 is presumably the partition count - confirm
+ // against the addResourceToCluster signature.
+ _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, 1, "LeaderStandby",
+ IdealState.RebalanceMode.FULL_AUTO + "", null);
+ _resources.add(db);
+
+ IdealState idealState =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+
+ idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
+ idealState.setRebalanceStrategy(AutoRebalanceStrategy.class.getName());
+ // Use the shared constant rather than repeating the "ANY_LIVEINSTANCE" literal.
+ idealState.setReplicas(IdealState.IdealStateConstants.ANY_LIVEINSTANCE.toString());
+ idealState.enable(true);
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME,
db, idealState);
+
+ // Write an empty ResourceConfig for the new resource.
+ ResourceConfig resourceConfig = new ResourceConfig(db);
+ _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ }
+
@Test
private void testComputePartitionAssignmentMaintenanceMode() throws
Exception {
@@ -399,6 +420,8 @@ public class TestPartitionAssignmentAPI extends
AbstractTestClass {
MIN_ACTIVE_REPLICA, 100000L);
}
+ createAutoRebalanceResource("TEST_AUTOREBALANCE_DB_0");
+
// Wait for cluster to converge after adding resources
Assert.assertTrue(_clusterVerifier.verifyByPolling());