This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 3cd22a333 Fix /partitionAssignmentAPI and WAGED rebalancer finalMapping results matching (#2739)
3cd22a333 is described below
commit 3cd22a33332f277e9e68c7341a0a89f54ff46aee
Author: Zachary Pinto <[email protected]>
AuthorDate: Tue Jan 23 10:15:23 2024 -0800
Fix /partitionAssignmentAPI and WAGED rebalancer finalMapping results matching (#2739)
The OfflineTimeMap was not being cleared by the cache, and the
BEST_POSSIBLE_STATE was not being persisted to ZK each time partial
rebalance computed a new version. Fixing these two issues ensures that the
pipeline produces the correct finalMapping and that the partitionAssignment
API does not compute the simulated finalMapping from a stale
BEST_POSSIBLE_STATE.
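
For context, the fix hinges on tracking two version counters in
AssignmentMetadataStore. A condensed sketch of the scheme (field and method
names follow the diff below; the surrounding class is illustrative only):

    // Sketch of the version bookkeeping this commit introduces (not the full class).
    class VersionedAssignmentCache {
      volatile int bestPossibleVersion = 0;              // bumped on every in-memory update
      volatile int lastPersistedBestPossibleVersion = 0; // bumped only on a ZK write

      // Partial rebalance: update the cache only; the versions now diverge.
      void updateCache() { bestPossibleVersion++; }

      // Main pipeline: a ZK write brings the persisted version back in sync.
      void persist() { lastPersistedBestPossibleVersion = bestPossibleVersion; }

      // The pipeline persists whenever this returns false, even if the assignment
      // content is unchanged since the last cache update.
      boolean hasPersistedLatest() { return lastPersistedBestPossibleVersion == bestPossibleVersion; }
    }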
---
.../dataproviders/BaseControllerDataProvider.java | 22 +-
.../rebalancer/waged/AssignmentMetadataStore.java | 12 +
.../rebalancer/waged/PartialRebalanceRunner.java | 3 +
.../rebalancer/waged/WagedRebalancer.java | 8 +-
.../rest/server/TestPartitionAssignmentAPI.java | 356 +++++++++++++++++++++
5 files changed, 381 insertions(+), 20 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 9120bd962..6fe1c7fc3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -123,7 +123,6 @@ public class BaseControllerDataProvider implements ControlContextProvider {
// This is used for SWAP related operations where there can be two instances with the same logicalId.
private final Map<String, InstanceConfig> _assignableInstanceConfigMap = new HashMap<>();
private final Map<String, LiveInstance> _assignableLiveInstancesMap = new HashMap<>();
- private final Set<String> _assignableDisabledInstanceSet = new HashSet<>();
private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName = new HashMap<>();
private final Set<String> _enabledLiveSwapInInstanceNames = new HashSet<>();
private final Map<String, MonitoredAbnormalResolver> _abnormalStateResolverMap = new HashMap<>();
@@ -552,7 +551,6 @@ public class BaseControllerDataProvider implements ControlContextProvider {
updateIdealRuleMap(getClusterConfig());
updateDisabledInstances(getInstanceConfigMap().values(),
- getAssignableInstanceConfigMap().values(),
getClusterConfig());
return refreshedTypes;
@@ -591,7 +589,6 @@ public class BaseControllerDataProvider implements ControlContextProvider {
updateInstanceSets(_allInstanceConfigCache.getPropertyMap(),
_allLiveInstanceCache.getPropertyMap(),
_clusterConfig);
updateDisabledInstances(getInstanceConfigMap().values(),
- getAssignableInstanceConfigMap().values(),
_clusterConfig);
}
@@ -700,7 +697,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
*/
public Set<String> getEnabledLiveInstances() {
Set<String> enabledLiveInstances = new HashSet<>(getLiveInstances().keySet());
- enabledLiveInstances.removeAll(getAssignableDisabledInstances());
+ enabledLiveInstances.removeAll(getDisabledInstances());
return enabledLiveInstances;
}
@@ -723,7 +720,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
*/
public Set<String> getEnabledInstances() {
Set<String> enabledNodes = new HashSet<>(getAllInstances());
- enabledNodes.removeAll(getAssignableDisabledInstances());
+ enabledNodes.removeAll(getDisabledInstances());
return enabledNodes;
}
@@ -818,15 +815,6 @@ public class BaseControllerDataProvider implements ControlContextProvider {
return Collections.unmodifiableSet(_disabledInstanceSet);
}
- /**
- * This method allows one to fetch the set of nodes that are disabled
- *
- * @return
- */
- public Set<String> getAssignableDisabledInstances() {
- return Collections.unmodifiableSet(_assignableDisabledInstanceSet);
- }
-
/**
* Get all swapping instance pairs.
*
@@ -991,7 +979,6 @@ public class BaseControllerDataProvider implements ControlContextProvider {
updateInstanceSets(_allInstanceConfigCache.getPropertyMap(),
_allLiveInstanceCache.getPropertyMap(),
getClusterConfig());
updateDisabledInstances(getInstanceConfigMap().values(),
- getAssignableInstanceConfigMap().values(),
getClusterConfig());
}
@@ -1082,7 +1069,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
}
private void updateDisabledInstances(Collection<InstanceConfig> allInstanceConfigs,
- Collection<InstanceConfig> assignableInstanceConfigs, ClusterConfig clusterConfig) {
+ ClusterConfig clusterConfig) {
// Move the calculating disabled instances to refresh
_disabledInstanceForPartitionMap.clear();
_disabledInstanceSet.clear();
@@ -1090,9 +1077,6 @@ public class BaseControllerDataProvider implements ControlContextProvider {
Map<String, List<String>> disabledPartitionMap = config.getDisabledPartitionsMap();
if (!InstanceValidationUtil.isInstanceEnabled(config, clusterConfig)) {
_disabledInstanceSet.add(config.getInstanceName());
- if (assignableInstanceConfigs.contains(config)) {
- _assignableDisabledInstanceSet.add(config.getInstanceName());
- }
}
for (String resource : disabledPartitionMap.keySet()) {
_disabledInstanceForPartitionMap.putIfAbsent(resource, new HashMap<>());
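
The net effect of this file's change is that enabled-instance queries now
subtract the single consolidated disabled set. A minimal runnable sketch of
that behavior (data is illustrative; only the set arithmetic mirrors the diff):

    import java.util.HashSet;
    import java.util.Set;

    // Illustrative only: models the simplified enabled-instance computation after this commit.
    public class EnabledInstancesSketch {
      public static void main(String[] args) {
        Set<String> liveInstances = new HashSet<>(Set.of("localhost_12918", "localhost_12919"));
        Set<String> disabledInstances = Set.of("localhost_12919"); // the single consolidated set

        Set<String> enabledLive = new HashSet<>(liveInstances);
        enabledLive.removeAll(disabledInstances); // previously removed getAssignableDisabledInstances()
        System.out.println(enabledLive);          // [localhost_12918]
      }
    }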
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index 0a532944a..07056aa44 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -52,6 +52,7 @@ public class AssignmentMetadataStore {
protected volatile Map<String, ResourceAssignment> _globalBaseline;
protected volatile Map<String, ResourceAssignment> _bestPossibleAssignment;
protected volatile int _bestPossibleVersion = 0;
+ protected volatile int _lastPersistedBestPossibleVersion = 0;
AssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
this(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
@@ -76,6 +77,16 @@ public class AssignmentMetadataStore {
return _globalBaseline;
}
+ /**
+ * Check whether the latest version of the best possible assignment in the cache has been
+ * persisted to the metadata store.
+ *
+ * @return true if the latest version has been persisted, false otherwise.
+ */
+ protected boolean hasPersistedLatestBestPossibleAssignment() {
+ return _lastPersistedBestPossibleVersion == _bestPossibleVersion;
+ }
+
public Map<String, ResourceAssignment> getBestPossibleAssignment() {
// Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK
if (_bestPossibleAssignment == null) {
@@ -141,6 +152,7 @@ public class AssignmentMetadataStore {
getBestPossibleAssignment().clear();
getBestPossibleAssignment().putAll(bestPossibleAssignment);
_bestPossibleVersion++;
+ _lastPersistedBestPossibleVersion = _bestPossibleVersion;
}
/**
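
To make the counter lifecycle above concrete, here is a self-contained,
hedged walkthrough (sequence inferred from this diff; names are illustrative,
not the Helix API):

    // Hedged walkthrough of the two-counter lifecycle; illustrative only.
    public class PersistLifecycleSketch {
      static int bestPossibleVersion = 0;
      static int lastPersistedVersion = 0;

      public static void main(String[] args) {
        bestPossibleVersion++;                       // partial rebalance updates the in-memory cache only
        System.out.println(hasPersistedLatest());    // false: a pipeline run must still write to ZK
        lastPersistedVersion = bestPossibleVersion;  // persistBestPossibleAssignment() on the next run
        System.out.println(hasPersistedLatest());    // true: later runs skip the redundant write
      }

      static boolean hasPersistedLatest() {
        return lastPersistedVersion == bestPossibleVersion;
      }
    }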
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java
index 74982f6e3..d4f492d93 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java
@@ -177,6 +177,9 @@ class PartialRebalanceRunner implements AutoCloseable {
boolean bestPossibleUpdateSuccessful = false;
if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(newAssignment)) {
+ // This will not persist the new Best Possible Assignment into ZK. It will only update the in-memory cache.
+ // If this is done successfully, the new Best Possible Assignment will be persisted into ZK the next time that
+ // the pipeline is triggered. We schedule the pipeline to run below.
bestPossibleUpdateSuccessful = _assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment,
newBestPossibleAssignmentVersion);
} else {
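
The comment added above describes a compute-then-trigger pattern: a background
computation publishes its result to an in-memory cache and then wakes the main
pipeline, which owns the durable write. A generic, self-contained sketch of
that pattern (all names are illustrative, not Helix code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicReference;

    public class AsyncComputeThenTriggerSketch {
      private static final AtomicReference<String> cache = new AtomicReference<>();
      private static final ExecutorService worker = Executors.newSingleThreadExecutor();

      public static void main(String[] args) {
        worker.submit(() -> {
          String newAssignment = "computed-assignment"; // stands in for the rebalance result
          cache.set(newAssignment);   // cache-only update, like asyncUpdateBestPossibleAssignmentCache
          schedulePipeline();         // the next pipeline run performs the durable persist
        });
        worker.shutdown();
      }

      private static void schedulePipeline() {
        System.out.println("pipeline triggered; will persist: " + cache.get());
      }
    }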
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index c3049ebbf..4c4f9cf94 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -572,7 +572,13 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
private void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment)
throws HelixRebalanceException {
- if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(bestPossibleAssignment)) {
+ // The assignment is only persisted if it differs from the currently cached assignment or if the
+ // matching version of this assignment has not yet been persisted. Partial rebalance does not
+ // directly persist the assignment to the metadata store; it only updates the cache. The main
+ // thread persists it on the next pipeline run, in which case isBestPossibleChanged returns false.
+ if (_assignmentMetadataStore != null && (
+ _assignmentMetadataStore.isBestPossibleChanged(bestPossibleAssignment)
+ || !_assignmentMetadataStore.hasPersistedLatestBestPossibleAssignment())) {
try {
_writeLatency.startMeasuringLatency();
_assignmentMetadataStore.persistBestPossibleAssignment(bestPossibleAssignment);
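
A hedged sketch of the persist decision above; the two booleans mirror
isBestPossibleChanged() and hasPersistedLatestBestPossibleAssignment(), while
the surrounding class is illustrative:

    public class PersistDecisionSketch {
      // changed: the computed assignment differs from the cached one (main-pipeline compute).
      // persistedLatest: the cached assignment's version has already been written to ZK.
      static boolean shouldPersist(boolean changed, boolean persistedLatest) {
        return changed || !persistedLatest;
      }

      public static void main(String[] args) {
        System.out.println(shouldPersist(true, true));   // true: content changed on this run
        System.out.println(shouldPersist(false, false)); // true: partial rebalance updated only the cache
        System.out.println(shouldPersist(false, true));  // false: nothing new to write
      }
    }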
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
new file mode 100644
index 000000000..87e490d62
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
@@ -0,0 +1,356 @@
+package org.apache.helix.rest.server;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestPartitionAssignmentAPI extends AbstractTestClass {
+ private static final Logger LOG = LoggerFactory.getLogger(TestPartitionAssignmentAPI.class);
+
+ private static final int REPLICAS = 3;
+ private static final int MIN_ACTIVE_REPLICAS = 2;
+ private static final String INSTANCE_CAPACITY_KEY = "PARTCOUNT";
+ private static final int DEFAULT_INSTANCE_COUNT = 4;
+ private static final int DEFAULT_INSTANCE_CAPACITY = 50;
+
+ private static final String INSTANCE_NAME_PREFIX = "localhost_";
+ private static final int INSTANCE_START_PORT = 12918;
+ private static final String PARTITION_ASSIGNMENT_PATH_TEMPLATE = "/clusters/%s/partitionAssignment/";
+ private static final String CLUSTER_NAME = "PartitionAssignmentTestCluster";
+ private static ClusterControllerManager _controller;
+ private static HelixDataAccessor _helixDataAccessor;
+ private static ConfigAccessor _configAccessor;
+ private static BestPossibleExternalViewVerifier _clusterVerifier;
+ private static List<MockParticipantManager> _participants = new ArrayList<>();
+ private static List<String> _resources = new ArrayList<>();
+
+ @BeforeMethod
+ public void beforeTest() {
+ System.out.println("Start setup:" + TestHelper.getTestMethodName());
+ // Create test cluster
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+ // Setup cluster configs
+ _configAccessor = new ConfigAccessor(_gZkClient);
+ ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+ clusterConfig.setPersistBestPossibleAssignment(true);
+ clusterConfig.setDefaultInstanceCapacityMap(
+ Collections.singletonMap(INSTANCE_CAPACITY_KEY, DEFAULT_INSTANCE_CAPACITY));
+ clusterConfig.setInstanceCapacityKeys(List.of(INSTANCE_CAPACITY_KEY));
+ _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+ _controller = startController(CLUSTER_NAME);
+
+ // Create HelixDataAccessor
+ _helixDataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+
+ // Create cluster verifier
+ _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+ .setResources(new HashSet<>(_resources))
+ .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Add and start instances to cluster
+ for (int i = 0; i < DEFAULT_INSTANCE_COUNT; i++) {
+ String instanceName = INSTANCE_NAME_PREFIX + (INSTANCE_START_PORT + i);
+ InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+ instanceConfig.setInstanceEnabled(true);
+ instanceConfig.setInstanceCapacityMap(
+ Collections.singletonMap(INSTANCE_CAPACITY_KEY, DEFAULT_INSTANCE_CAPACITY));
+ _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, instanceConfig);
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ participant.syncStart();
+ _participants.add(participant);
+ }
+
+ System.out.println("End setup:" + TestHelper.getTestMethodName());
+ }
+
+ @AfterMethod
+ public void afterTest() throws Exception {
+ System.out.println("Start teardown:" + TestHelper.getTestMethodName());
+
+ // Drop all resources
+ for (String resource : _resources) {
+ _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, resource);
+ }
+ _resources.clear();
+
+ // Stop and remove all instances
+ for (MockParticipantManager participant : _participants) {
+ participant.syncStop();
+ InstanceConfig instanceConfig = _helixDataAccessor.getProperty(
+ _helixDataAccessor.keyBuilder().instanceConfig(participant.getInstanceName()));
+ if (instanceConfig != null) {
+ _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, instanceConfig);
+ }
+ }
+ _participants.clear();
+
+ // Stop controller
+ _controller.syncStop();
+
+ // Drop cluster
+ _gSetupTool.deleteCluster(CLUSTER_NAME);
+
+ System.out.println("End teardown:" + TestHelper.getTestMethodName());
+ }
+
+ @Test
+ public void testComputePartitionAssignmentAddInstance() throws Exception {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+ // Create 5 WAGED resources
+ String wagedResourcePrefix = "TEST_WAGED_DB_";
+ int resourceCount = 5;
+ for (int i = 0; i < resourceCount; i++) {
+ createWagedResource(wagedResourcePrefix + i,
+ DEFAULT_INSTANCE_CAPACITY * DEFAULT_INSTANCE_COUNT / REPLICAS / resourceCount,
+ MIN_ACTIVE_REPLICAS, 100000L);
+ }
+
+ // Add Instance to cluster as disabled
+ String toAddInstanceName = "dummyInstance";
+ InstanceConfig toAddInstanceConfig = new InstanceConfig(toAddInstanceName);
+ toAddInstanceConfig.setInstanceCapacityMap(
+ Collections.singletonMap(INSTANCE_CAPACITY_KEY, DEFAULT_INSTANCE_CAPACITY));
+ _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, toAddInstanceConfig);
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Use partition assignment API to get CurrentStateView for just that instance as active
+ String payload = "{\"InstanceChange\" : { \"ActivateInstances\" : [\"" + toAddInstanceName
+ + "\"] }, \"Options\" : { \"ReturnFormat\" : \"CurrentStateFormat\", \"InstanceFilter\" : [\""
+ + toAddInstanceName + "\"] }}";
+ Response response = post(getPartitionAssignmentPath(), null,
+ Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode(),
+ true);
+ String body = response.readEntity(String.class);
+ Map<String, Map<String, Map<String, String>>> resourceAssignments =
+ OBJECT_MAPPER.readValue(body,
+ new TypeReference<HashMap<String, Map<String, Map<String, String>>>>() {
+ });
+
+ // Actually create the live instance
+ MockParticipantManager toAddParticipant = createParticipant(toAddInstanceName);
+ toAddParticipant.syncStart();
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Get the current state of the instance
+ // Compare the current state of the instance to CS previously returned by partition assignment API
+ LiveInstance liveInstance = _helixDataAccessor.getProperty(
+ _helixDataAccessor.keyBuilder().liveInstance(toAddInstanceName));
+ String liveSession = liveInstance.getEphemeralOwner();
+ Assert.assertTrue(TestHelper.verify(() -> {
+ try {
+ Map<String, Set<String>> instanceResourceMap =
+ resourceAssignments.getOrDefault(toAddInstanceName, Collections.emptyMap()).entrySet()
+ .stream().collect(HashMap::new,
+ (resourceMap, entry) -> resourceMap.put(entry.getKey(), entry.getValue().keySet()),
+ HashMap::putAll);
+ Map<String, Set<String>> instanceResourceMapCurrentState = new HashMap<>();
+ for (String resource : _resources) {
+ CurrentState currentState = _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder()
+ .currentState(toAddInstanceName, liveSession, resource));
+ instanceResourceMapCurrentState.put(resource,
+ currentState != null ? currentState.getPartitionStateMap().keySet()
+ : Collections.emptySet());
+ }
+ Assert.assertEquals(instanceResourceMapCurrentState, instanceResourceMap);
+ } catch (AssertionError e) {
+ LOG.error("Current state does not match partition assignment", e);
+ return false;
+ }
+ return true;
+ }, 30000));
+
+ System.out.println("End test :" + TestHelper.getTestMethodName());
+ }
+
+ @Test
+ public void testComputePartitionAssignmentReplaceInstance() throws Exception {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+ // Create 5 WAGED resources
+ String wagedResourcePrefix = "TEST_WAGED_DB_";
+ int resourceCount = 5;
+ for (int i = 0; i < resourceCount; i++) {
+ createWagedResource(wagedResourcePrefix + i,
+ (DEFAULT_INSTANCE_CAPACITY * 2 / 3) * (DEFAULT_INSTANCE_COUNT - 1) / REPLICAS / resourceCount,
+ MIN_ACTIVE_REPLICAS, 1000L);
+ }
+
+ // Kill an instance to simulate a dead instance and wait longer than delay window to simulate delayed rebalance
+ MockParticipantManager deadParticipant = _participants.get(0);
+ deadParticipant.syncStop();
+ Thread.sleep(3000L);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Enter the cluster into MM
+ _gSetupTool.getClusterManagementTool()
+ .enableMaintenanceMode(CLUSTER_NAME, true, "BHP enters MM.");
+
+ // Drop the dead instance from the cluster
+ _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME,
+ _gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, deadParticipant.getInstanceName()));
+
+ // Add new instance to cluster
+ String toAddInstanceName = "dummyInstance";
+ InstanceConfig toAddInstanceConfig = new InstanceConfig(toAddInstanceName);
+ toAddInstanceConfig.setInstanceCapacityMap(
+ Collections.singletonMap(INSTANCE_CAPACITY_KEY, DEFAULT_INSTANCE_CAPACITY));
+ _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, toAddInstanceConfig);
+
+ // Disable the added instance
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, toAddInstanceName, false);
+
+ // Start the added instance
+ MockParticipantManager toAddParticipant = createParticipant(toAddInstanceName);
+ toAddParticipant.syncStart();
+
+ // Exit the cluster from MM
+ _gSetupTool.getClusterManagementTool()
+ .enableMaintenanceMode(CLUSTER_NAME, false, "BHP exits MM.");
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Use partition assignment API to get CurrentStateView for just that instance as active
+ String payload = "{\"InstanceChange\" : { \"ActivateInstances\" : [\"" + toAddInstanceName
+ + "\"] }, \"Options\" : { \"ReturnFormat\" : \"CurrentStateFormat\", \"InstanceFilter\" : [\""
+ + toAddInstanceName + "\"] }}";
+ Response response = post(getPartitionAssignmentPath(), null,
+ Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode(),
+ true);
+ String body = response.readEntity(String.class);
+ Map<String, Map<String, Map<String, String>>> resourceAssignments =
+ OBJECT_MAPPER.readValue(body,
+ new TypeReference<HashMap<String, Map<String, Map<String, String>>>>() {
+ });
+
+ // Simulate workflow doing work based off of partition assignment API result
+ Thread.sleep(5000L);
+
+ // Enable the instance
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, toAddInstanceName, true);
+
+ // Wait for the cluster to converge
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Get the current state of the instance
+ // Compare the current state of the instance to CS previously returned by partition assignment API
+ LiveInstance liveInstance = _helixDataAccessor.getProperty(
+ _helixDataAccessor.keyBuilder().liveInstance(toAddInstanceName));
+ String liveSession = liveInstance.getEphemeralOwner();
+ Assert.assertTrue(TestHelper.verify(() -> {
+ try {
+ Map<String, Set<String>> instanceResourceMap =
+ resourceAssignments.getOrDefault(toAddInstanceName, Collections.emptyMap()).entrySet()
+ .stream().collect(HashMap::new,
+ (resourceMap, entry) -> resourceMap.put(entry.getKey(), entry.getValue().keySet()),
+ HashMap::putAll);
+ Map<String, Set<String>> instanceResourceMapCurrentState = new HashMap<>();
+ for (String resource : _resources) {
+ CurrentState currentState = _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder()
+ .currentState(toAddInstanceName, liveSession, resource));
+ instanceResourceMapCurrentState.put(resource,
+ currentState != null ? currentState.getPartitionStateMap().keySet()
+ : Collections.emptySet());
+ }
+ Assert.assertEquals(instanceResourceMapCurrentState, instanceResourceMap);
+ } catch (AssertionError e) {
+ LOG.error("Current state does not match partition assignment", e);
+ return false;
+ }
+ return true;
+ }, 30000));
+
+ System.out.println("End test :" + TestHelper.getTestMethodName());
+ }
+
+ private String getPartitionAssignmentPath() {
+ return String.format(PARTITION_ASSIGNMENT_PATH_TEMPLATE, CLUSTER_NAME);
+ }
+
+ private MockParticipantManager createParticipant(String instanceName) {
+ MockParticipantManager toAddParticipant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ _participants.add(toAddParticipant);
+ return toAddParticipant;
+ }
+
+ private void createWagedResource(String db, int numPartition, int minActiveReplica, long delay)
+ throws IOException {
+ _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, numPartition, "LeaderStandby",
+ IdealState.RebalanceMode.FULL_AUTO + "", null);
+ _resources.add(db);
+
+ IdealState idealState =
+ _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ idealState.setMinActiveReplicas(minActiveReplica);
+ idealState.setDelayRebalanceEnabled(true);
+ idealState.setRebalanceDelay(delay);
+ idealState.setRebalancerClassName(WagedRebalancer.class.getName());
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, idealState);
+
+ ResourceConfig resourceConfig = new ResourceConfig(db);
+ Map<String, Map<String, Integer>> capacityMap = new HashMap<>();
+ capacityMap.put("DEFAULT", Collections.singletonMap(INSTANCE_CAPACITY_KEY,
1));
+ resourceConfig.setPartitionCapacityMap(capacityMap);
+ _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
+
+ _gSetupTool.rebalanceResource(CLUSTER_NAME, db, REPLICAS);
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ }
+}
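
For readers who want to exercise the endpoint the way these tests do, here is
a minimal standalone sketch. The path template and JSON payload mirror the
test above; the base URL (host, port, and the /admin/v2 prefix of a local
helix-rest deployment) and the instance name are placeholder assumptions:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    // Illustrative client for the partitionAssignment endpoint; not part of this commit.
    public class PartitionAssignmentClientSketch {
      public static void main(String[] args) throws Exception {
        String cluster = "PartitionAssignmentTestCluster"; // cluster name from the test
        String instance = "dummyInstance";                 // placeholder instance name
        String payload = "{\"InstanceChange\" : { \"ActivateInstances\" : [\"" + instance
            + "\"] }, \"Options\" : { \"ReturnFormat\" : \"CurrentStateFormat\", \"InstanceFilter\" : [\""
            + instance + "\"] }}";
        // Base URL is an assumption for a local helix-rest server; the path matches
        // PARTITION_ASSIGNMENT_PATH_TEMPLATE in the test.
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8100/admin/v2/clusters/" + cluster + "/partitionAssignment/"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(payload))
            .build();
        HttpResponse<String> response =
            HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        // The response maps instance -> resource -> partition -> state (CurrentStateFormat).
        System.out.println(response.body());
      }
    }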