This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 9701d4006 WAGED - bugs fixes in n - n + 1 (#2578)
9701d4006 is described below
commit 9701d40064e8ded9c1632627c1a52f2390218fab
Author: Komal Desai <[email protected]>
AuthorDate: Thu Jul 27 16:22:47 2023 -0700
WAGED - bugs fixes in n - n + 1 (#2578)
Fix the bug in WAGED n-n+1 feature
---
.../rebalancer/waged/WagedInstanceCapacity.java | 88 +++----
.../WagedRebalancer/TestWagedClusterExpansion.java | 37 ++-
.../WagedRebalancer/TestWagedLoadedCluster.java | 258 +++++++++++++++++++++
3 files changed, 331 insertions(+), 52 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
index 1b878dbb3..d9bdaf879 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
@@ -25,7 +25,6 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.helix.HelixDefinedState;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
@@ -76,67 +75,54 @@ public class WagedInstanceCapacity implements
InstanceCapacityDataProvider {
public void process(ResourceControllerDataProvider cache, CurrentStateOutput
currentStateOutput,
Map<String, Resource> resourceMap, WagedResourceWeightsProvider
weightProvider) {
processCurrentState(cache, currentStateOutput, resourceMap,
weightProvider);
- processPendingMessages(cache, currentStateOutput, resourceMap,
weightProvider);
+ processPendingMessages(cache, resourceMap, weightProvider);
}
- /**
- * Process the pending messages based on the Current states
- * @param currentState - Current state of the resources.
- */
public void processPendingMessages(ResourceControllerDataProvider cache,
- CurrentStateOutput currentState, Map<String, Resource> resourceMap,
- WagedResourceWeightsProvider weightProvider) {
-
- for (Map.Entry<String, Resource> resourceEntry : resourceMap.entrySet()) {
- String resName = resourceEntry.getKey();
- Resource resource = resourceEntry.getValue();
-
- // if Resource is WAGED managed, then we need to manage the capacity.
- if (!WagedValidationUtil.isWagedEnabled(cache.getIdealState(resName))) {
- continue;
- }
-
- // list of partitions in the resource
- Collection<Partition> partitions = resource.getPartitions();
- // State model definition for the resource
- StateModelDefinition stateModelDef =
cache.getStateModelDef(resource.getStateModelDefRef());
- if (stateModelDef == null) {
- LOG.warn("State Model Definition for resource: " + resName + " is
null");
+ Map<String, Resource> resourceMap, WagedResourceWeightsProvider
weightProvider) {
+ // Get list of live instances from the cache.
+ Set<String> liveInstances = cache.getEnabledLiveInstances();
+ // Get all the messages from the cache
+ Map<String, Collection<Message>> messages =
cache.getAllInstancesMessages();
+
+ for (String instance : liveInstances) {
+ Collection<Message> msgs = messages.get(instance);
+ if (msgs == null || msgs.isEmpty()) {
continue;
}
- Map<String, Integer> statePriorityMap =
stateModelDef.getStatePriorityMap();
-
- for (Partition partition : partitions) {
- String partitionName = partition.getPartitionName();
- // Get Partition Weight
- Map<String, Integer> partCapacity =
weightProvider.getPartitionWeights(resName, partitionName);
- if (partCapacity == null || partCapacity.isEmpty()) {
- LOG.info("Partition: " + partitionName + " in resource: " + resName
- + " has no weight specified. Skipping it.");
- continue;
- }
- // Get the pending messages for the partition
- Map<String, Message> pendingMessages =
currentState.getPendingMessageMap(resName, partition);
- if (pendingMessages != null && !pendingMessages.isEmpty()) {
- for (Map.Entry<String, Message> entry : pendingMessages.entrySet())
{
- String instance = entry.getKey();
- if (hasPartitionChargedForCapacity(instance, resName,
partitionName)) {
- continue;
- }
- Message msg = entry.getValue();
- // If boot-strapping message is pending, we should deduct the
capacity.
- if (statePriorityMap.get(msg.getFromState()) <
statePriorityMap.get(msg.getToState())
- && msg.getFromState().equals(stateModelDef.getInitialState()))
{
- LOG.info("For bootstrappiing - deducting capacity for instance:
" + instance
- + " for resource: " + resName + " for partition: " +
partitionName);
- checkAndReduceInstanceCapacity(instance, resName, partitionName,
partCapacity);
- }
+ for (Message msg : msgs) {
+ if
(msg.getMsgType().equals(Message.MessageType.STATE_TRANSITION.name())) {
+ String resName = msg.getResourceName();
+ // if Resource is not in resourceMap or is not WAGED, then we can
skip.
+ if (!resourceMap.containsKey(resName) ||
+
!WagedValidationUtil.isWagedEnabled(cache.getIdealState(resName))) {
+ continue;
+ }
+ Resource resource = resourceMap.get(resName);
+ StateModelDefinition stateModelDef =
cache.getStateModelDef(resource.getStateModelDefRef());
+ Map<String, Integer> statePriorityMap =
stateModelDef.getStatePriorityMap();
+
+ String partitionName = msg.getPartitionName();
+ // Get Partition Weight
+ Map<String, Integer> partCapacity =
weightProvider.getPartitionWeights(resName, partitionName);
+ if (partCapacity == null || partCapacity.isEmpty()) {
+ LOG.info("Partition: " + partitionName + " in resource: " + resName
+ + " has no weight specified. Skipping it.");
+ continue;
+ }
+ // If bootstrapping message is pending, we should deduct the
capacity.
+ if (statePriorityMap.get(msg.getFromState()) >
statePriorityMap.get(msg.getToState())
+ && msg.getFromState().equals(stateModelDef.getInitialState())) {
+ LOG.info("For bootstrapping - deducting capacity for instance: " +
instance
+ + " for resource: " + resName + " for partition: " +
partitionName);
+ checkAndReduceInstanceCapacity(instance, resName, partitionName,
partCapacity);
}
}
}
}
}
+
private void processCurrentState(ResourceControllerDataProvider cache,
CurrentStateOutput currentStateOutput, Map<String, Resource> resourceMap,
WagedResourceWeightsProvider weightProvider) {
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedClusterExpansion.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedClusterExpansion.java
index 4778d9b1e..fb8ba7ddc 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedClusterExpansion.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedClusterExpansion.java
@@ -40,6 +40,7 @@ import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
@@ -84,6 +85,8 @@ public class TestWagedClusterExpansion extends ZkTestBase {
private final int INSTANCE_CAPACITY = 100;
private final int DEFAULT_PARTITION_CAPACITY = 6;
private final int INCREASED_PARTITION_CAPACITY = 10;
+
+ private final int REDUCED_INSTANCE_CAPACITY = 98;
private final int DEFAULT_DELAY = 500; // 0.5 second
private final String _testCapacityKey = "TestCapacityKey";
private final String _resourceChanged = "Test-WagedDB-0";
@@ -106,6 +109,13 @@ public class TestWagedClusterExpansion extends ZkTestBase {
@Transition(to = "SLAVE", from = "OFFLINE")
public void onBecomeSlaveFromOffline(Message message, NotificationContext
context) {
+ if (_delay > 0) {
+ try {
+ Thread.currentThread().sleep(_delay);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
LOG.info("Become SLAVE from OFFLINE");
}
@@ -145,7 +155,7 @@ public class TestWagedClusterExpansion extends ZkTestBase {
}
}
- public class WagedDelayMSStateModelFactory extends
StateModelFactory<WagedMasterSlaveModel> {
+ public static class WagedDelayMSStateModelFactory extends
StateModelFactory<WagedMasterSlaveModel> {
private long _delay;
@Override
@@ -282,6 +292,31 @@ public class TestWagedClusterExpansion extends ZkTestBase {
waitForPipeline(100, 3000); // this is for ZK to sync up.
}
+ // This test case reduces the capacity of one of the instances.
+ @Test (dependsOnMethods = "testIncreaseResourcePartitionWeight")
+ public void testReduceInstanceCapacity() throws Exception {
+
+ // Reduce capacity for one of the instances
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT +
NUM_NODE);
+
+ HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME,
_baseAccessor);
+ String db = _resourceChanged;
+ InstanceConfig config =
dataAccessor.getProperty(dataAccessor.keyBuilder().instanceConfig(storageNodeName));
+ if (config == null) {
+ config = new InstanceConfig(storageNodeName);
+ }
+ Map<String, Integer> capacityDataMap = ImmutableMap.of(_testCapacityKey,
REDUCED_INSTANCE_CAPACITY);
+ config.setInstanceCapacityMap(capacityDataMap);
+
dataAccessor.setProperty(dataAccessor.keyBuilder().instanceConfig(storageNodeName),
config);
+
+ // Make sure pipeline is run.
+ waitForPipeline(100, 10000); // 10 sec. max timeout.
+
+ LOG.info("After changing instance capacity");
+ validateIdealState(true);
+ waitForPipeline(100, 3000); // this is for ZK to sync up.
+ }
+
@AfterClass
public void afterClass() throws Exception {
try {
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedLoadedCluster.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedLoadedCluster.java
new file mode 100644
index 000000000..9bd981fc2
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedLoadedCluster.java
@@ -0,0 +1,258 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * This test case specifically targets the n - n+1 issue.
+ *
+ * Initially, all the partitions have equal size and equal weight
+ * for both CU and DISK.
+ * All the nodes are equally loaded.
+ * The test case will do the following:
+ * For one resource, we will double its CU weight.
+ * The rebalancer will be triggered.
+ *
+ * We have a monitoring thread which is constantly monitoring the instance
capacity.
+ * - It looks at current state resource assignment + pending messages
+ * - it has ASSERT in place to make sure we NEVER cross instance capacity (CU)
+ */
+public class TestWagedLoadedCluster extends ZkTestBase {
+ protected final int NUM_NODE = 6;
+ protected static final int START_PORT = 13000;
+ protected static final int PARTITIONS = 10;
+
+ protected static final String CLASS_NAME =
TestWagedLoadedCluster.class.getSimpleName();
+ protected static final String CLUSTER_NAME = CLUSTER_PREFIX + "_" +
CLASS_NAME;
+ protected ClusterControllerManager _controller;
+ protected AssignmentMetadataStore _assignmentMetadataStore;
+ List<MockParticipantManager> _participants = new ArrayList<>();
+ List<String> _nodes = new ArrayList<>();
+ private final Set<String> _allDBs = new HashSet<>();
+
+ private CountDownLatch _completedTest = new CountDownLatch(1);
+ private CountDownLatch _weightUpdatedLatch = new CountDownLatch(1); // when
0, weight is updated
+
+ private Thread _verifyThread = null;
+ private final Map<String, Integer> _defaultInstanceCapacity =
+ ImmutableMap.of("CU", 50, "DISK", 50);
+
+ private final Map<String, Integer> _defaultPartitionWeight =
+ ImmutableMap.of("CU", 10, "DISK", 10);
+
+ private final Map<String, Integer> _newPartitionWeight =
+ ImmutableMap.of("CU", 20, "DISK", 10);
+ private final int DEFAULT_DELAY = 500; // 0.5 second
+ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ LOG.info("START " + CLASS_NAME + " at " + new
Date(System.currentTimeMillis()));
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+ // create 6 node cluster
+ for (int i = 0; i < NUM_NODE; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ _nodes.add(storageNodeName);
+ }
+ // ST downward message will get delayed by 0.5 sec (DEFAULT_DELAY).
+ startParticipants(DEFAULT_DELAY);
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME,
controllerName);
+ _controller.syncStart();
+
+ enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+ _assignmentMetadataStore =
+ new AssignmentMetadataStore(new ZkBucketDataAccessor(ZK_ADDR),
CLUSTER_NAME) {
+ public Map<String, ResourceAssignment> getBaseline() {
+ // Ensure this metadata store always reads from the ZK without
using cache.
+ super.reset();
+ return super.getBaseline();
+ }
+
+ public synchronized Map<String, ResourceAssignment>
getBestPossibleAssignment() {
+ // Ensure this metadata store always reads from the ZK without
using cache.
+ super.reset();
+ return super.getBestPossibleAssignment();
+ }
+ };
+
+ // Set test instance capacity and partition weights
+ HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME,
_baseAccessor);
+ ClusterConfig clusterConfig =
+ dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig());
+
+ clusterConfig.setDefaultInstanceCapacityMap(_defaultInstanceCapacity);
+ clusterConfig.setDefaultPartitionWeightMap(_defaultPartitionWeight);
+ dataAccessor.setProperty(dataAccessor.keyBuilder().clusterConfig(),
clusterConfig);
+
+ // Create 3 resources with 2 partitions each.
+ for (int i = 0; i < 3; i++) {
+ String db = "Test-WagedDB-" + i;
+ createResourceWithWagedRebalance(CLUSTER_NAME, db,
BuiltInStateModelDefinitions.MasterSlave.name(),
+ 2 /*numPartitions*/, 3 /*replicas*/, 3 /*minActiveReplicas*/);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, 3);
+ _allDBs.add(db);
+ }
+
+ // Start a thread which will keep validating instance usage using
currentState and pending messages.
+ _verifyThread = new Thread(() -> {
+ while (_completedTest.getCount() > 0) {
+ try {
+ validateInstanceUsage();
+ Thread.currentThread().sleep(100);
+ } catch (InterruptedException e) {
+ LOG.debug("Exception in validateInstanceUsageThread", e);
+ } catch (Exception e) {
+ LOG.error("Exception in validateInstanceUsageThread", e);
+ }
+
+ }
+ });
+ }
+
+ public boolean validateInstanceUsage() {
+ try {
+ HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME,
_baseAccessor);
+ PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder();
+ // For each instance, get the currentState map and pending messages.
+ for (MockParticipantManager participant : _participants) {
+ String instance = participant.getInstanceName();
+ int totalCUUsage = 0;
+ List<String> resourceNames = dataAccessor.
+ getChildNames(propertyKeyBuilder.currentStates(instance,
participant.getSessionId()));
+ for (String resourceName : resourceNames) {
+ PropertyKey currentStateKey =
propertyKeyBuilder.currentState(instance,
+ participant.getSessionId(), resourceName);
+ CurrentState currentState =
dataAccessor.getProperty(currentStateKey);
+ if (currentState != null &&
currentState.getPartitionStateMap().size() > 0) {
+ if (_weightUpdatedLatch.getCount() == 0 &&
resourceName.equals("Test-WagedDB-0")) {
+ // For Test-WagedDB-0, the partition weight is updated to 20 CU.
+ totalCUUsage += currentState.getPartitionStateMap().size() * 20;
+ } else {
+ totalCUUsage += currentState.getPartitionStateMap().size() * 10;
+ }
+ }
+ }
+ List<Message> messages =
dataAccessor.getChildValues(dataAccessor.keyBuilder().messages(instance),
false);
+ for (Message m : messages) {
+ if (m.getFromState().equals("OFFLINE") &&
m.getToState().equals("SLAVE")) {
+ totalCUUsage += 10;
+ }
+ }
+ assert(totalCUUsage <= 50);
+ }
+ } catch (Exception e) {
+ LOG.error("Exception in validateInstanceUsage", e);
+ return false;
+ }
+ return true;
+ }
+
+ private void startParticipants(int delay) {
+ // start dummy participants
+ for (String node : _nodes) {
+ MockParticipantManager participant = new MockParticipantManager(ZK_ADDR,
CLUSTER_NAME, node);
+ StateMachineEngine stateMach = participant.getStateMachineEngine();
+ TestWagedClusterExpansion.WagedDelayMSStateModelFactory delayFactory =
+ new
TestWagedClusterExpansion.WagedDelayMSStateModelFactory().setDelay(delay);
+ stateMach.registerStateModelFactory("MasterSlave", delayFactory);
+ participant.syncStart();
+ _participants.add(participant);
+ }
+ }
+
+ @Test
+ public void testUpdateInstanceCapacity() throws Exception {
+
+ // Check modified time for external view of the first resource.
+ // if pipeline is run, then external view would be persisted.
+ _verifyThread.start();
+ // Update the weight for one of the resources.
+ HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME,
_baseAccessor);
+ String db = "Test-WagedDB-0";
+ ResourceConfig resourceConfig =
dataAccessor.getProperty(dataAccessor.keyBuilder().resourceConfig(db));
+ if (resourceConfig == null) {
+ resourceConfig = new ResourceConfig(db);
+ }
+ resourceConfig.setPartitionCapacityMap(
+ Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY,
_newPartitionWeight));
+ dataAccessor.setProperty(dataAccessor.keyBuilder().resourceConfig(db),
resourceConfig);
+ Thread.currentThread().sleep(100);
+ _weightUpdatedLatch.countDown();
+ Thread.currentThread().sleep(3000);
+ _completedTest.countDown();
+ Thread.currentThread().sleep(100);
+ }
+
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ try {
+ if (_controller != null && _controller.isConnected()) {
+ _controller.syncStop();
+ }
+ for (MockParticipantManager p : _participants) {
+ if (p != null && p.isConnected()) {
+ p.syncStop();
+ }
+ }
+ deleteCluster(CLUSTER_NAME);
+ //_verifyThread.interrupt();
+ } catch (Exception e) {
+ LOG.info("After class throwing exception, {}", e);
+ }
+ }
+}