This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 9701d4006 WAGED - bugs fixes  in n - n + 1 (#2578)
9701d4006 is described below

commit 9701d40064e8ded9c1632627c1a52f2390218fab
Author: Komal Desai <[email protected]>
AuthorDate: Thu Jul 27 16:22:47 2023 -0700

    WAGED - bugs fixes  in n - n + 1 (#2578)
    
    Fix the bug in WAGED n-n+1 feature
---
 .../rebalancer/waged/WagedInstanceCapacity.java    |  88 +++----
 .../WagedRebalancer/TestWagedClusterExpansion.java |  37 ++-
 .../WagedRebalancer/TestWagedLoadedCluster.java    | 258 +++++++++++++++++++++
 3 files changed, 331 insertions(+), 52 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
index 1b878dbb3..d9bdaf879 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
@@ -25,7 +25,6 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.helix.HelixDefinedState;
 import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.ClusterConfig;
@@ -76,67 +75,54 @@ public class WagedInstanceCapacity implements 
InstanceCapacityDataProvider {
   public void process(ResourceControllerDataProvider cache, CurrentStateOutput 
currentStateOutput,
       Map<String, Resource> resourceMap, WagedResourceWeightsProvider 
weightProvider) {
     processCurrentState(cache, currentStateOutput, resourceMap, 
weightProvider);
-    processPendingMessages(cache, currentStateOutput, resourceMap, 
weightProvider);
+    processPendingMessages(cache, resourceMap, weightProvider);
   }
 
-  /**
-   * Process the pending messages based on the Current states
-   * @param currentState - Current state of the resources.
-   */
   public void processPendingMessages(ResourceControllerDataProvider cache,
-      CurrentStateOutput currentState, Map<String, Resource> resourceMap,
-      WagedResourceWeightsProvider weightProvider) {
-
-    for (Map.Entry<String, Resource> resourceEntry : resourceMap.entrySet()) {
-      String resName = resourceEntry.getKey();
-      Resource resource = resourceEntry.getValue();
-
-      // if Resource is WAGED managed, then we need to manage the capacity.
-      if (!WagedValidationUtil.isWagedEnabled(cache.getIdealState(resName))) {
-        continue;
-      }
-
-      // list of partitions in the resource
-      Collection<Partition> partitions = resource.getPartitions();
-      // State model definition for the resource
-      StateModelDefinition stateModelDef = 
cache.getStateModelDef(resource.getStateModelDefRef());
-      if (stateModelDef == null) {
-        LOG.warn("State Model Definition for resource: " + resName + " is 
null");
+      Map<String, Resource> resourceMap, WagedResourceWeightsProvider 
weightProvider) {
+    // Get list of live instances from the cache.
+    Set<String> liveInstances = cache.getEnabledLiveInstances();
+    // Get all the messages from the cache
+    Map<String, Collection<Message>> messages = 
cache.getAllInstancesMessages();
+
+    for (String instance : liveInstances) {
+      Collection<Message> msgs = messages.get(instance);
+      if (msgs == null || msgs.isEmpty()) {
         continue;
       }
-      Map<String, Integer> statePriorityMap = 
stateModelDef.getStatePriorityMap();
-
-      for (Partition partition : partitions) {
-        String partitionName = partition.getPartitionName();
-        // Get Partition Weight
-        Map<String, Integer> partCapacity = 
weightProvider.getPartitionWeights(resName, partitionName);
-        if (partCapacity == null || partCapacity.isEmpty()) {
-          LOG.info("Partition: " + partitionName + " in resource: " + resName
-              + " has no weight specified. Skipping it.");
-          continue;
-        }
-        // Get the pending messages for the partition
-        Map<String, Message> pendingMessages = 
currentState.getPendingMessageMap(resName, partition);
-        if (pendingMessages != null && !pendingMessages.isEmpty()) {
-          for (Map.Entry<String, Message> entry :  pendingMessages.entrySet()) 
{
-            String instance = entry.getKey();
-            if (hasPartitionChargedForCapacity(instance, resName, 
partitionName)) {
-              continue;
-            }
-            Message msg = entry.getValue();
-            // If boot-strapping message is pending, we should deduct the 
capacity.
-            if (statePriorityMap.get(msg.getFromState()) < 
statePriorityMap.get(msg.getToState())
-                && msg.getFromState().equals(stateModelDef.getInitialState())) 
{
-              LOG.info("For bootstrappiing - deducting capacity for instance: 
" + instance
-                  + " for resource: " + resName + " for partition: " + 
partitionName);
-              checkAndReduceInstanceCapacity(instance, resName, partitionName, 
partCapacity);
-            }
+      for (Message msg : msgs) {
+        if 
(msg.getMsgType().equals(Message.MessageType.STATE_TRANSITION.name())) {
+          String resName = msg.getResourceName();
+          // if Resource is not in resourceMap or is not WAGED, then we can 
skip.
+          if (!resourceMap.containsKey(resName) ||
+              
!WagedValidationUtil.isWagedEnabled(cache.getIdealState(resName))) {
+            continue;
+          }
+          Resource resource = resourceMap.get(resName);
+          StateModelDefinition stateModelDef = 
cache.getStateModelDef(resource.getStateModelDefRef());
+          Map<String, Integer> statePriorityMap = 
stateModelDef.getStatePriorityMap();
+
+          String partitionName = msg.getPartitionName();
+          // Get Partition Weight
+          Map<String, Integer> partCapacity = 
weightProvider.getPartitionWeights(resName, partitionName);
+          if (partCapacity == null || partCapacity.isEmpty()) {
+            LOG.info("Partition: " + partitionName + " in resource: " + resName
+                + " has no weight specified. Skipping it.");
+            continue;
+          }
+          // If bootstrapping message is pending, we should deduct the 
capacity.
+          if (statePriorityMap.get(msg.getFromState()) > 
statePriorityMap.get(msg.getToState())
+              && msg.getFromState().equals(stateModelDef.getInitialState())) {
+            LOG.info("For bootstrapping - deducting capacity for instance: " + 
instance
+                + " for resource: " + resName + " for partition: " + 
partitionName);
+            checkAndReduceInstanceCapacity(instance, resName, partitionName, 
partCapacity);
           }
         }
       }
     }
   }
 
+
   private void processCurrentState(ResourceControllerDataProvider cache,
       CurrentStateOutput currentStateOutput, Map<String, Resource> resourceMap,
       WagedResourceWeightsProvider weightProvider) {
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedClusterExpansion.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedClusterExpansion.java
index 4778d9b1e..fb8ba7ddc 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedClusterExpansion.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedClusterExpansion.java
@@ -40,6 +40,7 @@ import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
@@ -84,6 +85,8 @@ public class TestWagedClusterExpansion extends ZkTestBase {
   private final int INSTANCE_CAPACITY = 100;
   private final int DEFAULT_PARTITION_CAPACITY = 6;
   private final int INCREASED_PARTITION_CAPACITY = 10;
+
+  private final int REDUCED_INSTANCE_CAPACITY = 98;
   private final int DEFAULT_DELAY = 500; // 0.5 second
   private final String  _testCapacityKey = "TestCapacityKey";
   private final String  _resourceChanged = "Test-WagedDB-0";
@@ -106,6 +109,13 @@ public class TestWagedClusterExpansion extends ZkTestBase {
 
     @Transition(to = "SLAVE", from = "OFFLINE")
     public void onBecomeSlaveFromOffline(Message message, NotificationContext 
context) {
+      if (_delay > 0) {
+        try {
+          Thread.currentThread().sleep(_delay);
+        } catch (InterruptedException e) {
+          // ignore
+        }
+      }
       LOG.info("Become SLAVE from OFFLINE");
     }
 
@@ -145,7 +155,7 @@ public class TestWagedClusterExpansion extends ZkTestBase {
     }
   }
 
-  public class WagedDelayMSStateModelFactory extends 
StateModelFactory<WagedMasterSlaveModel> {
+  public static class WagedDelayMSStateModelFactory extends 
StateModelFactory<WagedMasterSlaveModel> {
     private long _delay;
 
     @Override
@@ -282,6 +292,31 @@ public class TestWagedClusterExpansion extends ZkTestBase {
     waitForPipeline(100, 3000); // this is for ZK to sync up.
   }
 
+  // This test case reduces the capacity of one of the instances.
+  @Test (dependsOnMethods = "testIncreaseResourcePartitionWeight")
+  public void testReduceInstanceCapacity() throws Exception {
+
+    // Reduce capacity for one of the instances
+    String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
NUM_NODE);
+
+    HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, 
_baseAccessor);
+    String db = _resourceChanged;
+    InstanceConfig config = 
dataAccessor.getProperty(dataAccessor.keyBuilder().instanceConfig(storageNodeName));
+    if (config == null) {
+      config = new InstanceConfig(storageNodeName);
+    }
+    Map<String, Integer> capacityDataMap = ImmutableMap.of(_testCapacityKey, 
REDUCED_INSTANCE_CAPACITY);
+    config.setInstanceCapacityMap(capacityDataMap);
+    
dataAccessor.setProperty(dataAccessor.keyBuilder().instanceConfig(storageNodeName),
 config);
+
+    // Make sure pipeline is run.
+    waitForPipeline(100, 10000); // 10 sec. max timeout.
+
+    LOG.info("After changing instance capacity");
+    validateIdealState(true);
+    waitForPipeline(100, 3000); // this is for ZK to sync up.
+  }
+
   @AfterClass
   public void afterClass() throws Exception {
     try {
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedLoadedCluster.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedLoadedCluster.java
new file mode 100644
index 000000000..9bd981fc2
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedLoadedCluster.java
@@ -0,0 +1,258 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional warnrmation
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * This test case specifically targets the n - n+1 issue.
+ *
+ * Initially, all the partitions are equal size and equal weight
+ * from both CU, DISK.
+ * All the nodes are equally loaded.
+ * The test case will do the following:
+ *     For one resource, we will double its CU weight.
+ *     The rebalancer will be triggered.
+ *
+ * We have a monitoring thread which is constantly monitoring the instance 
capacity.
+ * - It looks at current state resource assignment + pending messages
+ * - it  has ASSERT in place to make sure we NEVER cross instance capacity (CU)
+ */
+public class TestWagedLoadedCluster extends ZkTestBase {
+  protected final int NUM_NODE = 6;
+  protected static final int START_PORT = 13000;
+  protected static final int PARTITIONS = 10;
+
+  protected static final String CLASS_NAME = 
TestWagedLoadedCluster.class.getSimpleName();
+  protected static final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + 
CLASS_NAME;
+  protected ClusterControllerManager _controller;
+  protected AssignmentMetadataStore _assignmentMetadataStore;
+  List<MockParticipantManager> _participants = new ArrayList<>();
+  List<String> _nodes = new ArrayList<>();
+  private final Set<String> _allDBs = new HashSet<>();
+
+  private CountDownLatch _completedTest = new CountDownLatch(1);
+  private CountDownLatch _weightUpdatedLatch = new CountDownLatch(1); // when 
0, weight is updated
+
+  private Thread _verifyThread = null;
+  private final Map<String, Integer> _defaultInstanceCapacity =
+      ImmutableMap.of("CU", 50, "DISK", 50);
+
+  private final Map<String, Integer> _defaultPartitionWeight =
+      ImmutableMap.of("CU", 10, "DISK", 10);
+
+  private final Map<String, Integer> _newPartitionWeight =
+      ImmutableMap.of("CU", 20, "DISK", 10);
+  private final int DEFAULT_DELAY = 500; // 0.5 second
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    LOG.info("START " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+    // create 6 node cluster
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      _nodes.add(storageNodeName);
+    }
+    // ST downward message will get delayed by 0.5 sec (DEFAULT_DELAY).
+    startParticipants(DEFAULT_DELAY);
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
controllerName);
+    _controller.syncStart();
+
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    _assignmentMetadataStore =
+        new AssignmentMetadataStore(new ZkBucketDataAccessor(ZK_ADDR), 
CLUSTER_NAME) {
+          public Map<String, ResourceAssignment> getBaseline() {
+            // Ensure this metadata store always read from the ZK without 
using cache.
+            super.reset();
+            return super.getBaseline();
+          }
+
+          public synchronized Map<String, ResourceAssignment> 
getBestPossibleAssignment() {
+            // Ensure this metadata store always read from the ZK without 
using cache.
+            super.reset();
+            return super.getBestPossibleAssignment();
+          }
+        };
+
+    // Set test instance capacity and partition weights
+    HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, 
_baseAccessor);
+    ClusterConfig clusterConfig =
+        dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig());
+
+    clusterConfig.setDefaultInstanceCapacityMap(_defaultInstanceCapacity);
+    clusterConfig.setDefaultPartitionWeightMap(_defaultPartitionWeight);
+    dataAccessor.setProperty(dataAccessor.keyBuilder().clusterConfig(), 
clusterConfig);
+
+    // Create 3 resources with 2 partitions each.
+    for (int i = 0; i < 3; i++) {
+      String db = "Test-WagedDB-" + i;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, 
BuiltInStateModelDefinitions.MasterSlave.name(),
+          2 /*numPartitions*/, 3 /*replicas*/, 3 /*minActiveReplicas*/);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, 3);
+      _allDBs.add(db);
+    }
+
+    // Start a thread which will keep validating instance usage using 
currentState and pending messages.
+    _verifyThread = new Thread(() -> {
+      while (_completedTest.getCount() > 0) {
+        try {
+          validateInstanceUsage();
+          Thread.currentThread().sleep(100);
+        } catch (InterruptedException e) {
+          LOG.debug("Exception in validateInstanceUsageThread", e);
+        } catch (Exception e) {
+          LOG.error("Exception in validateInstanceUsageThread", e);
+        }
+
+      }
+    });
+  }
+
+  public boolean validateInstanceUsage() {
+    try {
+      HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, 
_baseAccessor);
+      PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder();
+      // For each instance, get the currentState map and pending messages.
+      for (MockParticipantManager participant : _participants) {
+        String instance = participant.getInstanceName();
+        int totalCUUsage = 0;
+        List<String> resourceNames = dataAccessor.
+            getChildNames(propertyKeyBuilder.currentStates(instance, 
participant.getSessionId()));
+        for (String resourceName : resourceNames) {
+          PropertyKey currentStateKey = 
propertyKeyBuilder.currentState(instance,
+              participant.getSessionId(), resourceName);
+          CurrentState currentState = 
dataAccessor.getProperty(currentStateKey);
+          if (currentState != null && 
currentState.getPartitionStateMap().size() > 0) {
+            if (_weightUpdatedLatch.getCount() == 0 && 
resourceName.equals("Test-WagedDB-0")) {
+              // For Test-WagedDB-0, the partition weight is updated to 20 CU.
+              totalCUUsage += currentState.getPartitionStateMap().size() * 20;
+            } else {
+              totalCUUsage += currentState.getPartitionStateMap().size() * 10;
+            }
+          }
+        }
+        List<Message> messages = 
dataAccessor.getChildValues(dataAccessor.keyBuilder().messages(instance), 
false);
+        for (Message m : messages) {
+          if (m.getFromState().equals("OFFLINE") && 
m.getToState().equals("SLAVE")) {
+            totalCUUsage += 10;
+          }
+        }
+        assert(totalCUUsage <= 50);
+      }
+    } catch (Exception e) {
+      LOG.error("Exception in validateInstanceUsage", e);
+      return false;
+    }
+    return true;
+  }
+
+  private void startParticipants(int delay) {
+    // start dummy participants
+    for (String node : _nodes) {
+      MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, 
CLUSTER_NAME, node);
+      StateMachineEngine stateMach = participant.getStateMachineEngine();
+      TestWagedClusterExpansion.WagedDelayMSStateModelFactory delayFactory =
+          new 
TestWagedClusterExpansion.WagedDelayMSStateModelFactory().setDelay(delay);
+      stateMach.registerStateModelFactory("MasterSlave", delayFactory);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+  }
+
+  @Test
+  public void testUpdateInstanceCapacity() throws Exception {
+
+    // Check modified time for external view of the first resource.
+    // if pipeline is run, then external view would be persisted.
+    _verifyThread.start();
+    // Update the weight for one of the resources.
+    HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, 
_baseAccessor);
+    String db = "Test-WagedDB-0";
+    ResourceConfig resourceConfig = 
dataAccessor.getProperty(dataAccessor.keyBuilder().resourceConfig(db));
+    if (resourceConfig == null) {
+      resourceConfig = new ResourceConfig(db);
+    }
+    resourceConfig.setPartitionCapacityMap(
+        Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, 
_newPartitionWeight));
+    dataAccessor.setProperty(dataAccessor.keyBuilder().resourceConfig(db), 
resourceConfig);
+    Thread.currentThread().sleep(100);
+    _weightUpdatedLatch.countDown();
+    Thread.currentThread().sleep(3000);
+    _completedTest.countDown();
+    Thread.currentThread().sleep(100);
+  }
+
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    try {
+      if (_controller != null && _controller.isConnected()) {
+        _controller.syncStop();
+      }
+      for (MockParticipantManager p : _participants) {
+        if (p != null && p.isConnected()) {
+          p.syncStop();
+        }
+      }
+      deleteCluster(CLUSTER_NAME);
+      //_verifyThread.interrupt();
+    } catch (Exception e) {
+      LOG.info("After class throwing exception, {}", e);
+    }
+  }
+}

Reply via email to