This is an automated email from the ASF dual-hosted git repository.

nealsun pushed a commit to branch nealsun/waged-pipeline-redesign
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to 
refs/heads/nealsun/waged-pipeline-redesign by this push:
     new 0c3e9b08d WAGED tests and metrics (#2302)
0c3e9b08d is described below

commit 0c3e9b08dfd4f9d6407484c00adfa4c5d7a5ef5b
Author: Neal Sun <[email protected]>
AuthorDate: Fri Dec 2 10:20:23 2022 -0800

    WAGED tests and metrics (#2302)
    
    Add WAGED tests and metrics.
---
 .../rebalancer/waged/WagedRebalancer.java          |  31 ++++-
 .../metrics/WagedRebalancerMetricCollector.java    |  20 ++-
 .../rebalancer/waged/TestWagedRebalancer.java      | 141 +++++++++++++++++++++
 3 files changed, 188 insertions(+), 4 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index e7c09ab65..0c1b09395 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -112,6 +112,10 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
   private final LatencyMetric _writeLatency;
   private final CountMetric _partialRebalanceCounter;
   private final LatencyMetric _partialRebalanceLatency;
+  private final CountMetric _emergencyRebalanceCounter;
+  private final LatencyMetric _emergencyRebalanceLatency;
+  private final CountMetric _rebalanceOverwriteCounter;
+  private final LatencyMetric _rebalanceOverwriteLatency;
   private final LatencyMetric _stateReadLatency;
   private final BaselineDivergenceGauge _baselineDivergenceGauge;
 
@@ -209,6 +213,16 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
         
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge
             .name(),
         LatencyMetric.class);
+    _emergencyRebalanceCounter = _metricCollector.getMetric(
+        
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.EmergencyRebalanceCounter.name(),
 CountMetric.class);
+    _emergencyRebalanceLatency = _metricCollector.getMetric(
+        
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.EmergencyRebalanceLatencyGauge.name(),
+        LatencyMetric.class);
+    _rebalanceOverwriteCounter = _metricCollector.getMetric(
+        
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteCounter.name(),
 CountMetric.class);
+    _rebalanceOverwriteLatency = _metricCollector.getMetric(
+        
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteLatencyGauge.name(),
+        LatencyMetric.class);
     _writeLatency = _metricCollector.getMetric(
         
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateWriteLatencyGauge.name(),
         LatencyMetric.class);
@@ -617,11 +631,15 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
     }
   }
 
-  private Map<String, ResourceAssignment> emergencyRebalance(
+  protected Map<String, ResourceAssignment> emergencyRebalance(
       ResourceControllerDataProvider clusterData, Map<String, Resource> 
resourceMap,
       Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
       RebalanceAlgorithm algorithm)
       throws HelixRebalanceException {
+    LOG.info("Start emergency rebalance.");
+    _emergencyRebalanceCounter.increment(1L);
+    _emergencyRebalanceLatency.startMeasuringLatency();
+
     Map<String, ResourceAssignment> currentBestPossibleAssignment =
         getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
             resourceMap.keySet());
@@ -643,6 +661,7 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
     // Step 2: if there are permanent node downs, calculate for a new one best 
possible
     Map<String, ResourceAssignment> newAssignment;
     if (!allNodesActive.get()) {
+      LOG.info("Emergency rebalance responding to permanent node down.");
       ClusterModel clusterModel;
       try {
         clusterModel =
@@ -659,6 +678,7 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
 
     // Step 3: persist result to metadata store
     persistBestPossibleAssignment(newAssignment);
+    _emergencyRebalanceLatency.endMeasuringLatency();
     LOG.info("Finish emergency rebalance");
 
     partialRebalance(clusterData, resourceMap, activeNodes, 
currentStateOutput, algorithm);
@@ -833,7 +853,7 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
     }
   }
 
-  private boolean requireRebalanceOverwrite(ResourceControllerDataProvider 
clusterData,
+  protected boolean requireRebalanceOverwrite(ResourceControllerDataProvider 
clusterData,
       Map<String, ResourceAssignment> bestPossibleAssignment) {
     AtomicBoolean allMinActiveReplicaMet = new AtomicBoolean(true);
     
bestPossibleAssignment.values().parallelStream().forEach((resourceAssignment -> 
{
@@ -872,10 +892,13 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
    * @param baseline the baseline assignment.
    * @param algorithm the rebalance algorithm.
    */
-  private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
+  protected void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
       ResourceControllerDataProvider clusterData, Map<String, Resource> 
resourceMap,
       Map<String, ResourceAssignment> baseline, RebalanceAlgorithm algorithm)
       throws HelixRebalanceException {
+    _rebalanceOverwriteCounter.increment(1L);
+    _rebalanceOverwriteLatency.startMeasuringLatency();
+
     ClusterModel clusterModel;
     try {
       // Note this calculation uses the baseline as the best possible 
assignment input here.
@@ -912,6 +935,8 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
               Math.min(minActiveReplica, numReplica));
 
       newIdealState.setPreferenceLists(finalPreferenceLists);
+
+      _rebalanceOverwriteLatency.endMeasuringLatency();
     }
   }
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
index dcd9a0857..94a99c95a 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
@@ -43,6 +43,8 @@ public class WagedRebalancerMetricCollector extends 
MetricCollector {
     // Per-stage latency metrics
     GlobalBaselineCalcLatencyGauge,
     PartialRebalanceLatencyGauge,
+    EmergencyRebalanceLatencyGauge,
+    RebalanceOverwriteLatencyGauge,
 
     // The following latency metrics are related to AssignmentMetadataStore
     StateReadLatencyGauge,
@@ -61,7 +63,9 @@ public class WagedRebalancerMetricCollector extends 
MetricCollector {
 
     // Waged rebalance counters.
     GlobalBaselineCalcCounter,
-    PartialRebalanceCounter
+    PartialRebalanceCounter,
+    EmergencyRebalanceCounter,
+    RebalanceOverwriteCounter
   }
 
   public WagedRebalancerMetricCollector(String clusterName) {
@@ -97,6 +101,12 @@ public class WagedRebalancerMetricCollector extends 
MetricCollector {
     LatencyMetric partialRebalanceLatencyGauge =
         new 
RebalanceLatencyGauge(WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(),
             getResetIntervalInMs());
+    LatencyMetric emergencyRebalanceLatencyGauge =
+        new 
RebalanceLatencyGauge(WagedRebalancerMetricNames.EmergencyRebalanceLatencyGauge.name(),
+            getResetIntervalInMs());
+    LatencyMetric rebalanceOverwriteLatencyGauge =
+        new 
RebalanceLatencyGauge(WagedRebalancerMetricNames.RebalanceOverwriteLatencyGauge.name(),
+            getResetIntervalInMs());
     LatencyMetric stateReadLatencyGauge =
         new 
RebalanceLatencyGauge(WagedRebalancerMetricNames.StateReadLatencyGauge.name(),
             getResetIntervalInMs());
@@ -111,15 +121,23 @@ public class WagedRebalancerMetricCollector extends 
MetricCollector {
         new 
RebalanceCounter(WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name());
     CountMetric partialRebalanceCounter =
         new 
RebalanceCounter(WagedRebalancerMetricNames.PartialRebalanceCounter.name());
+    CountMetric emergencyRebalanceCounter =
+        new 
RebalanceCounter(WagedRebalancerMetricNames.EmergencyRebalanceCounter.name());
+    CountMetric rebalanceOverwriteCounter =
+        new 
RebalanceCounter(WagedRebalancerMetricNames.RebalanceOverwriteCounter.name());
 
     // Add metrics to WagedRebalancerMetricCollector
     addMetric(globalBaselineCalcLatencyGauge);
     addMetric(partialRebalanceLatencyGauge);
+    addMetric(emergencyRebalanceLatencyGauge);
+    addMetric(rebalanceOverwriteLatencyGauge);
     addMetric(stateReadLatencyGauge);
     addMetric(stateWriteLatencyGauge);
     addMetric(baselineDivergenceGauge);
     addMetric(calcFailureCount);
     addMetric(globalBaselineCalcCounter);
     addMetric(partialRebalanceCounter);
+    addMetric(emergencyRebalanceCounter);
+    addMetric(rebalanceOverwriteCounter);
   }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 5d4047d5b..d011c5c2e 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -20,9 +20,12 @@ package org.apache.helix.controller.rebalancer.waged;
  */
 
 import java.io.IOException;
+import java.sql.Array;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -32,8 +35,10 @@ import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixRebalanceException;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import 
org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
 import 
org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
 import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
 import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -50,6 +55,7 @@ import 
org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
 import org.apache.helix.monitoring.metrics.model.CountMetric;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -575,6 +581,141 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
     Assert.assertEquals(bestPossibleAssignment, newAlgorithmResult);
   }
 
+  @Test(dependsOnMethods = "testRebalance")
+  public void testEmergencyRebalance() throws IOException, HelixRebalanceException {
+    _metadataStore.reset();
+    ResourceControllerDataProvider clusterData = setupClusterDataCache();
+    MockRebalanceAlgorithm spyAlgorithm = Mockito.spy(new MockRebalanceAlgorithm());
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, spyAlgorithm, Optional.empty());
+
+    // Cluster config change will trigger baseline to be recalculated.
+    when(clusterData.getRefreshedChangeTypes())
+        .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+    Map<String, Resource> resourceMap =
+        clusterData.getIdealStates().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
+          Resource resource = new Resource(entry.getKey());
+          entry.getValue().getPartitionSet().forEach(resource::addPartition);
+          return resource;
+        }));
+    // First pass on the freshly reset metadata store: populates the best possible assignment
+    rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+    // Global Rebalance once, Partial Rebalance once
+    verify(spyAlgorithm, times(2)).calculate(any());
+
+    // Artificially map one partition of the first resource onto an instance named "offlineInstance"
+    Map<String, ResourceAssignment> bestPossibleAssignment =
+        _metadataStore.getBestPossibleAssignment();
+    String offlineResource = _resourceNames.get(0);
+    String offlinePartition = _partitionNames.get(0);
+    String offlineState = "MASTER";
+    String offlineInstance = "offlineInstance";
+    for (Partition partition : bestPossibleAssignment.get(offlineResource).getMappedPartitions()) {
+      if (partition.getPartitionName().equals(offlinePartition)) {
+        bestPossibleAssignment.get(offlineResource)
+            .addReplicaMap(partition, Collections.singletonMap(offlineInstance, offlineState));
+      }
+    }
+    _metadataStore.persistBestPossibleAssignment(bestPossibleAssignment);
+
+    // This should trigger both emergency rebalance and partial rebalance
+    rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+    ArgumentCaptor<ClusterModel> capturedClusterModel = ArgumentCaptor.forClass(ClusterModel.class);
+    // 2 from previous case, Emergency + Partial from this case, 4 in total
+    verify(spyAlgorithm, times(4)).calculate(capturedClusterModel.capture());
+    // The third captured call (index 2) is the emergency rebalance; its only assignable replica is the offline one
+    ClusterModel clusterModelForEmergencyRebalance = capturedClusterModel.getAllValues().get(2);
+    Assert.assertEquals(clusterModelForEmergencyRebalance.getAssignableReplicaMap().size(), 1);
+    Assert.assertEquals(clusterModelForEmergencyRebalance.getAssignableReplicaMap().get(offlineResource).size(), 1);
+    AssignableReplica assignableReplica =
+        clusterModelForEmergencyRebalance.getAssignableReplicaMap().get(offlineResource).iterator().next();
+    Assert.assertEquals(assignableReplica.getPartitionName(), offlinePartition);
+    Assert.assertEquals(assignableReplica.getReplicaState(), offlineState);
+
+    bestPossibleAssignment = _metadataStore.getBestPossibleAssignment();
+    for (Map.Entry<String, ResourceAssignment> entry : bestPossibleAssignment.entrySet()) {
+      ResourceAssignment resourceAssignment = entry.getValue();
+      for (Partition partition : resourceAssignment.getMappedPartitions()) {
+        for (String instance: resourceAssignment.getReplicaMap(partition).keySet()) {
+          Assert.assertNotEquals(instance, offlineInstance); // compare by value, not reference identity
+        }
+      }
+    }
+  }
+
+  @Test(dependsOnMethods = "testRebalance")
+  public void testRebalanceOverwriteTrigger() throws IOException, 
HelixRebalanceException {
+    _metadataStore.reset();
+
+    ResourceControllerDataProvider clusterData = setupClusterDataCache();
+    // Enable delayed rebalance on the cluster config (delay time set to 1)
+    ClusterConfig clusterConfig = clusterData.getClusterConfig();
+    clusterConfig.setDelayRebalaceEnabled(true);
+    clusterConfig.setRebalanceDelayTime(1);
+    clusterData.setClusterConfig(clusterConfig);
+
+    // Fake an offline instance (offline timestamp far in the future) intended to be inside the delay window
+    Set<String> instances = new HashSet<>(_instances);
+    String offlineInstance = "offlineInstance";
+    instances.add(offlineInstance);
+    when(clusterData.getAllInstances()).thenReturn(instances);
+    Map<String, Long> instanceOfflineTimeMap = new HashMap<>();
+    instanceOfflineTimeMap.put(offlineInstance, System.currentTimeMillis() + 
Integer.MAX_VALUE);
+    
when(clusterData.getInstanceOfflineTimeMap()).thenReturn(instanceOfflineTimeMap);
+    Map<String, InstanceConfig> instanceConfigMap = 
clusterData.getInstanceConfigMap();
+    instanceConfigMap.put(offlineInstance, 
createMockInstanceConfig(offlineInstance));
+    when(clusterData.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+
+    // Set minActiveReplica to 0 so that requireRebalanceOverwrite returns 
false
+    Map<String, IdealState> isMap = new HashMap<>();
+    for (String resource : _resourceNames) {
+      IdealState idealState = clusterData.getIdealState(resource);
+      idealState.setMinActiveReplicas(0);
+      isMap.put(resource, idealState);
+    }
+    when(clusterData.getIdealState(anyString())).thenAnswer(
+        (Answer<IdealState>) invocationOnMock -> 
isMap.get(invocationOnMock.getArguments()[0]));
+    when(clusterData.getIdealStates()).thenReturn(isMap);
+
+    MockRebalanceAlgorithm spyAlgorithm = Mockito.spy(new 
MockRebalanceAlgorithm());
+    WagedRebalancer rebalancer = Mockito.spy(new 
WagedRebalancer(_metadataStore, spyAlgorithm, Optional.empty()));
+
+    // Cluster config change will trigger baseline to be recalculated.
+    when(clusterData.getRefreshedChangeTypes())
+        
.thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+    Map<String, Resource> resourceMap =
+        
clusterData.getIdealStates().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 entry -> {
+          Resource resource = new Resource(entry.getKey());
+          entry.getValue().getPartitionSet().forEach(resource::addPartition);
+          return resource;
+        }));
+    // First rebalance: the overwrite check must run once and no overwrite may be applied
+    rebalancer.computeNewIdealStates(clusterData, resourceMap, new 
CurrentStateOutput());
+    verify(rebalancer, times(1)).requireRebalanceOverwrite(any(), any());
+    verify(rebalancer, times(0)).applyRebalanceOverwrite(any(), any(), any(), 
any(), any());
+
+    // Set minActiveReplica to 3 so that requireRebalanceOverwrite is expected to return true
+    for (String resource : _resourceNames) {
+      IdealState idealState = clusterData.getIdealState(resource);
+      idealState.setMinActiveReplicas(3);
+      isMap.put(resource, idealState);
+    }
+    when(clusterData.getIdealState(anyString())).thenAnswer(
+        (Answer<IdealState>) invocationOnMock -> 
isMap.get(invocationOnMock.getArguments()[0]));
+    when(clusterData.getIdealStates()).thenReturn(isMap);
+
+    _metadataStore.reset();
+    // Update the config so the cluster config will be marked as changed.
+    clusterConfig = clusterData.getClusterConfig();
+    Map<String, Integer> defaultCapacityMap =
+        new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
+    defaultCapacityMap.put("foobar", 0);
+    clusterConfig.setDefaultInstanceCapacityMap(defaultCapacityMap);
+    clusterData.setClusterConfig(clusterConfig);
+    rebalancer.computeNewIdealStates(clusterData, resourceMap, new 
CurrentStateOutput());
+    verify(rebalancer, times(2)).requireRebalanceOverwrite(any(), any());
+    verify(rebalancer, times(1)).applyRebalanceOverwrite(any(), any(), any(), 
any(), any());
+  }
+
   @Test(dependsOnMethods = "testRebalance")
   public void testReset() throws IOException, HelixRebalanceException {
     _metadataStore.reset();

Reply via email to