This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch wagedRebalancer in repository https://gitbox.apache.org/repos/asf/helix.git
commit 80d340ef1b398fdcd0436f418884982ff3b7e878 Author: Hunter Lee <[email protected]> AuthorDate: Tue Sep 10 09:42:23 2019 -0700 Fix TestWagedRebalancer and add constructor in AssignmentMetadataStore TestWagedRebalancer was failing because it was not using a proper HelixManager to instantiate a mock version of AssignmentMetadataStore. This diff refactors the constructors in AssignmentMetadataStore and fixes the failing test. --- .../rebalancer/waged/AssignmentMetadataStore.java | 18 +++-- .../waged/MockAssignmentMetadataStore.java | 10 +-- .../rebalancer/waged/TestWagedRebalancer.java | 93 +++++++++++++--------- 3 files changed, 71 insertions(+), 50 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java index bf9f292..fd655d1 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java @@ -21,6 +21,8 @@ package org.apache.helix.controller.rebalancer.waged; import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import org.I0Itec.zkclient.serialize.ZkSerializer; import org.apache.helix.BucketDataAccessor; import org.apache.helix.HelixException; @@ -31,9 +33,6 @@ import org.apache.helix.manager.zk.ZNRecordJacksonSerializer; import org.apache.helix.manager.zk.ZkBucketDataAccessor; import org.apache.helix.model.ResourceAssignment; -import java.util.HashMap; -import java.util.Map; - /** * A placeholder before we have the real assignment metadata store. */ @@ -51,12 +50,15 @@ public class AssignmentMetadataStore { private Map<String, ResourceAssignment> _globalBaseline; private Map<String, ResourceAssignment> _bestPossibleAssignment; + AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) { + _dataAccessor = bucketDataAccessor; + _baselinePath = String.format(BASELINE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY); + _bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY); + } + AssignmentMetadataStore(HelixManager helixManager) { - _dataAccessor = new ZkBucketDataAccessor(helixManager.getMetadataStoreConnectionString()); - _baselinePath = - String.format(BASELINE_TEMPLATE, helixManager.getClusterName(), ASSIGNMENT_METADATA_KEY); - _bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, helixManager.getClusterName(), - ASSIGNMENT_METADATA_KEY); + this(new ZkBucketDataAccessor(helixManager.getMetadataStoreConnectionString()), + helixManager.getClusterName()); } public Map<String, ResourceAssignment> getBaseline() { diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java index 8b80f2d..3371c8b 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java @@ -19,11 +19,10 @@ package org.apache.helix.controller.rebalancer.waged; * under the License. */ -import org.apache.helix.HelixManager; -import org.apache.helix.model.ResourceAssignment; - import java.util.HashMap; import java.util.Map; +import org.apache.helix.BucketDataAccessor; +import org.apache.helix.model.ResourceAssignment; /** * A mock up metadata store for unit test. @@ -33,9 +32,8 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore { private Map<String, ResourceAssignment> _persistGlobalBaseline = new HashMap<>(); private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new HashMap<>(); - public MockAssignmentMetadataStore() { - // In-memory mock component, so pass null for HelixManager since it's not needed - super(null); + MockAssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) { + super(bucketDataAccessor, clusterName); } public Map<String, ResourceAssignment> getBaseline() { diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java index 6759a10..d6fd99b 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java @@ -1,5 +1,24 @@ package org.apache.helix.controller.rebalancer.waged; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -7,7 +26,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; - +import org.apache.helix.BucketDataAccessor; import org.apache.helix.HelixConstants; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; @@ -28,13 +47,13 @@ import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.when; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; public class TestWagedRebalancer extends AbstractTestClusterModel { private Set<String> _instances; private MockRebalanceAlgorithm _algorithm; + private MockAssignmentMetadataStore _metadataStore; @BeforeClass public void initialize() { @@ -42,6 +61,11 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { _instances = new HashSet<>(); _instances.add(_testInstanceId); _algorithm = new MockRebalanceAlgorithm(); + + // Initialize a mock assignment metadata store + BucketDataAccessor mockAccessor = Mockito.mock(BucketDataAccessor.class); + String clusterName = ""; // an empty string for testing purposes + _metadataStore = new MockAssignmentMetadataStore(mockAccessor, clusterName); } @Override @@ -88,9 +112,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { @Test public void testRebalance() throws IOException, HelixRebalanceException { - // Init mock metadatastore for the unit test - MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore(); - WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm); + _metadataStore.clearMetadataStore(); + WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm); // Generate the input for the rebalancer. ResourceControllerDataProvider clusterData = setupClusterDataCache(); @@ -111,9 +134,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { @Test(dependsOnMethods = "testRebalance") public void testPartialRebalance() throws IOException, HelixRebalanceException { - // Init mock metadatastore for the unit test - MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore(); - WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm); + _metadataStore.clearMetadataStore(); + WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm); // Generate the input for the rebalancer. ResourceControllerDataProvider clusterData = setupClusterDataCache(); @@ -127,7 +149,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { // Test with partial resources listed in the resourceMap input. // Remove the first resource from the input. Note it still exists in the cluster data cache. - metadataStore.clearMetadataStore(); + _metadataStore.clearMetadataStore(); resourceMap.remove(_resourceNames.get(0)); Map<String, IdealState> newIdealStates = rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); @@ -137,9 +159,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { @Test(dependsOnMethods = "testRebalance") public void testRebalanceWithCurrentState() throws IOException, HelixRebalanceException { - // Init mock metadatastore for the unit test - MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore(); - WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm); + _metadataStore.clearMetadataStore(); + WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm); // Generate the input for the rebalancer. ResourceControllerDataProvider clusterData = setupClusterDataCache(); @@ -160,9 +181,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { String resourceName = csEntry.getKey(); CurrentState cs = csEntry.getValue(); for (Map.Entry<String, String> partitionStateEntry : cs.getPartitionStateMap().entrySet()) { - currentStateOutput - .setCurrentState(resourceName, new Partition(partitionStateEntry.getKey()), - instanceName, partitionStateEntry.getValue()); + currentStateOutput.setCurrentState(resourceName, + new Partition(partitionStateEntry.getKey()), instanceName, + partitionStateEntry.getValue()); } } } @@ -197,7 +218,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { @Test(dependsOnMethods = "testRebalance") public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceException { - WagedRebalancer rebalancer = new WagedRebalancer(new MockAssignmentMetadataStore(), _algorithm); + _metadataStore.clearMetadataStore(); + WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm); ResourceControllerDataProvider clusterData = setupClusterDataCache(); String nonCompatibleResourceName = _resourceNames.get(0); @@ -222,7 +244,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { // TODO test with invalid capacity configuration which will fail the cluster model constructing. @Test(dependsOnMethods = "testRebalance") public void testInvalidClusterStatus() throws IOException { - WagedRebalancer rebalancer = new WagedRebalancer(new MockAssignmentMetadataStore(), _algorithm); + _metadataStore.clearMetadataStore(); + WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm); ResourceControllerDataProvider clusterData = setupClusterDataCache(); String invalidResource = _resourceNames.get(0); @@ -270,8 +293,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.", HelixRebalanceException.Type.FAILED_TO_CALCULATE)); - WagedRebalancer rebalancer = - new WagedRebalancer(new MockAssignmentMetadataStore(), badAlgorithm); + _metadataStore.clearMetadataStore(); + WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, badAlgorithm); ResourceControllerDataProvider clusterData = setupClusterDataCache(); Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect( @@ -294,9 +317,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { // Note that this test relies on the MockRebalanceAlgorithm implementation. The mock algorithm // won't propagate any existing assignment from the cluster model. - // Init mock metadatastore for the unit test - MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore(); - WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm); + _metadataStore.clearMetadataStore(); + WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm); // 1. rebalance with baseline calculation done // Generate the input for the rebalancer. @@ -317,10 +339,10 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { // Since there is no special condition, the calculated IdealStates should be exactly the same // as the mock algorithm result. validateRebalanceResult(resourceMap, newIdealStates, algorithmResult); - Map<String, ResourceAssignment> baseline = metadataStore.getBaseline(); + Map<String, ResourceAssignment> baseline = _metadataStore.getBaseline(); Assert.assertEquals(baseline, algorithmResult); Map<String, ResourceAssignment> bestPossibleAssignment = - metadataStore.getBestPossibleAssignment(); + _metadataStore.getBestPossibleAssignment(); Assert.assertEquals(bestPossibleAssignment, algorithmResult); // 2. rebalance with one ideal state changed only @@ -344,14 +366,14 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { Collections.singletonMap(changedResourceName, new Resource(changedResourceName)), newIdealStates, partialAlgorithmResult); // Baseline should be empty, because there is no cluster topology change. - baseline = metadataStore.getBaseline(); + baseline = _metadataStore.getBaseline(); Assert.assertEquals(baseline, Collections.emptyMap()); // Best possible assignment contains the new assignment of only one resource. - bestPossibleAssignment = metadataStore.getBestPossibleAssignment(); + bestPossibleAssignment = _metadataStore.getBestPossibleAssignment(); Assert.assertEquals(bestPossibleAssignment, partialAlgorithmResult); // * Before the next test, recover the best possible assignment record. - metadataStore.persistBestPossibleAssignment(algorithmResult); + _metadataStore.persistBestPossibleAssignment(algorithmResult); // 3. rebalance with current state change only // Create a new cluster data cache to simulate cluster change @@ -373,9 +395,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { // Verify that only the changed resource has been included in the calculation. validateRebalanceResult(Collections.emptyMap(), newIdealStates, algorithmResult); // Both assignment state should be empty. - baseline = metadataStore.getBaseline(); + baseline = _metadataStore.getBaseline(); Assert.assertEquals(baseline, Collections.emptyMap()); - bestPossibleAssignment = metadataStore.getBestPossibleAssignment(); + bestPossibleAssignment = _metadataStore.getBestPossibleAssignment(); Assert.assertEquals(bestPossibleAssignment, Collections.emptyMap()); // 4. rebalance with no change but best possible state record missing. @@ -389,10 +411,10 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { // Verify that both resource has been included in the calculation. validateRebalanceResult(resourceMap, newIdealStates, algorithmResult); // Both assignment state should be empty since no cluster topology change. - baseline = metadataStore.getBaseline(); + baseline = _metadataStore.getBaseline(); Assert.assertEquals(baseline, Collections.emptyMap()); // The best possible assignment should be present. - bestPossibleAssignment = metadataStore.getBestPossibleAssignment(); + bestPossibleAssignment = _metadataStore.getBestPossibleAssignment(); Assert.assertEquals(bestPossibleAssignment, algorithmResult); } @@ -403,9 +425,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { Assert.assertTrue(newIdealStates.containsKey(resourceName)); IdealState is = newIdealStates.get(resourceName); ResourceAssignment assignment = expectedResult.get(resourceName); - Assert.assertEquals(is.getPartitionSet(), new HashSet<>( - assignment.getMappedPartitions().stream().map(partition -> partition.getPartitionName()) - .collect(Collectors.toSet()))); + Assert.assertEquals(is.getPartitionSet(), new HashSet<>(assignment.getMappedPartitions() + .stream().map(partition -> partition.getPartitionName()).collect(Collectors.toSet()))); for (String partitionName : is.getPartitionSet()) { Assert.assertEquals(is.getInstanceStateMap(partitionName), assignment.getReplicaMap(new Partition(partitionName)));
