This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch wagedRebalancer2 in repository https://gitbox.apache.org/repos/asf/helix.git
commit 3d2da32bddc7798a502ae170873f0fb1abe8cbd5 Author: Hunter Lee <[email protected]> AuthorDate: Mon Sep 9 16:40:24 2019 -0700 Implement AssignmentMetadataStore (#453) Implement AssignmentMetadataStore AssignmentMetadataStore is a component for the new WAGED Rebalancer. It provides APIs that allow the rebalancer to read and write the baseline and best possible assignments using BucketDataAccessor. Changelist: 1. Add AssignmentMetadataStore 2. Add an integration test: TestAssignmentMetadataStore --- .../rebalancer/waged/AssignmentMetadataStore.java | 112 +++++++++++++++++++-- .../rebalancer/waged/WagedRebalancer.java | 64 ++++++------ .../manager/zk/ZNRecordJacksonSerializer.java | 4 +- .../waged/MockAssignmentMetadataStore.java | 9 +- .../waged/TestAssignmentMetadataStore.java | 101 +++++++++++++++++++ 5 files changed, 244 insertions(+), 46 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java index cc52dac..bf9f292 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java @@ -19,6 +19,16 @@ package org.apache.helix.controller.rebalancer.waged; * under the License.
*/ +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import org.I0Itec.zkclient.serialize.ZkSerializer; +import org.apache.helix.BucketDataAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixProperty; +import org.apache.helix.ZNRecord; +import org.apache.helix.manager.zk.ZNRecordJacksonSerializer; +import org.apache.helix.manager.zk.ZkBucketDataAccessor; import org.apache.helix.model.ResourceAssignment; import java.util.HashMap; @@ -28,24 +38,106 @@ import java.util.Map; * A placeholder before we have the real assignment metadata store. */ public class AssignmentMetadataStore { - private Map<String, ResourceAssignment> _persistGlobalBaseline = new HashMap<>(); - private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new HashMap<>(); + private static final String ASSIGNMENT_METADATA_KEY = "ASSIGNMENT_METADATA"; + private static final String BASELINE_TEMPLATE = "/%s/%s/BASELINE"; + private static final String BEST_POSSIBLE_TEMPLATE = "/%s/%s/BEST_POSSIBLE"; + private static final String BASELINE_KEY = "BASELINE"; + private static final String BEST_POSSIBLE_KEY = "BEST_POSSIBLE"; + private static final ZkSerializer SERIALIZER = new ZNRecordJacksonSerializer(); + + private final BucketDataAccessor _dataAccessor; + private final String _baselinePath; + private final String _bestPossiblePath; + private Map<String, ResourceAssignment> _globalBaseline; + private Map<String, ResourceAssignment> _bestPossibleAssignment; + + AssignmentMetadataStore(HelixManager helixManager) { + // Tolerate a null manager (in-memory test subclasses pass null); _dataAccessor stays null. + _dataAccessor = helixManager == null ? null + : new ZkBucketDataAccessor(helixManager.getMetadataStoreConnectionString()); + String cluster = helixManager == null ? null : helixManager.getClusterName(); + _baselinePath = String.format(BASELINE_TEMPLATE, cluster, ASSIGNMENT_METADATA_KEY); + _bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, cluster, ASSIGNMENT_METADATA_KEY); + } public Map<String, ResourceAssignment> getBaseline() { - return _persistGlobalBaseline; + // Return the
in-memory baseline. If null, read from ZK. This is to minimize reads from ZK + if (_globalBaseline == null) { + HelixProperty baseline = + _dataAccessor.compressedBucketRead(_baselinePath, HelixProperty.class); + _globalBaseline = splitAssignments(baseline); + } + return _globalBaseline; + } + + public Map<String, ResourceAssignment> getBestPossibleAssignment() { + // Return the in-memory best possible assignment. If null, read from ZK to minimize ZK reads + if (_bestPossibleAssignment == null) { + HelixProperty bestPossible = + _dataAccessor.compressedBucketRead(_bestPossiblePath, HelixProperty.class); + _bestPossibleAssignment = splitAssignments(bestPossible); + } + return _bestPossibleAssignment; } public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) { - // TODO clean up invalid items - _persistGlobalBaseline = globalBaseline; + // TODO: Make the write async? + // Persist to ZK + HelixProperty combinedAssignments = combineAssignments(BASELINE_KEY, globalBaseline); + try { + _dataAccessor.compressedBucketWrite(_baselinePath, combinedAssignments); + } catch (IOException e) { + // TODO: Improve failure handling + throw new HelixException("Failed to persist baseline!", e); + } + + // Update the in-memory reference + _globalBaseline = globalBaseline; } - public Map<String, ResourceAssignment> getBestPossibleAssignment() { - return _persistBestPossibleAssignment; + public void persistBestPossibleAssignment( + Map<String, ResourceAssignment> bestPossibleAssignment) { + // TODO: Make the write async?
+ // Persist to ZK + HelixProperty combinedAssignments = + combineAssignments(BEST_POSSIBLE_KEY, bestPossibleAssignment); + try { + _dataAccessor.compressedBucketWrite(_bestPossiblePath, combinedAssignments); + } catch (IOException e) { + // TODO: Improve failure handling + throw new HelixException("Failed to persist best possible assignment!", e); + } + + // Update the in-memory reference + _bestPossibleAssignment = bestPossibleAssignment; + } + + /** + * Produces one HelixProperty that contains all assignment data. + * @param name name used to construct the resulting HelixProperty + * @param assignmentMap resource name to the ResourceAssignment to serialize + * @return one HelixProperty holding a simple field per resource + */ + private HelixProperty combineAssignments(String name, + Map<String, ResourceAssignment> assignmentMap) { + HelixProperty property = new HelixProperty(name); + // Store each serialized record as a UTF-8 String (not Arrays.toString) so it can be decoded + assignmentMap.forEach((resource, assignment) -> property.getRecord().setSimpleField(resource, + new String(SERIALIZER.serialize(assignment.getRecord()), StandardCharsets.UTF_8))); + return property; } - public void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) { - // TODO clean up invalid items - _persistBestPossibleAssignment.putAll(bestPossibleAssignment); + /** + * Returns a Map of (ResourceName, ResourceAssignment) pairs.
+ * @param property HelixProperty whose simple fields map resource names to serialized records + * @return a new map of resource name to deserialized ResourceAssignment + */ + private Map<String, ResourceAssignment> splitAssignments(HelixProperty property) { + Map<String, ResourceAssignment> assignmentMap = new HashMap<>(); + // Decode each UTF-8 serialized record back into a ResourceAssignment + property.getRecord().getSimpleFields().forEach((resource, serialized) -> assignmentMap + .put(resource, new ResourceAssignment((ZNRecord) SERIALIZER + .deserialize(serialized.getBytes(StandardCharsets.UTF_8))))); + return assignmentMap; } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index 866c7c9..22cac7e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -51,10 +51,10 @@ import org.slf4j.LoggerFactory; /** * Weight-Aware Globally-Even Distribute Rebalancer. - * - * @see <a href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer"> - * Design Document - * </a> + * @see <a + * href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer"> + * Design Document + * </a> */ public class WagedRebalancer { private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class); @@ -62,8 +62,8 @@ public class WagedRebalancer { // When any of the following change happens, the rebalancer needs to do a global rebalance which // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES = - Collections.unmodifiableSet(new HashSet<>(Arrays - .asList(HelixConstants.ChangeType.RESOURCE_CONFIG, + Collections + .unmodifiableSet(new HashSet<>(Arrays.asList(HelixConstants.ChangeType.RESOURCE_CONFIG, HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG))); // The cluster change detector is a stateful object. @@ -73,7 +73,8 @@ public class WagedRebalancer { private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator; // --------- The following fields are placeholders and need replacement. -----------// - // TODO Shall we make the metadata store a static threadlocal object as well to avoid reinitialization? + // TODO Shall we make the metadata store a static threadlocal object as well to avoid + // reinitialization? private final AssignmentMetadataStore _assignmentMetadataStore; private final RebalanceAlgorithm _rebalanceAlgorithm; // ------------------------------------------------------------------------------------// @@ -81,8 +82,8 @@ public class WagedRebalancer { public WagedRebalancer(HelixManager helixManager) { this( // TODO init the metadata store according to their requirement when integrate, - // or change to final static method if possible. - new AssignmentMetadataStore(), + // or change to final static method if possible. + new AssignmentMetadataStore(helixManager), // TODO parse the cluster setting ConstraintBasedAlgorithmFactory.getInstance(), // Use DelayedAutoRebalancer as the mapping calculator for the final assignment output. @@ -103,14 +104,14 @@ public class WagedRebalancer { protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, RebalanceAlgorithm algorithm) { this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer()); + } /** * Compute the new IdealStates for all the input resources. 
The IdealStates include both new * partition assignment (in the listFiles) and the new replica state mapping (in the mapFields). - * - * @param clusterData The Cluster status data provider. - * @param resourceMap A map containing all the rebalancing resources. + * @param clusterData The Cluster status data provider. + * @param resourceMap A map containing all the rebalancing resources. * @param currentStateOutput The present Current States of the resources. * @return A map of the new IdealStates with the resource name as key. */ @@ -124,8 +125,8 @@ public class WagedRebalancer { IdealState is = clusterData.getIdealState(resourceEntry.getKey()); return is != null && is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO) && getClass().getName().equals(is.getRebalancerClassName()); - }).collect(Collectors - .toMap(resourceEntry -> resourceEntry.getKey(), resourceEntry -> resourceEntry.getValue())); + }).collect(Collectors.toMap(resourceEntry -> resourceEntry.getKey(), + resourceEntry -> resourceEntry.getValue())); if (resourceMap.isEmpty()) { LOG.warn("There is no valid resource to be rebalanced by {}", @@ -140,13 +141,13 @@ public class WagedRebalancer { Map<String, IdealState> newIdealStates = computeBestPossibleStates(clusterData, resourceMap); // Construct the new best possible states according to the current state and target assignment. - // Note that the new ideal state might be an intermediate state between the current state and the target assignment. + // Note that the new ideal state might be an intermediate state between the current state and + // the target assignment. for (IdealState is : newIdealStates.values()) { String resourceName = is.getResourceName(); // Adjust the states according to the current state. 
- ResourceAssignment finalAssignment = _mappingCalculator - .computeBestPossiblePartitionState(clusterData, is, resourceMap.get(resourceName), - currentStateOutput); + ResourceAssignment finalAssignment = _mappingCalculator.computeBestPossiblePartitionState( + clusterData, is, resourceMap.get(resourceName), currentStateOutput); // Clean up the state mapping fields. Use the final assignment that is calculated by the // mapping calculator to replace them. @@ -195,10 +196,10 @@ public class WagedRebalancer { IdealState newIdeaState; try { IdealState currentIdealState = clusterData.getIdealState(resourceName); - Map<String, Integer> statePriorityMap = - clusterData.getStateModelDef(currentIdealState.getStateModelDefRef()) - .getStatePriorityMap(); - // Create a new IdealState instance contains the new calculated assignment in the preference list. + Map<String, Integer> statePriorityMap = clusterData + .getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap(); + // Create a new IdealState instance contains the new calculated assignment in the preference + // list. 
newIdeaState = generateIdealStateWithAssignment(resourceName, currentIdealState, newAssignment.get(resourceName), statePriorityMap); } catch (Exception ex) { @@ -227,9 +228,8 @@ public class WagedRebalancer { throw new HelixRebalanceException("Failed to get the current baseline assignment.", HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); } - Map<String, ResourceAssignment> baseline = - calculateAssignment(clusterData, clusterChanges, resourceMap, clusterData.getAllInstances(), - Collections.emptyMap(), currentBaseline); + Map<String, ResourceAssignment> baseline = calculateAssignment(clusterData, clusterChanges, + resourceMap, clusterData.getAllInstances(), Collections.emptyMap(), currentBaseline); try { _assignmentMetadataStore.persistBaseline(baseline); } catch (Exception ex) { @@ -254,9 +254,8 @@ public class WagedRebalancer { throw new HelixRebalanceException("Failed to get the persisted assignment records.", HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); } - Map<String, ResourceAssignment> newAssignment = - calculateAssignment(clusterData, clusterChanges, resourceMap, activeInstances, baseline, - prevBestPossibleAssignment); + Map<String, ResourceAssignment> newAssignment = calculateAssignment(clusterData, clusterChanges, + resourceMap, activeInstances, baseline, prevBestPossibleAssignment); try { // TODO Test to confirm if persisting the final assignment (with final partition states) // would be a better option. @@ -271,13 +270,13 @@ public class WagedRebalancer { /** * Generate the cluster model based on the input and calculate the optimal assignment. - * * @param clusterData the cluster data cache. * @param clusterChanges the detected cluster changes. * @param resourceMap the rebalancing resources. * @param activeNodes the alive and enabled nodes. * @param baseline the baseline assignment for the algorithm as a reference. - * @param prevBestPossibleAssignment the previous best possible assignment for the algorithm as a reference. 
+ * @param prevBestPossibleAssignment the previous best possible assignment for the algorithm as a + * reference. * @return the new optimal assignment for the resources. */ private Map<String, ResourceAssignment> calculateAssignment( @@ -289,9 +288,8 @@ public class WagedRebalancer { LOG.info("Start calculating for an assignment"); ClusterModel clusterModel; try { - clusterModel = ClusterModelProvider - .generateClusterModel(clusterData, resourceMap, activeNodes, clusterChanges, baseline, - prevBestPossibleAssignment); + clusterModel = ClusterModelProvider.generateClusterModel(clusterData, resourceMap, + activeNodes, clusterChanges, baseline, prevBestPossibleAssignment); } catch (Exception ex) { throw new HelixRebalanceException("Failed to generate cluster model.", HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java index 989017a..b375e80 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java @@ -27,8 +27,8 @@ import org.apache.helix.ZNRecord; import org.codehaus.jackson.map.ObjectMapper; /** - * ZNRecordJacksonSerializer serializes ZNRecord objects into a byte array using MessagePack's - * serializer. Note that this serializer doesn't check for the size of the resulting binary. + * ZNRecordJacksonSerializer serializes ZNRecord objects into a byte array using Jackson. Note that + * this serializer doesn't check for the size of the resulting binary. 
*/ public class ZNRecordJacksonSerializer implements ZkSerializer { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java index ea8c164..8b80f2d 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java @@ -19,6 +19,7 @@ package org.apache.helix.controller.rebalancer.waged; * under the License. */ +import org.apache.helix.HelixManager; import org.apache.helix.model.ResourceAssignment; import java.util.HashMap; @@ -32,6 +33,11 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore { private Map<String, ResourceAssignment> _persistGlobalBaseline = new HashMap<>(); private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new HashMap<>(); + public MockAssignmentMetadataStore() { + // In-memory mock component, so pass null for HelixManager since it's not needed + super(null); + } + public Map<String, ResourceAssignment> getBaseline() { return _persistGlobalBaseline; } @@ -44,7 +50,8 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore { return _persistBestPossibleAssignment; } - public void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) { + public void persistBestPossibleAssignment( + Map<String, ResourceAssignment> bestPossibleAssignment) { _persistBestPossibleAssignment = bestPossibleAssignment; } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java new file mode 100644 index 0000000..922915f --- /dev/null +++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java @@ -0,0 +1,101 @@ +package org.apache.helix.controller.rebalancer.waged; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Map; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.ResourceAssignment; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestAssignmentMetadataStore extends ZkTestBase { + protected static final int NODE_NR = 5; + protected static final int START_PORT = 12918; + protected static final String STATE_MODEL = "MasterSlave"; + protected static final String TEST_DB = "TestDB"; + protected static final int _PARTITIONS = 20; + + protected HelixManager _manager; + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + + protected 
MockParticipantManager[] _participants = new MockParticipantManager[NODE_NR]; + protected ClusterControllerManager _controller; + protected int _replica = 3; + + private AssignmentMetadataStore _store; + + @BeforeClass + public void beforeClass() throws Exception { + super.beforeClass(); + + // setup storage cluster + _gSetupTool.addCluster(CLUSTER_NAME, true); + _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL); + for (int i = 0; i < NODE_NR; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + } + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica); + + // start dummy participants + for (int i = 0; i < NODE_NR; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + _participants[i].syncStart(); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + // create cluster manager + _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", + InstanceType.ADMINISTRATOR, ZK_ADDR); + _manager.connect(); + + // create AssignmentMetadataStore + _store = new AssignmentMetadataStore(_manager); + } + + /** + * TODO: Reading baseline will be empty because AssignmentMetadataStore isn't being used yet by + * the new rebalancer. Modify this integration test once the WAGED rebalancer + * starts using AssignmentMetadataStore's persist APIs. + * TODO: WAGED Rebalancer currently does NOT work with ZKClusterVerifier because verifier's + * HelixManager is null, and that causes an NPE when instantiating AssignmentMetadataStore. 
+ */ + @Test + public void testReadEmptyBaseline() { + try { + Map<String, ResourceAssignment> baseline = _store.getBaseline(); + Assert.fail("Should fail because there shouldn't be any data."); + } catch (Exception e) { + // OK + } + } +}
