This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 3d2da32bddc7798a502ae170873f0fb1abe8cbd5
Author: Hunter Lee <[email protected]>
AuthorDate: Mon Sep 9 16:40:24 2019 -0700

    Implement AssignmentMetadataStore (#453)
    
    Implement AssignmentMetadataStore
    
    AssignmentMetadataStore is a component for the new WAGED Rebalancer. It 
provides APIs that allow the rebalancer to read and write the baseline and 
best possible assignments using BucketDataAccessor.
    
    Changelist:
    1. Add AssignmentMetadataStore
    2. Add an integration test: TestAssignmentMetadataStore
---
 .../rebalancer/waged/AssignmentMetadataStore.java  | 112 +++++++++++++++++++--
 .../rebalancer/waged/WagedRebalancer.java          |  64 ++++++------
 .../manager/zk/ZNRecordJacksonSerializer.java      |   4 +-
 .../waged/MockAssignmentMetadataStore.java         |   9 +-
 .../waged/TestAssignmentMetadataStore.java         | 101 +++++++++++++++++++
 5 files changed, 244 insertions(+), 46 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index cc52dac..bf9f292 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -19,6 +19,16 @@ package org.apache.helix.controller.rebalancer.waged;
  * under the License.
  */
 
+import java.io.IOException;
+import java.util.Arrays;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.BucketDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordJacksonSerializer;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.model.ResourceAssignment;
 
 import java.util.HashMap;
@@ -28,24 +38,106 @@ import java.util.Map;
  * A placeholder before we have the real assignment metadata store.
  */
 public class AssignmentMetadataStore {
-  private Map<String, ResourceAssignment> _persistGlobalBaseline = new 
HashMap<>();
-  private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new 
HashMap<>();
+  private static final String ASSIGNMENT_METADATA_KEY = "ASSIGNMENT_METADATA";
+  private static final String BASELINE_TEMPLATE = "/%s/%s/BASELINE";
+  private static final String BEST_POSSIBLE_TEMPLATE = "/%s/%s/BEST_POSSIBLE";
+  private static final String BASELINE_KEY = "BASELINE";
+  private static final String BEST_POSSIBLE_KEY = "BEST_POSSIBLE";
+  private static final ZkSerializer SERIALIZER = new 
ZNRecordJacksonSerializer();
+
+  private BucketDataAccessor _dataAccessor;
+  private String _baselinePath;
+  private String _bestPossiblePath;
+  private Map<String, ResourceAssignment> _globalBaseline;
+  private Map<String, ResourceAssignment> _bestPossibleAssignment;
+
+  AssignmentMetadataStore(HelixManager helixManager) {
+    _dataAccessor = new 
ZkBucketDataAccessor(helixManager.getMetadataStoreConnectionString());
+    _baselinePath =
+        String.format(BASELINE_TEMPLATE, helixManager.getClusterName(), 
ASSIGNMENT_METADATA_KEY);
+    _bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, 
helixManager.getClusterName(),
+        ASSIGNMENT_METADATA_KEY);
+  }
 
   public Map<String, ResourceAssignment> getBaseline() {
-    return _persistGlobalBaseline;
+    // Return the in-memory baseline. If null, read from ZK. This is to 
minimize reads from ZK
+    if (_globalBaseline == null) {
+      HelixProperty baseline =
+          _dataAccessor.compressedBucketRead(_baselinePath, 
HelixProperty.class);
+      _globalBaseline = splitAssignments(baseline);
+    }
+    return _globalBaseline;
+  }
+
+  public Map<String, ResourceAssignment> getBestPossibleAssignment() {
+    // Return the in-memory best possible assignment. If null, read from ZK. 
This is to minimize reads from ZK
+    if (_bestPossibleAssignment == null) {
+      HelixProperty baseline =
+          _dataAccessor.compressedBucketRead(_bestPossiblePath, 
HelixProperty.class);
+      _bestPossibleAssignment = splitAssignments(baseline);
+    }
+    return _bestPossibleAssignment;
   }
 
   public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
-    // TODO clean up invalid items
-    _persistGlobalBaseline = globalBaseline;
+    // TODO: Make the write async?
+    // Persist to ZK
+    HelixProperty combinedAssignments = combineAssignments(BASELINE_KEY, 
globalBaseline);
+    try {
+      _dataAccessor.compressedBucketWrite(_baselinePath, combinedAssignments);
+    } catch (IOException e) {
+      // TODO: Improve failure handling
+      throw new HelixException("Failed to persist baseline!", e);
+    }
+
+    // Update the in-memory reference
+    _globalBaseline = globalBaseline;
   }
 
-  public Map<String, ResourceAssignment> getBestPossibleAssignment() {
-    return _persistBestPossibleAssignment;
+  public void persistBestPossibleAssignment(
+      Map<String, ResourceAssignment> bestPossibleAssignment) {
+    // TODO: Make the write async?
+    // Persist to ZK
+    HelixProperty combinedAssignments =
+        combineAssignments(BEST_POSSIBLE_KEY, bestPossibleAssignment);
+    try {
+      _dataAccessor.compressedBucketWrite(_bestPossiblePath, 
combinedAssignments);
+    } catch (IOException e) {
+      // TODO: Improve failure handling
+      throw new HelixException("Failed to persist baseline!", e);
+    }
+
+    // Update the in-memory reference
+    _bestPossibleAssignment = bestPossibleAssignment;
+  }
+
+  /**
+   * Produces one HelixProperty that contains all assignment data.
+   * @param name
+   * @param assignmentMap
+   * @return
+   */
+  private HelixProperty combineAssignments(String name,
+      Map<String, ResourceAssignment> assignmentMap) {
+    HelixProperty property = new HelixProperty(name);
+    // Add each resource's assignment as a simple field in one ZNRecord
+    assignmentMap.forEach((resource, assignment) -> 
property.getRecord().setSimpleField(resource,
+        Arrays.toString(SERIALIZER.serialize(assignment.getRecord()))));
+    return property;
   }
 
-  public void persistBestPossibleAssignment(Map<String, ResourceAssignment> 
bestPossibleAssignment) {
-    // TODO clean up invalid items
-    _persistBestPossibleAssignment.putAll(bestPossibleAssignment);
+  /**
+   * Returns a Map of (ResourceName, ResourceAssignment) pairs.
+   * @param property
+   * @return
+   */
+  private Map<String, ResourceAssignment> splitAssignments(HelixProperty 
property) {
+    Map<String, ResourceAssignment> assignmentMap = new HashMap<>();
+    // Convert each resource's assignment String into a ResourceAssignment 
object and put it in a
+    // map
+    property.getRecord().getSimpleFields()
+        .forEach((resource, assignment) -> assignmentMap.put(resource,
+            new ResourceAssignment((ZNRecord) 
SERIALIZER.deserialize(assignment.getBytes()))));
+    return assignmentMap;
   }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 866c7c9..22cac7e 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -51,10 +51,10 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Weight-Aware Globally-Even Distribute Rebalancer.
- *
- * @see <a 
href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer";>
- * Design Document
- * </a>
+ * @see <a
+ *      
href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer";>
+ *      Design Document
+ *      </a>
  */
 public class WagedRebalancer {
   private static final Logger LOG = 
LoggerFactory.getLogger(WagedRebalancer.class);
@@ -62,8 +62,8 @@ public class WagedRebalancer {
   // When any of the following change happens, the rebalancer needs to do a 
global rebalance which
   // contains 1. baseline recalculate, 2. partial rebalance that is based on 
the new baseline.
   private static final Set<HelixConstants.ChangeType> 
GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
-      Collections.unmodifiableSet(new HashSet<>(Arrays
-          .asList(HelixConstants.ChangeType.RESOURCE_CONFIG,
+      Collections
+          .unmodifiableSet(new 
HashSet<>(Arrays.asList(HelixConstants.ChangeType.RESOURCE_CONFIG,
               HelixConstants.ChangeType.CLUSTER_CONFIG,
               HelixConstants.ChangeType.INSTANCE_CONFIG)));
   // The cluster change detector is a stateful object.
@@ -73,7 +73,8 @@ public class WagedRebalancer {
   private final MappingCalculator<ResourceControllerDataProvider> 
_mappingCalculator;
 
   // --------- The following fields are placeholders and need replacement. 
-----------//
-  // TODO Shall we make the metadata store a static threadlocal object as well 
to avoid reinitialization?
+  // TODO Shall we make the metadata store a static threadlocal object as well 
to avoid
+  // reinitialization?
   private final AssignmentMetadataStore _assignmentMetadataStore;
   private final RebalanceAlgorithm _rebalanceAlgorithm;
   // 
------------------------------------------------------------------------------------//
@@ -81,8 +82,8 @@ public class WagedRebalancer {
   public WagedRebalancer(HelixManager helixManager) {
     this(
         // TODO init the metadata store according to their requirement when 
integrate,
-        //  or change to final static method if possible.
-        new AssignmentMetadataStore(),
+        // or change to final static method if possible.
+        new AssignmentMetadataStore(helixManager),
         // TODO parse the cluster setting
         ConstraintBasedAlgorithmFactory.getInstance(),
         // Use DelayedAutoRebalancer as the mapping calculator for the final 
assignment output.
@@ -103,14 +104,14 @@ public class WagedRebalancer {
   protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
       RebalanceAlgorithm algorithm) {
     this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer());
+
   }
 
   /**
    * Compute the new IdealStates for all the input resources. The IdealStates 
include both new
    * partition assignment (in the listFiles) and the new replica state mapping 
(in the mapFields).
-   *
-   * @param clusterData        The Cluster status data provider.
-   * @param resourceMap        A map containing all the rebalancing resources.
+   * @param clusterData The Cluster status data provider.
+   * @param resourceMap A map containing all the rebalancing resources.
    * @param currentStateOutput The present Current States of the resources.
    * @return A map of the new IdealStates with the resource name as key.
    */
@@ -124,8 +125,8 @@ public class WagedRebalancer {
       IdealState is = clusterData.getIdealState(resourceEntry.getKey());
       return is != null && 
is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
           && getClass().getName().equals(is.getRebalancerClassName());
-    }).collect(Collectors
-        .toMap(resourceEntry -> resourceEntry.getKey(), resourceEntry -> 
resourceEntry.getValue()));
+    }).collect(Collectors.toMap(resourceEntry -> resourceEntry.getKey(),
+        resourceEntry -> resourceEntry.getValue()));
 
     if (resourceMap.isEmpty()) {
       LOG.warn("There is no valid resource to be rebalanced by {}",
@@ -140,13 +141,13 @@ public class WagedRebalancer {
     Map<String, IdealState> newIdealStates = 
computeBestPossibleStates(clusterData, resourceMap);
 
     // Construct the new best possible states according to the current state 
and target assignment.
-    // Note that the new ideal state might be an intermediate state between 
the current state and the target assignment.
+    // Note that the new ideal state might be an intermediate state between 
the current state and
+    // the target assignment.
     for (IdealState is : newIdealStates.values()) {
       String resourceName = is.getResourceName();
       // Adjust the states according to the current state.
-      ResourceAssignment finalAssignment = _mappingCalculator
-          .computeBestPossiblePartitionState(clusterData, is, 
resourceMap.get(resourceName),
-              currentStateOutput);
+      ResourceAssignment finalAssignment = 
_mappingCalculator.computeBestPossiblePartitionState(
+          clusterData, is, resourceMap.get(resourceName), currentStateOutput);
 
       // Clean up the state mapping fields. Use the final assignment that is 
calculated by the
       // mapping calculator to replace them.
@@ -195,10 +196,10 @@ public class WagedRebalancer {
       IdealState newIdeaState;
       try {
         IdealState currentIdealState = clusterData.getIdealState(resourceName);
-        Map<String, Integer> statePriorityMap =
-            
clusterData.getStateModelDef(currentIdealState.getStateModelDefRef())
-                .getStatePriorityMap();
-        // Create a new IdealState instance contains the new calculated 
assignment in the preference list.
+        Map<String, Integer> statePriorityMap = clusterData
+            
.getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap();
+        // Create a new IdealState instance contains the new calculated 
assignment in the preference
+        // list.
         newIdeaState = generateIdealStateWithAssignment(resourceName, 
currentIdealState,
             newAssignment.get(resourceName), statePriorityMap);
       } catch (Exception ex) {
@@ -227,9 +228,8 @@ public class WagedRebalancer {
       throw new HelixRebalanceException("Failed to get the current baseline 
assignment.",
           HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
     }
-    Map<String, ResourceAssignment> baseline =
-        calculateAssignment(clusterData, clusterChanges, resourceMap, 
clusterData.getAllInstances(),
-            Collections.emptyMap(), currentBaseline);
+    Map<String, ResourceAssignment> baseline = 
calculateAssignment(clusterData, clusterChanges,
+        resourceMap, clusterData.getAllInstances(), Collections.emptyMap(), 
currentBaseline);
     try {
       _assignmentMetadataStore.persistBaseline(baseline);
     } catch (Exception ex) {
@@ -254,9 +254,8 @@ public class WagedRebalancer {
       throw new HelixRebalanceException("Failed to get the persisted 
assignment records.",
           HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
     }
-    Map<String, ResourceAssignment> newAssignment =
-        calculateAssignment(clusterData, clusterChanges, resourceMap, 
activeInstances, baseline,
-            prevBestPossibleAssignment);
+    Map<String, ResourceAssignment> newAssignment = 
calculateAssignment(clusterData, clusterChanges,
+        resourceMap, activeInstances, baseline, prevBestPossibleAssignment);
     try {
       // TODO Test to confirm if persisting the final assignment (with final 
partition states)
       // would be a better option.
@@ -271,13 +270,13 @@ public class WagedRebalancer {
 
   /**
    * Generate the cluster model based on the input and calculate the optimal 
assignment.
-   *
    * @param clusterData                the cluster data cache.
    * @param clusterChanges             the detected cluster changes.
    * @param resourceMap                the rebalancing resources.
    * @param activeNodes                the alive and enabled nodes.
    * @param baseline                   the baseline assignment for the 
algorithm as a reference.
-   * @param prevBestPossibleAssignment the previous best possible assignment 
for the algorithm as a reference.
+   * @param prevBestPossibleAssignment the previous best possible assignment 
for the algorithm as a
+   *                                   reference.
    * @return the new optimal assignment for the resources.
    */
   private Map<String, ResourceAssignment> calculateAssignment(
@@ -289,9 +288,8 @@ public class WagedRebalancer {
     LOG.info("Start calculating for an assignment");
     ClusterModel clusterModel;
     try {
-      clusterModel = ClusterModelProvider
-          .generateClusterModel(clusterData, resourceMap, activeNodes, 
clusterChanges, baseline,
-              prevBestPossibleAssignment);
+      clusterModel = ClusterModelProvider.generateClusterModel(clusterData, 
resourceMap,
+          activeNodes, clusterChanges, baseline, prevBestPossibleAssignment);
     } catch (Exception ex) {
       throw new HelixRebalanceException("Failed to generate cluster model.",
           HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java
 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java
index 989017a..b375e80 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java
@@ -27,8 +27,8 @@ import org.apache.helix.ZNRecord;
 import org.codehaus.jackson.map.ObjectMapper;
 
 /**
- * ZNRecordJacksonSerializer serializes ZNRecord objects into a byte array 
using MessagePack's
- * serializer. Note that this serializer doesn't check for the size of the 
resulting binary.
+ * ZNRecordJacksonSerializer serializes ZNRecord objects into a byte array 
using Jackson. Note that
+ * this serializer doesn't check for the size of the resulting binary.
  */
 public class ZNRecordJacksonSerializer implements ZkSerializer {
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
index ea8c164..8b80f2d 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.rebalancer.waged;
  * under the License.
  */
 
+import org.apache.helix.HelixManager;
 import org.apache.helix.model.ResourceAssignment;
 
 import java.util.HashMap;
@@ -32,6 +33,11 @@ public class MockAssignmentMetadataStore extends 
AssignmentMetadataStore {
   private Map<String, ResourceAssignment> _persistGlobalBaseline = new 
HashMap<>();
   private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new 
HashMap<>();
 
+  public MockAssignmentMetadataStore() {
+    // In-memory mock component, so pass null for HelixManager since it's not 
needed
+    super(null);
+  }
+
   public Map<String, ResourceAssignment> getBaseline() {
     return _persistGlobalBaseline;
   }
@@ -44,7 +50,8 @@ public class MockAssignmentMetadataStore extends 
AssignmentMetadataStore {
     return _persistBestPossibleAssignment;
   }
 
-  public void persistBestPossibleAssignment(Map<String, ResourceAssignment> 
bestPossibleAssignment) {
+  public void persistBestPossibleAssignment(
+      Map<String, ResourceAssignment> bestPossibleAssignment) {
     _persistBestPossibleAssignment = bestPossibleAssignment;
   }
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
new file mode 100644
index 0000000..922915f
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
@@ -0,0 +1,101 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ResourceAssignment;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestAssignmentMetadataStore extends ZkTestBase {
+  protected static final int NODE_NR = 5;
+  protected static final int START_PORT = 12918;
+  protected static final String STATE_MODEL = "MasterSlave";
+  protected static final String TEST_DB = "TestDB";
+  protected static final int _PARTITIONS = 20;
+
+  protected HelixManager _manager;
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+  protected MockParticipantManager[] _participants = new 
MockParticipantManager[NODE_NR];
+  protected ClusterControllerManager _controller;
+  protected int _replica = 3;
+
+  private AssignmentMetadataStore _store;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+
+    // setup storage cluster
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, 
STATE_MODEL);
+    for (int i = 0; i < NODE_NR; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
+
+    // start dummy participants
+    for (int i = 0; i < NODE_NR; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, 
instanceName);
+      _participants[i].syncStart();
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
controllerName);
+    _controller.syncStart();
+
+    // create cluster manager
+    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
+        InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    // create AssignmentMetadataStore
+    _store = new AssignmentMetadataStore(_manager);
+  }
+
+  /**
+   * TODO: Reading baseline will be empty because AssignmentMetadataStore 
isn't being used yet by
+   * the new rebalancer. Modify this integration test once the WAGED rebalancer
+   * starts using AssignmentMetadataStore's persist APIs.
+   * TODO: WAGED Rebalancer currently does NOT work with ZKClusterVerifier 
because verifier's
+   * HelixManager is null, and that causes an NPE when instantiating 
AssignmentMetadataStore.
+   */
+  @Test
+  public void testReadEmptyBaseline() {
+    try {
+      Map<String, ResourceAssignment> baseline = _store.getBaseline();
+      Assert.fail("Should fail because there shouldn't be any data.");
+    } catch (Exception e) {
+      // OK
+    }
+  }
+}

Reply via email to