This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cd22a333 Fix /partitionAssignmentAPI and WAGED rebalancer finalMapping results matching (#2739)
3cd22a333 is described below

commit 3cd22a33332f277e9e68c7341a0a89f54ff46aee
Author: Zachary Pinto <[email protected]>
AuthorDate: Tue Jan 23 10:15:23 2024 -0800

    Fix /partitionAssignmentAPI and WAGED rebalancer finalMapping results matching (#2739)
    
    OfflineTimeMap was not being cleared by the cache, and the BEST_POSSIBLE_STATE was
    not being persisted to ZK each time partial rebalance computed a new version.
    Fixing these two issues ensures that the pipeline produces the correct finalMapping
    and that the partitionAssignment API does not use a stale BEST_POSSIBLE_STATE to
    compute the simulated finalMapping.
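    
    For reference, a minimal sketch of the versioning scheme this change introduces
    (the two fields and the check mirror AssignmentMetadataStore in the diff below;
    the class name and method bodies are simplified illustrations, not the actual
    implementation):
    
        // Two counters distinguish "latest computed" from "latest persisted".
        class VersionedBestPossibleStore {
          volatile int bestPossibleVersion = 0;              // bumped when a new assignment is computed
          volatile int lastPersistedBestPossibleVersion = 0; // bumped only when the assignment reaches ZK
    
          // Partial rebalance path: update the in-memory cache only. The main
          // pipeline thread persists the cached assignment on its next run.
          void updateBestPossibleCache() {
            bestPossibleVersion++;
          }
    
          // Main pipeline path: write to ZK and record the persisted version.
          void persistBestPossible() {
            bestPossibleVersion++;
            lastPersistedBestPossibleVersion = bestPossibleVersion;
          }
    
          // WagedRebalancer persists when the assignment content changed OR
          // the latest cached version has not yet been written to ZK.
          boolean hasPersistedLatestBestPossibleAssignment() {
            return lastPersistedBestPossibleVersion == bestPossibleVersion;
          }
        }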
---
 .../dataproviders/BaseControllerDataProvider.java  |  22 +-
 .../rebalancer/waged/AssignmentMetadataStore.java  |  12 +
 .../rebalancer/waged/PartialRebalanceRunner.java   |   3 +
 .../rebalancer/waged/WagedRebalancer.java          |   8 +-
 .../rest/server/TestPartitionAssignmentAPI.java    | 356 +++++++++++++++++++++
 5 files changed, 381 insertions(+), 20 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 9120bd962..6fe1c7fc3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -123,7 +123,6 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   // This is used for SWAP related operations where there can be two instances with the same logicalId.
   private final Map<String, InstanceConfig> _assignableInstanceConfigMap = new HashMap<>();
   private final Map<String, LiveInstance> _assignableLiveInstancesMap = new HashMap<>();
-  private final Set<String> _assignableDisabledInstanceSet = new HashSet<>();
   private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName = new HashMap<>();
   private final Set<String> _enabledLiveSwapInInstanceNames = new HashSet<>();
   private final Map<String, MonitoredAbnormalResolver> _abnormalStateResolverMap = new HashMap<>();
@@ -552,7 +551,6 @@ public class BaseControllerDataProvider implements ControlContextProvider {
 
     updateIdealRuleMap(getClusterConfig());
     updateDisabledInstances(getInstanceConfigMap().values(),
-        getAssignableInstanceConfigMap().values(),
         getClusterConfig());
 
     return refreshedTypes;
@@ -591,7 +589,6 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     updateInstanceSets(_allInstanceConfigCache.getPropertyMap(), _allLiveInstanceCache.getPropertyMap(),
         _clusterConfig);
     updateDisabledInstances(getInstanceConfigMap().values(),
-        getAssignableInstanceConfigMap().values(),
         _clusterConfig);
   }
 
@@ -700,7 +697,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
    */
   public Set<String> getEnabledLiveInstances() {
     Set<String> enabledLiveInstances = new HashSet<>(getLiveInstances().keySet());
-    enabledLiveInstances.removeAll(getAssignableDisabledInstances());
+    enabledLiveInstances.removeAll(getDisabledInstances());
 
     return enabledLiveInstances;
   }
@@ -723,7 +720,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
    */
   public Set<String> getEnabledInstances() {
     Set<String> enabledNodes = new HashSet<>(getAllInstances());
-    enabledNodes.removeAll(getAssignableDisabledInstances());
+    enabledNodes.removeAll(getDisabledInstances());
 
     return enabledNodes;
   }
@@ -818,15 +815,6 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     return Collections.unmodifiableSet(_disabledInstanceSet);
   }
 
-  /**
-   * This method allows one to fetch the set of nodes that are disabled
-   *
-   * @return
-   */
-  public Set<String> getAssignableDisabledInstances() {
-    return Collections.unmodifiableSet(_assignableDisabledInstanceSet);
-  }
-
   /**
    * Get all swapping instance pairs.
    *
@@ -991,7 +979,6 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     updateInstanceSets(_allInstanceConfigCache.getPropertyMap(), _allLiveInstanceCache.getPropertyMap(),
         getClusterConfig());
     updateDisabledInstances(getInstanceConfigMap().values(),
-        getAssignableInstanceConfigMap().values(),
         getClusterConfig());
   }
 
@@ -1082,7 +1069,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   }
 
   private void updateDisabledInstances(Collection<InstanceConfig> allInstanceConfigs,
-      Collection<InstanceConfig> assignableInstanceConfigs, ClusterConfig clusterConfig) {
+      ClusterConfig clusterConfig) {
     // Move the calculating disabled instances to refresh
     _disabledInstanceForPartitionMap.clear();
     _disabledInstanceSet.clear();
@@ -1090,9 +1077,6 @@ public class BaseControllerDataProvider implements ControlContextProvider {
       Map<String, List<String>> disabledPartitionMap = config.getDisabledPartitionsMap();
       if (!InstanceValidationUtil.isInstanceEnabled(config, clusterConfig)) {
         _disabledInstanceSet.add(config.getInstanceName());
-        if (assignableInstanceConfigs.contains(config)) {
-          _assignableDisabledInstanceSet.add(config.getInstanceName());
-        }
       }
       for (String resource : disabledPartitionMap.keySet()) {
         _disabledInstanceForPartitionMap.putIfAbsent(resource, new HashMap<>());
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index 0a532944a..07056aa44 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -52,6 +52,7 @@ public class AssignmentMetadataStore {
   protected volatile Map<String, ResourceAssignment> _globalBaseline;
   protected volatile Map<String, ResourceAssignment> _bestPossibleAssignment;
   protected volatile int _bestPossibleVersion = 0;
+  protected volatile int _lastPersistedBestPossibleVersion = 0;
 
   AssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
     this(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
@@ -76,6 +77,16 @@ public class AssignmentMetadataStore {
     return _globalBaseline;
   }
 
+  /**
+   * Check whether the latest version of the best possible assignment in the cache has been
+   * persisted to the metadata store.
+   *
+   * @return true if the latest version has been persisted, false otherwise.
+   */
+  protected boolean hasPersistedLatestBestPossibleAssignment() {
+    return _lastPersistedBestPossibleVersion == _bestPossibleVersion;
+  }
+
   public Map<String, ResourceAssignment> getBestPossibleAssignment() {
     // Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK
     if (_bestPossibleAssignment == null) {
@@ -141,6 +152,7 @@ public class AssignmentMetadataStore {
     getBestPossibleAssignment().clear();
     getBestPossibleAssignment().putAll(bestPossibleAssignment);
     _bestPossibleVersion++;
+    _lastPersistedBestPossibleVersion = _bestPossibleVersion;
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java
index 74982f6e3..d4f492d93 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java
@@ -177,6 +177,9 @@ class PartialRebalanceRunner implements AutoCloseable {
 
     boolean bestPossibleUpdateSuccessful = false;
     if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(newAssignment)) {
+      // This does not persist the new Best Possible Assignment to ZK; it only updates the in-memory cache.
+      // If the update succeeds, the new Best Possible Assignment will be persisted to ZK the next time
+      // the pipeline is triggered. We schedule the pipeline to run below.
       bestPossibleUpdateSuccessful = _assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment,
           newBestPossibleAssignmentVersion);
     } else {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index c3049ebbf..4c4f9cf94 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -572,7 +572,13 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
 
   private void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment)
       throws HelixRebalanceException {
-    if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(bestPossibleAssignment)) {
+    // Persist only if the assignment differs from the currently cached assignment, or if the matching
+    // version of this assignment has not yet been persisted. Partial rebalance does not persist the
+    // assignment to the metadata store directly; it only caches it. The main thread persists it on
+    // the next pipeline run, in which case isBestPossibleChanged will be false.
+    if (_assignmentMetadataStore != null && (
+        _assignmentMetadataStore.isBestPossibleChanged(bestPossibleAssignment)
+            || !_assignmentMetadataStore.hasPersistedLatestBestPossibleAssignment())) {
       try {
         _writeLatency.startMeasuringLatency();
         _assignmentMetadataStore.persistBestPossibleAssignment(bestPossibleAssignment);
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
new file mode 100644
index 000000000..87e490d62
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
@@ -0,0 +1,356 @@
+package org.apache.helix.rest.server;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestPartitionAssignmentAPI extends AbstractTestClass {
+  private static final Logger LOG = LoggerFactory.getLogger(TestPartitionAssignmentAPI.class);
+
+  private static final int REPLICAS = 3;
+  private static final int MIN_ACTIVE_REPLICAS = 2;
+  private static final String INSTANCE_CAPACITY_KEY = "PARTCOUNT";
+  private static final int DEFAULT_INSTANCE_COUNT = 4;
+  private static final int DEFAULT_INSTANCE_CAPACITY = 50;
+
+  private static final String INSTANCE_NAME_PREFIX = "localhost_";
+  private static final int INSTANCE_START_PORT = 12918;
+  private static final String PARTITION_ASSIGNMENT_PATH_TEMPLATE =
+      "/clusters/%s/partitionAssignment/";
+  private static final String CLUSTER_NAME = "PartitionAssignmentTestCluster";
+  private static ClusterControllerManager _controller;
+  private static HelixDataAccessor _helixDataAccessor;
+  private static ConfigAccessor _configAccessor;
+  private static BestPossibleExternalViewVerifier _clusterVerifier;
+  private static List<MockParticipantManager> _participants = new ArrayList<>();
+  private static List<String> _resources = new ArrayList<>();
+
+  @BeforeMethod
+  public void beforeTest() {
+    System.out.println("Start setup:" + TestHelper.getTestMethodName());
+    // Create test cluster
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    // Setup cluster configs
+    _configAccessor = new ConfigAccessor(_gZkClient);
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setPersistBestPossibleAssignment(true);
+    clusterConfig.setDefaultInstanceCapacityMap(
+        Collections.singletonMap(INSTANCE_CAPACITY_KEY, DEFAULT_INSTANCE_CAPACITY));
+    clusterConfig.setInstanceCapacityKeys(List.of(INSTANCE_CAPACITY_KEY));
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    _controller = startController(CLUSTER_NAME);
+
+    // Create HelixDataAccessor
+    _helixDataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+
+    // Create cluster verifier
+    _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        .setResources(new HashSet<>(_resources))
+        .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Add and start instances to cluster
+    for (int i = 0; i < DEFAULT_INSTANCE_COUNT; i++) {
+      String instanceName = INSTANCE_NAME_PREFIX + (INSTANCE_START_PORT + i);
+      InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+      instanceConfig.setInstanceEnabled(true);
+      instanceConfig.setInstanceCapacityMap(
+          Collections.singletonMap(INSTANCE_CAPACITY_KEY, DEFAULT_INSTANCE_CAPACITY));
+      _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, instanceConfig);
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    System.out.println("End setup:" + TestHelper.getTestMethodName());
+  }
+
+  @AfterMethod
+  public void afterTest() throws Exception {
+    System.out.println("Start teardown:" + TestHelper.getTestMethodName());
+
+    // Drop all resources
+    for (String resource : _resources) {
+      _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, resource);
+    }
+    _resources.clear();
+
+    // Stop and remove all instances
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+      InstanceConfig instanceConfig = _helixDataAccessor.getProperty(
+          _helixDataAccessor.keyBuilder().instanceConfig(participant.getInstanceName()));
+      if (instanceConfig != null) {
+        _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, instanceConfig);
+      }
+    }
+    _participants.clear();
+
+    // Stop controller
+    _controller.syncStop();
+
+    // Drop cluster
+    _gSetupTool.deleteCluster(CLUSTER_NAME);
+
+    System.out.println("End teardown:" + TestHelper.getTestMethodName());
+  }
+
+  @Test
+  public void testComputePartitionAssignmentAddInstance() throws Exception {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+    // Create 5 WAGED resources
+    String wagedResourcePrefix = "TEST_WAGED_DB_";
+    int resourceCount = 5;
+    for (int i = 0; i < resourceCount; i++) {
+      createWagedResource(wagedResourcePrefix + i,
+          DEFAULT_INSTANCE_CAPACITY * DEFAULT_INSTANCE_COUNT / REPLICAS / resourceCount,
+          MIN_ACTIVE_REPLICAS, 100000L);
+    }
+
+    // Add the instance config to the cluster (participant not yet started)
+    String toAddInstanceName = "dummyInstance";
+    InstanceConfig toAddInstanceConfig = new InstanceConfig(toAddInstanceName);
+    toAddInstanceConfig.setInstanceCapacityMap(
+        Collections.singletonMap(INSTANCE_CAPACITY_KEY, DEFAULT_INSTANCE_CAPACITY));
+    _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, toAddInstanceConfig);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Use partition assignment API to get CurrentStateView for just that instance as active
+    String payload = "{\"InstanceChange\" : { \"ActivateInstances\" : [\"" + toAddInstanceName
+        + "\"] }, \"Options\" : { \"ReturnFormat\" : \"CurrentStateFormat\", \"InstanceFilter\" : [\""
+        + toAddInstanceName + "\"] }}";
+    Response response = post(getPartitionAssignmentPath(), null,
+        Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode(),
+        true);
+    String body = response.readEntity(String.class);
+    Map<String, Map<String, Map<String, String>>> resourceAssignments =
+        OBJECT_MAPPER.readValue(body,
+            new TypeReference<HashMap<String, Map<String, Map<String, String>>>>() {
+            });
+
+    // Actually create the live instance
+    MockParticipantManager toAddParticipant = createParticipant(toAddInstanceName);
+    toAddParticipant.syncStart();
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Get the current state of the instance
+    // Compare the current state of the instance to CS previously returned by partition assignment API
+    LiveInstance liveInstance = _helixDataAccessor.getProperty(
+        _helixDataAccessor.keyBuilder().liveInstance(toAddInstanceName));
+    String liveSession = liveInstance.getEphemeralOwner();
+    Assert.assertTrue(TestHelper.verify(() -> {
+      try {
+        Map<String, Set<String>> instanceResourceMap =
+            resourceAssignments.getOrDefault(toAddInstanceName, Collections.emptyMap()).entrySet()
+                .stream().collect(HashMap::new,
+                    (resourceMap, entry) -> resourceMap.put(entry.getKey(), entry.getValue().keySet()),
+                    HashMap::putAll);
+        Map<String, Set<String>> instanceResourceMapCurrentState = new HashMap<>();
+        for (String resource : _resources) {
+          CurrentState currentState = _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder()
+              .currentState(toAddInstanceName, liveSession, resource));
+          instanceResourceMapCurrentState.put(resource,
+              currentState != null ? currentState.getPartitionStateMap().keySet()
+                  : Collections.emptySet());
+        }
+        Assert.assertEquals(instanceResourceMapCurrentState, instanceResourceMap);
+      } catch (AssertionError e) {
+        LOG.error("Current state does not match partition assignment", e);
+        return false;
+      }
+      return true;
+    }, 30000));
+
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  @Test
+  public void testComputePartitionAssignmentReplaceInstance() throws Exception {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+    // Create 5 WAGED resources
+    String wagedResourcePrefix = "TEST_WAGED_DB_";
+    int resourceCount = 5;
+    for (int i = 0; i < resourceCount; i++) {
+      createWagedResource(wagedResourcePrefix + i,
+          (DEFAULT_INSTANCE_CAPACITY * 2 / 3) * (DEFAULT_INSTANCE_COUNT - 1) / REPLICAS / resourceCount,
+          MIN_ACTIVE_REPLICAS, 1000L);
+    }
+
+    // Kill an instance to simulate a dead instance and wait longer than delay window to simulate delayed rebalance
+    MockParticipantManager deadParticipant = _participants.get(0);
+    deadParticipant.syncStop();
+    Thread.sleep(3000L);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Enter the cluster into MM
+    _gSetupTool.getClusterManagementTool()
+        .enableMaintenanceMode(CLUSTER_NAME, true, "BHP enters MM.");
+
+    // Drop the dead instance from the cluster
+    _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME,
+        _gSetupTool.getClusterManagementTool()
+            .getInstanceConfig(CLUSTER_NAME, deadParticipant.getInstanceName()));
+
+    // Add new instance to cluster
+    String toAddInstanceName = "dummyInstance";
+    InstanceConfig toAddInstanceConfig = new InstanceConfig(toAddInstanceName);
+    toAddInstanceConfig.setInstanceCapacityMap(
+        Collections.singletonMap(INSTANCE_CAPACITY_KEY, DEFAULT_INSTANCE_CAPACITY));
+    _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, toAddInstanceConfig);
+
+    // Disable the added instance
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, toAddInstanceName, false);
+
+    // Start the added instance
+    MockParticipantManager toAddParticipant = createParticipant(toAddInstanceName);
+    toAddParticipant.syncStart();
+
+    // Exit the cluster from MM
+    _gSetupTool.getClusterManagementTool()
+        .enableMaintenanceMode(CLUSTER_NAME, false, "BHP exits MM.");
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Use partition assignment API to get CurrentStateView for just that instance as active
+    String payload = "{\"InstanceChange\" : { \"ActivateInstances\" : [\"" + toAddInstanceName
+        + "\"] }, \"Options\" : { \"ReturnFormat\" : \"CurrentStateFormat\", \"InstanceFilter\" : [\""
+        + toAddInstanceName + "\"] }}";
+    Response response = post(getPartitionAssignmentPath(), null,
+        Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode(),
+        true);
+    String body = response.readEntity(String.class);
+    Map<String, Map<String, Map<String, String>>> resourceAssignments =
+        OBJECT_MAPPER.readValue(body,
+            new TypeReference<HashMap<String, Map<String, Map<String, String>>>>() {
+            });
+
+    // Simulate workflow doing work based off of partition assignment API result
+    Thread.sleep(5000L);
+
+    // Enable the instance
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, toAddInstanceName, true);
+
+    // Wait for the cluster to converge
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Get the current state of the instance
+    // Compare the current state of the instance to CS previously returned by partition assignment API
+    LiveInstance liveInstance = _helixDataAccessor.getProperty(
+        _helixDataAccessor.keyBuilder().liveInstance(toAddInstanceName));
+    String liveSession = liveInstance.getEphemeralOwner();
+    Assert.assertTrue(TestHelper.verify(() -> {
+      try {
+        Map<String, Set<String>> instanceResourceMap =
+            resourceAssignments.getOrDefault(toAddInstanceName, Collections.emptyMap()).entrySet()
+                .stream().collect(HashMap::new,
+                    (resourceMap, entry) -> resourceMap.put(entry.getKey(), entry.getValue().keySet()),
+                    HashMap::putAll);
+        Map<String, Set<String>> instanceResourceMapCurrentState = new HashMap<>();
+        for (String resource : _resources) {
+          CurrentState currentState = _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder()
+              .currentState(toAddInstanceName, liveSession, resource));
+          instanceResourceMapCurrentState.put(resource,
+              currentState != null ? currentState.getPartitionStateMap().keySet()
+                  : Collections.emptySet());
+        }
+        Assert.assertEquals(instanceResourceMapCurrentState, instanceResourceMap);
+      } catch (AssertionError e) {
+        LOG.error("Current state does not match partition assignment", e);
+        return false;
+      }
+      return true;
+    }, 30000));
+
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  private String getPartitionAssignmentPath() {
+    return String.format(PARTITION_ASSIGNMENT_PATH_TEMPLATE, CLUSTER_NAME);
+  }
+
+  private MockParticipantManager createParticipant(String instanceName) {
+    MockParticipantManager toAddParticipant =
+        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+    _participants.add(toAddParticipant);
+    return toAddParticipant;
+  }
+
+  private void createWagedResource(String db, int numPartition, int minActiveReplica, long delay)
+      throws IOException {
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, numPartition, "LeaderStandby",
+        IdealState.RebalanceMode.FULL_AUTO + "", null);
+    _resources.add(db);
+
+    IdealState idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+    idealState.setMinActiveReplicas(minActiveReplica);
+    idealState.setDelayRebalanceEnabled(true);
+    idealState.setRebalanceDelay(delay);
+    idealState.setRebalancerClassName(WagedRebalancer.class.getName());
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, idealState);
+
+    ResourceConfig resourceConfig = new ResourceConfig(db);
+    Map<String, Map<String, Integer>> capacityMap = new HashMap<>();
+    capacityMap.put("DEFAULT", Collections.singletonMap(INSTANCE_CAPACITY_KEY, 1));
+    resourceConfig.setPartitionCapacityMap(capacityMap);
+    _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
+
+    _gSetupTool.rebalanceResource(CLUSTER_NAME, db, REPLICAS);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+  }
+}
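
For reference, a hypothetical standalone client call against the endpoint the test
above exercises. The test uses the AbstractTestClass embedded server and its post()
helper; the base URL, port, and /admin/v2 prefix below are assumptions, and
dummyInstance / PartitionAssignmentTestCluster are the names used in the test:

    import javax.ws.rs.client.Client;
    import javax.ws.rs.client.ClientBuilder;
    import javax.ws.rs.client.Entity;
    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.Response;

    public class PartitionAssignmentRequestExample {
      public static void main(String[] args) {
        // Same payload shape the test posts: simulate activating a not-yet-live
        // instance and return the assignment in CurrentState format, filtered to it.
        String payload = "{\"InstanceChange\" : { \"ActivateInstances\" : [\"dummyInstance\"] },"
            + " \"Options\" : { \"ReturnFormat\" : \"CurrentStateFormat\","
            + " \"InstanceFilter\" : [\"dummyInstance\"] }}";
        Client client = ClientBuilder.newClient();
        Response response = client
            .target("http://localhost:8080/admin/v2") // assumed helix-rest address
            .path("clusters/PartitionAssignmentTestCluster/partitionAssignment")
            .request(MediaType.APPLICATION_JSON_TYPE)
            .post(Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE));
        System.out.println(response.readEntity(String.class));
        client.close();
      }
    }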
