This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 7c4301b11 Fix BestPossibleExternalViewVerifier to use a ZkClient that
has the serializer set to ByteArraySerializer (#2776)
7c4301b11 is described below
commit 7c4301b115bbad44a09b199a84abcc2c238858a0
Author: Zachary Pinto <[email protected]>
AuthorDate: Wed Mar 13 20:00:32 2024 -0700
Fix BestPossibleExternalViewVerifier to use a ZkClient that has the
serializer set to ByteArraySerializer (#2776)
* Fix BestPossibleExternalViewVerifier to use a ZkClient that has the
serializer set to ByteArraySerializer so it can read the best possible state
from the assignment metadata store. Fix BestPossibleExternalViewVerifier to
actually calculate BEST_POSSIBLE instead of returning the last state persisted
to ZK, because we now need to account for the mappings produced by
handleDelayedRebalanceMinActiveReplica not being persisted to ZK (#2447).
Fix handleDelayedRebalanceMinActiveReplica modifying in-memory
_bestPossibleState in the _assignmen [...]
---
.../rebalancer/waged/AssignmentManager.java | 8 ++++++-
.../BestPossibleExternalViewVerifier.java | 27 +++++++++-------------
.../rebalancer/TestInstanceOperation.java | 5 +---
3 files changed, 19 insertions(+), 21 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java
index 475e8aad1..8cb089cb9 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java
@@ -23,6 +23,8 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
+
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ResourceAssignment;
@@ -86,7 +88,11 @@ class AssignmentManager {
if (assignmentMetadataStore != null) {
try {
_stateReadLatency.startMeasuringLatency();
- currentBestAssignment = new
HashMap<>(assignmentMetadataStore.getBestPossibleAssignment());
+ currentBestAssignment =
+
assignmentMetadataStore.getBestPossibleAssignment().entrySet().stream().collect(
+ Collectors.toMap(Map.Entry::getKey,
+ entry -> new
ResourceAssignment(entry.getValue().getRecord())));
+ ;
_stateReadLatency.endMeasuringLatency();
} catch (Exception ex) {
throw new HelixRebalanceException(
diff --git
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 1997bea06..0b0926dda 100644
---
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -30,19 +30,16 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixRebalanceException;
import org.apache.helix.PropertyKey;
import org.apache.helix.controller.common.PartitionStateMap;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer;
-import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
-import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.ClusterConfig;
@@ -50,7 +47,6 @@ import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.util.RebalanceUtil;
@@ -59,8 +55,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * verifier that the ExternalViews of given resources (or all resources in the
cluster)
- * match its best possible mapping states.
+ * Verify that the ExternalViews of given resources (or all resources in the
cluster)
+ * match its best possible mapping states. The best possible mapping states
are computed
+ * by running the BestPossibleStateCalc stage with the same inputs that the
controller would
+ * use to calculate the best possible state. The mappings produced by this
stage are compared
+ * to the external view to ensure that they match. When they match, the
cluster has converged.
+ * Note: The best possible state compared to the external view includes the
non-persisted state
+ * mappings generated when handling MIN_ACTIVE replicas.
*/
public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
private static Logger LOG =
LoggerFactory.getLogger(BestPossibleExternalViewVerifier.class);
@@ -433,7 +434,10 @@ public class BestPossibleExternalViewVerifier extends
ZkHelixClusterVerifier {
RebalanceUtil.runStage(event, new CurrentStateComputationStage());
// Note the readOnlyWagedRebalancer is just for one time usage
- try (ZkBucketDataAccessor zkBucketDataAccessor = new
ZkBucketDataAccessor(_zkClient);
+ try (
+ // Pass the zkAddress to constructor to ensure the correct ZkClient is
created with ByteArraySerializer
+ ZkBucketDataAccessor zkBucketDataAccessor = new ZkBucketDataAccessor(
+ _zkClient.getServers());
DryrunWagedRebalancer dryrunWagedRebalancer = new
DryrunWagedRebalancer(zkBucketDataAccessor,
cache.getClusterName(),
cache.getClusterConfig().getGlobalRebalancePreference())) {
event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(),
dryrunWagedRebalancer);
@@ -462,14 +466,5 @@ public class BestPossibleExternalViewVerifier extends
ZkHelixClusterVerifier {
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
super(zkBucketDataAccessor, clusterName, preferences);
}
-
- @Override
- protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
- ResourceControllerDataProvider clusterData, Map<String, Resource>
resourceMap,
- Set<String> activeNodes, CurrentStateOutput currentStateOutput,
- RebalanceAlgorithm algorithm) throws HelixRebalanceException {
- return getBestPossibleAssignment(getAssignmentMetadataStore(),
currentStateOutput,
- resourceMap.keySet());
- }
}
}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 010153e64..1fc3a3e20 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -1070,7 +1070,6 @@ public class TestInstanceOperation extends ZkTestBase {
InstanceConstants.InstanceOperation.EVACUATE);
// Validate that the assignment has not changed since setting the
InstanceOperation to EVACUATE
- Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
Collections.emptySet(), Collections.emptySet());
@@ -1105,7 +1104,7 @@ public class TestInstanceOperation extends ZkTestBase {
Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))),
TIMEOUT);
}
- @Test(expectedExceptions = HelixException.class, dependsOnMethods =
"testNodeSwapWithSwapOutInstanceOffline")
+ @Test(expectedExceptions = HelixException.class, dependsOnMethods =
"testSwapEvacuateAdd")
public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() throws
Exception {
System.out.println(
"START
TestInstanceOperation.testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() at "
@@ -1326,8 +1325,6 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).contains(instanceToEvacuate));
}
- Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
-
// exit MM
_gSetupTool.getClusterManagementTool()
.manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, null);