This is an automated email from the ASF dual-hosted git repository.
nealsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 08bc488e2 Waged Pipeline Redesign (#2319)
08bc488e2 is described below
commit 08bc488e2ab79644b1850a79d76b975603f63c56
Author: Neal Sun <[email protected]>
AuthorDate: Thu Dec 22 09:48:58 2022 -0800
Waged Pipeline Redesign (#2319)
Introduces Emergency Rebalance and other performance optimizations
---
.../rebalancer/waged/AssignmentMetadataStore.java | 116 ++++----
.../rebalancer/waged/ReadOnlyWagedRebalancer.java | 28 +-
.../rebalancer/waged/WagedRebalancer.java | 316 ++++++++++++++++-----
.../waged/model/ClusterModelProvider.java | 70 ++++-
.../java/org/apache/helix/model/ClusterConfig.java | 1 +
.../metrics/WagedRebalancerMetricCollector.java | 20 +-
.../waged/MockAssignmentMetadataStore.java | 19 +-
.../waged/TestAssignmentMetadataStore.java | 24 --
.../rebalancer/waged/TestWagedRebalancer.java | 155 +++++++++-
9 files changed, 582 insertions(+), 167 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index 834fbad4e..0a532944a 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -51,6 +51,7 @@ public class AssignmentMetadataStore {
// volatile for double-checked locking
protected volatile Map<String, ResourceAssignment> _globalBaseline;
protected volatile Map<String, ResourceAssignment> _bestPossibleAssignment;
+ protected volatile int _bestPossibleVersion = 0;
AssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
this(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
@@ -99,60 +100,78 @@ public class AssignmentMetadataStore {
}
/**
- * @return true if a new baseline was persisted.
+ * @param newAssignment
+ * @param path the path of the assignment record
+ * @param key the key of the assignment in the record
* @throws HelixException if the method failed to persist the baseline.
*/
- public synchronized boolean persistBaseline(Map<String, ResourceAssignment>
globalBaseline) {
- return persistAssignment(globalBaseline, getBaseline(), _baselinePath,
BASELINE_KEY);
+ private void persistAssignmentToMetadataStore(Map<String,
ResourceAssignment> newAssignment, String path, String key)
+ throws HelixException {
+ // TODO: Make the write async?
+ // Persist to ZK
+ HelixProperty combinedAssignments = combineAssignments(key, newAssignment);
+ try {
+ _dataAccessor.compressedBucketWrite(path, combinedAssignments);
+ } catch (IOException e) {
+ throw new HelixException(String.format("Failed to persist %s assignment
to path %s", key, path), e);
+ }
}
/**
- * @return true if a new best possible assignment was persisted.
- * @throws HelixException if the method failed to persist the baseline.
+ * Persist a new baseline assignment to metadata store first, then to memory
+ * @param globalBaseline
*/
- public synchronized boolean persistBestPossibleAssignment(
- Map<String, ResourceAssignment> bestPossibleAssignment) {
- return persistAssignment(bestPossibleAssignment,
getBestPossibleAssignment(), _bestPossiblePath,
- BEST_POSSIBLE_KEY);
+ public synchronized void persistBaseline(Map<String, ResourceAssignment>
globalBaseline) {
+ // write to metadata store
+ persistAssignmentToMetadataStore(globalBaseline, _baselinePath,
BASELINE_KEY);
+ // write to memory
+ getBaseline().clear();
+ getBaseline().putAll(globalBaseline);
}
- public synchronized void clearAssignmentMetadata() {
- persistAssignment(Collections.emptyMap(), getBaseline(), _baselinePath,
BASELINE_KEY);
- persistAssignment(Collections.emptyMap(), getBestPossibleAssignment(),
_bestPossiblePath,
- BEST_POSSIBLE_KEY);
+ /**
+ * Persist a new best possible assignment to metadata store first, then to
memory.
+ * Increment best possible version by 1 - this is a high priority in-memory
write.
+ * @param bestPossibleAssignment
+ */
+ public synchronized void persistBestPossibleAssignment(Map<String,
ResourceAssignment> bestPossibleAssignment) {
+ // write to metadata store
+ persistAssignmentToMetadataStore(bestPossibleAssignment,
_bestPossiblePath, BEST_POSSIBLE_KEY);
+ // write to memory
+ getBestPossibleAssignment().clear();
+ getBestPossibleAssignment().putAll(bestPossibleAssignment);
+ _bestPossibleVersion++;
}
/**
- * @param newAssignment
- * @param cachedAssignment
- * @param path the path of the assignment record
- * @param key the key of the assignment in the record
- * @return true if a new assignment was persisted.
+ * Attempts to persist Best Possible Assignment in memory from an
asynchronous thread.
+ * Persist only happens when the provided version is not stale - this is a
low priority in-memory write.
+ * @param bestPossibleAssignment - new assignment to be persisted
+ * @param newVersion - attempted new version to write. This version is
obtained earlier from getBestPossibleVersion()
+ * @return true if the attempt succeeded, false otherwise.
*/
- // TODO: Enhance the return value so it is more intuitive to understand when
the persist fails and
- // TODO: when it is skipped.
- private boolean persistAssignment(Map<String, ResourceAssignment>
newAssignment,
- Map<String, ResourceAssignment> cachedAssignment, String path,
- String key) {
- // TODO: Make the write async?
- // If the assignment hasn't changed, skip writing to metadata store
- if (compareAssignments(cachedAssignment, newAssignment)) {
- return false;
- }
- // Persist to ZK
- HelixProperty combinedAssignments = combineAssignments(key, newAssignment);
- try {
- _dataAccessor.compressedBucketWrite(path, combinedAssignments);
- } catch (IOException e) {
- // TODO: Improve failure handling
- throw new HelixException(
- String.format("Failed to persist %s assignment to path %s", key,
path), e);
+ public synchronized boolean asyncUpdateBestPossibleAssignmentCache(
+ Map<String, ResourceAssignment> bestPossibleAssignment, int newVersion) {
+ // Check if the version is stale by this point
+ if (newVersion > _bestPossibleVersion) {
+ getBestPossibleAssignment().clear();
+ getBestPossibleAssignment().putAll(bestPossibleAssignment);
+ _bestPossibleVersion = newVersion;
+ return true;
}
- // Update the in-memory reference
- cachedAssignment.clear();
- cachedAssignment.putAll(newAssignment);
- return true;
+ return false;
+ }
+
+ public int getBestPossibleVersion() {
+ return _bestPossibleVersion;
+ }
+
+ public synchronized void clearAssignmentMetadata() {
+ persistAssignmentToMetadataStore(Collections.emptyMap(), _baselinePath,
BASELINE_KEY);
+ persistAssignmentToMetadataStore(Collections.emptyMap(),
_bestPossiblePath, BEST_POSSIBLE_KEY);
+ getBaseline().clear();
+ getBestPossibleAssignment().clear();
}
protected synchronized void reset() {
@@ -207,16 +226,11 @@ public class AssignmentMetadataStore {
return assignmentMap;
}
- /**
- * Returns whether two assignments are same.
- * @param oldAssignment
- * @param newAssignment
- * @return true if they are the same. False otherwise or oldAssignment is
null
- */
- protected boolean compareAssignments(Map<String, ResourceAssignment>
oldAssignment,
- Map<String, ResourceAssignment> newAssignment) {
- // If oldAssignment is null, that means that we haven't read from/written
to
- // the metadata store yet. In that case, we return false so that we write
to metadata store.
- return oldAssignment != null && oldAssignment.equals(newAssignment);
+ protected boolean isBaselineChanged(Map<String, ResourceAssignment>
newBaseline) {
+ return !getBaseline().equals(newBaseline);
+ }
+
+ protected boolean isBestPossibleChanged(Map<String, ResourceAssignment>
newBestPossible) {
+ return !getBestPossibleAssignment().equals(newBestPossible);
}
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
index e94148e99..e79c028ec 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
@@ -67,26 +67,30 @@ public class ReadOnlyWagedRebalancer extends
WagedRebalancer {
}
@Override
- public boolean persistBaseline(Map<String, ResourceAssignment>
globalBaseline) {
- // If baseline hasn't changed, skip updating the metadata store
- if (compareAssignments(_globalBaseline, globalBaseline)) {
- return false;
- }
+ public void persistBaseline(Map<String, ResourceAssignment>
globalBaseline) {
// Update the in-memory reference only
_globalBaseline = globalBaseline;
- return true;
}
@Override
- public boolean persistBestPossibleAssignment(
+ public void persistBestPossibleAssignment(
Map<String, ResourceAssignment> bestPossibleAssignment) {
- // If bestPossibleAssignment hasn't changed, skip updating the metadata
store
- if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment))
{
- return false;
- }
// Update the in-memory reference only
_bestPossibleAssignment = bestPossibleAssignment;
- return true;
+ _bestPossibleVersion++;
+ }
+
+ @Override
+ public synchronized boolean asyncUpdateBestPossibleAssignmentCache(
+ Map<String, ResourceAssignment> bestPossibleAssignment, int
newVersion) {
+ // Check if the version is stale by this point
+ if (newVersion > _bestPossibleVersion) {
+ _bestPossibleAssignment = bestPossibleAssignment;
+ _bestPossibleVersion = newVersion;
+ return true;
+ }
+
+ return false;
}
}
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index a99ad26ab..3fffef2fb 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
@@ -98,6 +99,7 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
// To calculate the baseline asynchronously
private final ExecutorService _baselineCalculateExecutor;
+ private final ExecutorService _bestPossibleCalculateExecutor;
private final ResourceChangeDetector _changeDetector;
private final HelixManager _manager;
private final MappingCalculator<ResourceControllerDataProvider>
_mappingCalculator;
@@ -110,10 +112,16 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
private final LatencyMetric _writeLatency;
private final CountMetric _partialRebalanceCounter;
private final LatencyMetric _partialRebalanceLatency;
+ private final CountMetric _emergencyRebalanceCounter;
+ private final LatencyMetric _emergencyRebalanceLatency;
+ private final CountMetric _rebalanceOverwriteCounter;
+ private final LatencyMetric _rebalanceOverwriteLatency;
private final LatencyMetric _stateReadLatency;
private final BaselineDivergenceGauge _baselineDivergenceGauge;
private boolean _asyncGlobalRebalanceEnabled;
+ private boolean _asyncPartialRebalanceEnabled;
+ private Future<Boolean> _asyncPartialRebalanceResult;
// Note, the rebalance algorithm field is mutable so it should not be
directly referred except for
// the public method computeNewIdealStates.
@@ -149,7 +157,8 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
// cluster has converged.
helixManager == null ? new WagedRebalancerMetricCollector()
: new
WagedRebalancerMetricCollector(helixManager.getClusterName()),
- ClusterConfig.DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED);
+ ClusterConfig.DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED,
+ ClusterConfig.DEFAULT_PARTIAL_REBALANCE_ASYNC_MODE_ENABLED);
_preference =
ImmutableMap.copyOf(ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
}
@@ -166,12 +175,13 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
// If metricCollector is not provided, instantiate a version that does
not register metrics
// in order to allow rebalancer to proceed
metricCollectorOptional.orElse(new WagedRebalancerMetricCollector()),
- false);
+ false, false);
}
private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator,
HelixManager manager,
- MetricCollector metricCollector, boolean isAsyncGlobalRebalanceEnabled) {
+ MetricCollector metricCollector, boolean isAsyncGlobalRebalanceEnabled,
+ boolean isAsyncPartialRebalanceEnabled) {
if (assignmentMetadataStore == null) {
LOG.warn("Assignment Metadata Store is not configured properly."
+ " The rebalancer will not access the assignment store during the
rebalance.");
@@ -203,6 +213,16 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge
.name(),
LatencyMetric.class);
+ _emergencyRebalanceCounter = _metricCollector.getMetric(
+
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.EmergencyRebalanceCounter.name(),
CountMetric.class);
+ _emergencyRebalanceLatency = _metricCollector.getMetric(
+
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.EmergencyRebalanceLatencyGauge.name(),
+ LatencyMetric.class);
+ _rebalanceOverwriteCounter = _metricCollector.getMetric(
+
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteCounter.name(),
CountMetric.class);
+ _rebalanceOverwriteLatency = _metricCollector.getMetric(
+
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteLatencyGauge.name(),
+ LatencyMetric.class);
_writeLatency = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateWriteLatencyGauge.name(),
LatencyMetric.class);
@@ -216,7 +236,9 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
_changeDetector = new ResourceChangeDetector(true);
_baselineCalculateExecutor = Executors.newSingleThreadExecutor();
+ _bestPossibleCalculateExecutor = Executors.newSingleThreadExecutor();
_asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
+ _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled;
}
// Update the global rebalance mode to be asynchronous or synchronous
@@ -224,6 +246,11 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
_asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
}
+ // Update the partial rebalance mode to be asynchronous or synchronous
+ public void setPartialRebalanceAsyncMode(boolean
isAsyncPartialRebalanceEnabled) {
+ _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled;
+ }
+
// Update the rebalancer preference if the new options are different from
the current preference.
public synchronized void updateRebalancePreference(
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> newPreference) {
@@ -249,6 +276,9 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
if (_baselineCalculateExecutor != null) {
_baselineCalculateExecutor.shutdownNow();
}
+ if (_bestPossibleCalculateExecutor != null) {
+ _bestPossibleCalculateExecutor.shutdownNow();
+ }
if (_assignmentMetadataStore != null) {
_assignmentMetadataStore.close();
}
@@ -329,22 +359,24 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
// Schedule (or unschedule) delayed rebalance according to the delayed
rebalance config.
delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());
- Map<String, IdealState> newIdealStates =
convertResourceAssignment(clusterData,
- computeBestPossibleAssignment(clusterData, resourceMap, activeNodes,
currentStateOutput,
- algorithm));
+ Map<String, ResourceAssignment> newBestPossibleAssignment =
+ computeBestPossibleAssignment(clusterData, resourceMap, activeNodes,
currentStateOutput, algorithm);
+ Map<String, IdealState> newIdealStates =
convertResourceAssignment(clusterData, newBestPossibleAssignment);
// The additional rebalance overwrite is required since the calculated
mapping may contain
// some delayed rebalanced assignments.
- if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
+ if (!activeNodes.equals(clusterData.getEnabledLiveInstances()) &&
requireRebalanceOverwrite(clusterData,
+ newBestPossibleAssignment)) {
applyRebalanceOverwrite(newIdealStates, clusterData, resourceMap,
- getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet()),
- algorithm);
+ getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet()), algorithm);
}
// Replace the assignment if user-defined preference list is configured.
// Note the user-defined list is intentionally applied to the final
mapping after calculation.
// This is to avoid persisting it into the assignment store, which impacts
the long term
// assignment evenness and partition movements.
- newIdealStates.forEach((key, value) ->
applyUserDefinedPreferenceList(clusterData.getResourceConfig(key), value));
+ newIdealStates.forEach(
+ (resourceName, idealState) ->
applyUserDefinedPreferenceList(clusterData.getResourceConfig(resourceName),
+ idealState));
return newIdealStates;
}
@@ -357,9 +389,10 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
throws HelixRebalanceException {
// Perform global rebalance for a new baseline assignment
globalRebalance(clusterData, resourceMap, currentStateOutput, algorithm);
- // Perform partial rebalance for a new best possible assignment
+ // Perform emergency rebalance for a new best possible assignment
Map<String, ResourceAssignment> newAssignment =
- partialRebalance(clusterData, resourceMap, activeNodes,
currentStateOutput, algorithm);
+ emergencyRebalance(clusterData, resourceMap, activeNodes,
currentStateOutput, algorithm);
+
return newAssignment;
}
@@ -410,44 +443,26 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
* @param algorithm
* @throws HelixRebalanceException
*/
- private void globalRebalance(ResourceControllerDataProvider clusterData,
- Map<String, Resource> resourceMap, final CurrentStateOutput
currentStateOutput,
- RebalanceAlgorithm algorithm)
- throws HelixRebalanceException {
+ private void globalRebalance(ResourceControllerDataProvider clusterData,
Map<String, Resource> resourceMap,
+ final CurrentStateOutput currentStateOutput, RebalanceAlgorithm
algorithm) throws HelixRebalanceException {
_changeDetector.updateSnapshots(clusterData);
// Get all the changed items' information. Filter for the items that have
content changed.
- final Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
- _changeDetector.getAllChanges();
-
- if (clusterChanges.keySet().stream()
- .anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
- // Build the cluster model for rebalance calculation.
- // Note, for a Baseline calculation,
- // 1. Ignore node status (disable/offline).
- // 2. Use the previous Baseline as the only parameter about the previous
assignment.
- Map<String, ResourceAssignment> currentBaseline =
- getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet());
- ClusterModel clusterModel;
- try {
- clusterModel = ClusterModelProvider
- .generateClusterModelForBaseline(clusterData, resourceMap,
- clusterData.getAllInstances(), clusterChanges,
currentBaseline);
- } catch (Exception ex) {
- throw new HelixRebalanceException("Failed to generate cluster model
for global rebalance.",
- HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
- }
+ final Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
_changeDetector.getAllChanges();
+ if
(clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains))
{
final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
- final String clusterName = clusterData.getClusterName();
// Calculate the Baseline assignment for global rebalance.
Future<Boolean> result = _baselineCalculateExecutor.submit(() -> {
try {
- // Note that we should schedule a new partial rebalance for a future
rebalance pipeline if
- // the planned partial rebalance in the current rebalance pipeline
won't wait for the new
- // baseline being calculated.
- // So set shouldSchedulePartialRebalance to be
!waitForGlobalRebalance
- calculateAndUpdateBaseline(clusterModel, algorithm,
!waitForGlobalRebalance, clusterName);
+ // If the synchronous thread does not wait for the baseline to be
calculated, the synchronous thread should
+ // be triggered again after baseline is finished.
+ // Set shouldTriggerMainPipeline to be !waitForGlobalRebalance
+ doGlobalRebalance(clusterData, resourceMap, algorithm,
currentStateOutput, !waitForGlobalRebalance,
+ clusterChanges);
} catch (HelixRebalanceException e) {
+ if (_asyncGlobalRebalanceEnabled) {
+ _rebalanceFailureCount.increment(1L);
+ }
LOG.error("Failed to calculate baseline assignment!", e);
return false;
}
@@ -469,27 +484,40 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
/**
* Calculate and update the Baseline assignment
- * @param clusterModel
- * @param algorithm
- * @param shouldSchedulePartialRebalance True if the call should trigger a
following partial rebalance
+ * @param shouldTriggerMainPipeline True if the call should trigger a
following main pipeline rebalance
* so the new Baseline could be applied to
cluster.
- * @param clusterName
- * @throws HelixRebalanceException
*/
- private void calculateAndUpdateBaseline(ClusterModel clusterModel,
RebalanceAlgorithm algorithm,
- boolean shouldSchedulePartialRebalance, String clusterName)
- throws HelixRebalanceException {
+ private void doGlobalRebalance(ResourceControllerDataProvider clusterData,
Map<String, Resource> resourceMap,
+ RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput,
boolean shouldTriggerMainPipeline,
+ Map<HelixConstants.ChangeType, Set<String>> clusterChanges) throws
HelixRebalanceException {
LOG.info("Start calculating the new baseline.");
_baselineCalcCounter.increment(1L);
_baselineCalcLatency.startMeasuringLatency();
- boolean isBaselineChanged = false;
+ // Build the cluster model for rebalance calculation.
+ // Note, for a Baseline calculation,
+ // 1. Ignore node status (disable/offline).
+ // 2. Use the previous Baseline as the only parameter about the previous
assignment.
+ Map<String, ResourceAssignment> currentBaseline =
+ getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet());
+ ClusterModel clusterModel;
+ try {
+ clusterModel =
+ ClusterModelProvider.generateClusterModelForBaseline(clusterData,
resourceMap, clusterData.getAllInstances(),
+ clusterChanges, currentBaseline);
+ } catch (Exception ex) {
+ throw new HelixRebalanceException("Failed to generate cluster model for
global rebalance.",
+ HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+ }
+
Map<String, ResourceAssignment> newBaseline =
calculateAssignment(clusterModel, algorithm);
+ boolean isBaselineChanged =
+ _assignmentMetadataStore != null &&
_assignmentMetadataStore.isBaselineChanged(newBaseline);
// Write the new baseline to metadata store
- if (_assignmentMetadataStore != null) {
+ if (isBaselineChanged) {
try {
_writeLatency.startMeasuringLatency();
- isBaselineChanged =
_assignmentMetadataStore.persistBaseline(newBaseline);
+ _assignmentMetadataStore.persistBaseline(newBaseline);
_writeLatency.endMeasuringLatency();
} catch (Exception ex) {
throw new HelixRebalanceException("Failed to persist the new baseline
assignment.",
@@ -501,21 +529,67 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
_baselineCalcLatency.endMeasuringLatency();
LOG.info("Global baseline calculation completed and has been persisted
into metadata store.");
- if (isBaselineChanged && shouldSchedulePartialRebalance) {
+ if (isBaselineChanged && shouldTriggerMainPipeline) {
LOG.info("Schedule a new rebalance after the new baseline calculation
has finished.");
- RebalanceUtil.scheduleOnDemandPipeline(clusterName, 0L, false);
+ RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L,
false);
}
}
- private Map<String, ResourceAssignment> partialRebalance(
+ private void partialRebalance(
ResourceControllerDataProvider clusterData, Map<String, Resource>
resourceMap,
Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
RebalanceAlgorithm algorithm)
throws HelixRebalanceException {
+ // If partial rebalance is async and the previous result is not completed
yet,
+ // do not start another partial rebalance.
+ if (_asyncPartialRebalanceEnabled && _asyncPartialRebalanceResult != null
+ && !_asyncPartialRebalanceResult.isDone()) {
+ return;
+ }
+
+ _asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(() ->
{
+ try {
+ doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
+ currentStateOutput);
+ } catch (HelixRebalanceException e) {
+ if (_asyncPartialRebalanceEnabled) {
+ _rebalanceFailureCount.increment(1L);
+ }
+ LOG.error("Failed to calculate best possible assignment!", e);
+ return false;
+ }
+ return true;
+ });
+ if (!_asyncPartialRebalanceEnabled) {
+ try {
+ if (!_asyncPartialRebalanceResult.get()) {
+ throw new HelixRebalanceException("Failed to calculate for the new
best possible.",
+ HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ throw new HelixRebalanceException("Failed to execute new best possible
calculation.",
+ HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
+ }
+ }
+ }
+
+ /**
+ * Calculate and update the Best Possible assignment
+ */
+ private void doPartialRebalance(ResourceControllerDataProvider clusterData,
Map<String, Resource> resourceMap,
+ Set<String> activeNodes, RebalanceAlgorithm algorithm,
CurrentStateOutput currentStateOutput)
+ throws HelixRebalanceException {
LOG.info("Start calculating the new best possible assignment.");
_partialRebalanceCounter.increment(1L);
_partialRebalanceLatency.startMeasuringLatency();
- // TODO: Consider combining the metrics for both baseline/best possible?
+
+ int newBestPossibleAssignmentVersion = -1;
+ if (_assignmentMetadataStore != null) {
+ newBestPossibleAssignmentVersion =
_assignmentMetadataStore.getBestPossibleVersion() + 1;
+ } else {
+ LOG.debug("Assignment Metadata Store is null. Skip getting best possible
assignment version.");
+ }
+
// Read the baseline from metadata store
Map<String, ResourceAssignment> currentBaseline =
getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet());
@@ -547,20 +621,79 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
_baselineDivergenceGauge.asyncMeasureAndUpdateValue(clusterData.getAsyncTasksThreadPool(),
currentBaseline, newAssignmentCopy);
- if (_assignmentMetadataStore != null) {
- try {
- _writeLatency.startMeasuringLatency();
- _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment);
- _writeLatency.endMeasuringLatency();
- } catch (Exception ex) {
- throw new HelixRebalanceException("Failed to persist the new best
possible assignment.",
- HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
- }
+ boolean bestPossibleUpdateSuccessful = false;
+ if (_assignmentMetadataStore != null &&
_assignmentMetadataStore.isBestPossibleChanged(newAssignment)) {
+ bestPossibleUpdateSuccessful =
_assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment,
+ newBestPossibleAssignmentVersion);
} else {
LOG.debug("Assignment Metadata Store is null. Skip persisting the
baseline assignment.");
}
_partialRebalanceLatency.endMeasuringLatency();
LOG.info("Finish calculating the new best possible assignment.");
+
+ if (bestPossibleUpdateSuccessful) {
+ LOG.info("Schedule a new rebalance after the new best possible
calculation has finished.");
+ RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L,
false);
+ }
+ }
+
+ protected Map<String, ResourceAssignment> emergencyRebalance(
+ ResourceControllerDataProvider clusterData, Map<String, Resource>
resourceMap,
+ Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
+ RebalanceAlgorithm algorithm)
+ throws HelixRebalanceException {
+ LOG.info("Start emergency rebalance.");
+ _emergencyRebalanceCounter.increment(1L);
+ _emergencyRebalanceLatency.startMeasuringLatency();
+
+ Map<String, ResourceAssignment> currentBestPossibleAssignment =
+ getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
+ resourceMap.keySet());
+
+ // Step 1: Check for permanent node down
+ AtomicBoolean allNodesActive = new AtomicBoolean(true);
+
currentBestPossibleAssignment.values().parallelStream().forEach((resourceAssignment
-> {
+
resourceAssignment.getMappedPartitions().parallelStream().forEach(partition -> {
+ for (String instance :
resourceAssignment.getReplicaMap(partition).keySet()) {
+ if (!activeNodes.contains(instance)) {
+ allNodesActive.set(false);
+ break;
+ }
+ }
+ });
+ }));
+
+
+ // Step 2: if there are permanent node downs, calculate a new best
possible assignment
+ Map<String, ResourceAssignment> newAssignment;
+ if (!allNodesActive.get()) {
+ LOG.info("Emergency rebalance responding to permanent node down.");
+ ClusterModel clusterModel;
+ try {
+ clusterModel =
+
ClusterModelProvider.generateClusterModelForEmergencyRebalance(clusterData,
resourceMap, activeNodes,
+ currentBestPossibleAssignment);
+ } catch (Exception ex) {
+ throw new HelixRebalanceException("Failed to generate cluster model
for emergency rebalance.",
+ HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+ }
+ newAssignment = calculateAssignment(clusterModel, algorithm);
+ } else {
+ newAssignment = currentBestPossibleAssignment;
+ }
+
+ // Step 3: persist result to metadata store
+ persistBestPossibleAssignment(newAssignment);
+ _emergencyRebalanceLatency.endMeasuringLatency();
+ LOG.info("Finish emergency rebalance");
+
+ partialRebalance(clusterData, resourceMap, activeNodes,
currentStateOutput, algorithm);
+ if (!_asyncPartialRebalanceEnabled) {
+ newAssignment = getBestPossibleAssignment(_assignmentMetadataStore,
currentStateOutput,
+ resourceMap.keySet());
+ persistBestPossibleAssignment(newAssignment);
+ }
+
return newAssignment;
}
@@ -684,6 +817,22 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
return currentBestAssignment;
}
+ private void persistBestPossibleAssignment(Map<String, ResourceAssignment>
bestPossibleAssignment)
+ throws HelixRebalanceException {
+ if (_assignmentMetadataStore != null &&
_assignmentMetadataStore.isBestPossibleChanged(bestPossibleAssignment)) {
+ try {
+ _writeLatency.startMeasuringLatency();
+
_assignmentMetadataStore.persistBestPossibleAssignment(bestPossibleAssignment);
+ _writeLatency.endMeasuringLatency();
+ } catch (Exception ex) {
+ throw new HelixRebalanceException("Failed to persist the new best
possible assignment.",
+ HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+ }
+ } else {
+ LOG.debug("Assignment Metadata Store is null. Skip persisting the best
possible assignment.");
+ }
+ }
+
/**
* Schedule rebalance according to the delayed rebalance logic.
* @param clusterData the current cluster data cache
@@ -710,6 +859,34 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
}
}
+ protected boolean requireRebalanceOverwrite(ResourceControllerDataProvider
clusterData,
+ Map<String, ResourceAssignment> bestPossibleAssignment) {
+ AtomicBoolean allMinActiveReplicaMet = new AtomicBoolean(true);
+
bestPossibleAssignment.values().parallelStream().forEach((resourceAssignment ->
{
+ String resourceName = resourceAssignment.getResourceName();
+ IdealState currentIdealState = clusterData.getIdealState(resourceName);
+ Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+ int numReplica =
currentIdealState.getReplicaCount(enabledLiveInstances.size());
+ int minActiveReplica =
DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
+
.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
+ currentIdealState), currentIdealState, numReplica);
+
resourceAssignment.getMappedPartitions().parallelStream().forEach(partition -> {
+ int enabledLivePlacementCounter = 0;
+ for (String instance :
resourceAssignment.getReplicaMap(partition).keySet()) {
+ if (enabledLiveInstances.contains(instance)) {
+ enabledLivePlacementCounter++;
+ }
+ }
+
+ if (enabledLivePlacementCounter < Math.min(minActiveReplica,
numReplica)) {
+ allMinActiveReplicaMet.set(false);
+ }
+ });
+ }));
+
+ return !allMinActiveReplicaMet.get();
+ }
+
/**
* Update the rebalanced ideal states according to the real active nodes.
* Since the rebalancing might be done with the delayed logic, the
rebalanced ideal states
@@ -721,10 +898,13 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
* @param baseline the baseline assignment.
* @param algorithm the rebalance algorithm.
*/
- private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
+ protected void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
ResourceControllerDataProvider clusterData, Map<String, Resource>
resourceMap,
Map<String, ResourceAssignment> baseline, RebalanceAlgorithm algorithm)
throws HelixRebalanceException {
+ _rebalanceOverwriteCounter.increment(1L);
+ _rebalanceOverwriteLatency.startMeasuringLatency();
+
ClusterModel clusterModel;
try {
// Note this calculation uses the baseline as the best possible
assignment input here.
@@ -761,6 +941,8 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
Math.min(minActiveReplica, numReplica));
newIdealState.setPreferenceLists(finalPreferenceLists);
+
+ _rebalanceOverwriteLatency.endMeasuringLatency();
}
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 0fc1f2b05..ce82c3207 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -51,7 +51,31 @@ public class ClusterModelProvider {
PARTIAL,
// Set the rebalance scope to cover all replicas that need relocation
based on the cluster
// changes.
- GLOBAL_BASELINE
+ GLOBAL_BASELINE,
+ // Set the rebalance scope to cover only replicas that are assigned to
downed instances.
+ EMERGENCY
+ }
+
+ /**
+ * Generate a new Cluster Model object according to the current cluster
status for emergency
+ * rebalance. The rebalance scope is configured for recovering replicas that
are on permanently
+ * downed nodes.
+ * @param dataProvider The controller's data cache.
+ * @param resourceMap The full list of the resources to be
rebalanced. Note that any
+ * resources that are not in this list will be
removed from the
+ * final assignment.
+ * @param activeInstances The active instances that will be used in
the calculation.
+ * Note this list can be different from the
real active node list
+ * according to the rebalancer logic.
+ * @param bestPossibleAssignment The persisted Best Possible assignment that
was generated in the
+ * previous rebalance.
+ * @return the new cluster model
+ */
+ public static ClusterModel
generateClusterModelForEmergencyRebalance(ResourceControllerDataProvider
dataProvider,
+ Map<String, Resource> resourceMap, Set<String> activeInstances,
+ Map<String, ResourceAssignment> bestPossibleAssignment) {
+ return generateClusterModel(dataProvider, resourceMap, activeInstances,
Collections.emptyMap(),
+ Collections.emptyMap(), bestPossibleAssignment,
RebalanceScopeType.EMERGENCY);
}
/**
@@ -165,6 +189,10 @@ public class ClusterModelProvider {
findToBeAssignedReplicasByComparingWithIdealAssignment(replicaMap,
activeInstances,
idealAssignment, currentAssignment, allocatedReplicas);
break;
+ case EMERGENCY:
+ toBeAssignedReplicas =
findToBeAssignedReplicasOnDownInstances(replicaMap, activeInstances,
+ currentAssignment, allocatedReplicas);
+ break;
default:
throw new HelixException("Unknown rebalance scope type: " + scopeType);
}
@@ -391,6 +419,46 @@ public class ClusterModelProvider {
return toBeAssignedReplicas;
}
+ /**
+ * Find replicas that were assigned to non-active nodes in the current
assignment.
+ *
+ * @param replicaMap A map containing all the replicas grouped by
resource name.
+ * @param activeInstances All the instances that are live and enabled
according to the delay rebalance configuration.
+ * @param currentAssignment The current assignment that was generated
in the previous rebalance.
+ * @param allocatedReplicas A map of <Instance -> replicas> to return
the allocated replicas grouped by the target instance name.
+ * @return The replicas that need to be reassigned.
+ */
+ private static Set<AssignableReplica>
findToBeAssignedReplicasOnDownInstances(
+ Map<String, Set<AssignableReplica>> replicaMap, Set<String>
activeInstances,
+ Map<String, ResourceAssignment> currentAssignment,
+ Map<String, Set<AssignableReplica>> allocatedReplicas) {
+ // For any replicas that are assigned to non-active instances (down
instances), add them.
+ Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+ for (String resourceName : replicaMap.keySet()) {
+ Map<String, Map<String, Set<String>>> stateInstanceMap =
getStateInstanceMap(currentAssignment.get(resourceName));
+
+ for (AssignableReplica replica : replicaMap.get(resourceName)) {
+ String partitionName = replica.getPartitionName();
+ String replicaState = replica.getReplicaState();
+ Set<String> currentAllocations =
+ stateInstanceMap.getOrDefault(partitionName,
Collections.emptyMap())
+ .getOrDefault(replicaState, Collections.emptySet());
+ if (!currentAllocations.isEmpty()) {
+ String allocatedInstance = currentAllocations.iterator().next();
+ if (activeInstances.contains(allocatedInstance)) {
+ allocatedReplicas.computeIfAbsent(allocatedInstance, key -> new
HashSet<>()).add(replica);
+ }
+ else {
+ toBeAssignedReplicas.add(replica);
+ }
+ currentAllocations.remove(allocatedInstance);
+ }
+ }
+ }
+
+ return toBeAssignedReplicas;
+ }
+
/**
* Filter to remove all invalid allocations that are not on the active
instances.
* @param assignment
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 829724142..c4f9f914f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -184,6 +184,7 @@ public class ClusterConfig extends HelixProperty {
private final static int MAX_REBALANCE_PREFERENCE = 1000;
private final static int MIN_REBALANCE_PREFERENCE = 0;
public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED =
true;
+ public final static boolean DEFAULT_PARTIAL_REBALANCE_ASYNC_MODE_ENABLED =
true;
private static final int GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
private static final int OFFLINE_NODE_TIME_OUT_FOR_MAINTENANCE_MODE_NOT_SET
= -1;
private final static int DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD = 30;
diff --git
a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
index dcd9a0857..94a99c95a 100644
---
a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
+++
b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
@@ -43,6 +43,8 @@ public class WagedRebalancerMetricCollector extends
MetricCollector {
// Per-stage latency metrics
GlobalBaselineCalcLatencyGauge,
PartialRebalanceLatencyGauge,
+ EmergencyRebalanceLatencyGauge,
+ RebalanceOverwriteLatencyGauge,
// The following latency metrics are related to AssignmentMetadataStore
StateReadLatencyGauge,
@@ -61,7 +63,9 @@ public class WagedRebalancerMetricCollector extends
MetricCollector {
// Waged rebalance counters.
GlobalBaselineCalcCounter,
- PartialRebalanceCounter
+ PartialRebalanceCounter,
+ EmergencyRebalanceCounter,
+ RebalanceOverwriteCounter
}
public WagedRebalancerMetricCollector(String clusterName) {
@@ -97,6 +101,12 @@ public class WagedRebalancerMetricCollector extends
MetricCollector {
LatencyMetric partialRebalanceLatencyGauge =
new
RebalanceLatencyGauge(WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(),
getResetIntervalInMs());
+ LatencyMetric emergencyRebalanceLatencyGauge =
+ new
RebalanceLatencyGauge(WagedRebalancerMetricNames.EmergencyRebalanceLatencyGauge.name(),
+ getResetIntervalInMs());
+ LatencyMetric rebalanceOverwriteLatencyGauge =
+ new
RebalanceLatencyGauge(WagedRebalancerMetricNames.RebalanceOverwriteLatencyGauge.name(),
+ getResetIntervalInMs());
LatencyMetric stateReadLatencyGauge =
new
RebalanceLatencyGauge(WagedRebalancerMetricNames.StateReadLatencyGauge.name(),
getResetIntervalInMs());
@@ -111,15 +121,23 @@ public class WagedRebalancerMetricCollector extends
MetricCollector {
new
RebalanceCounter(WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name());
CountMetric partialRebalanceCounter =
new
RebalanceCounter(WagedRebalancerMetricNames.PartialRebalanceCounter.name());
+ CountMetric emergencyRebalanceCounter =
+ new
RebalanceCounter(WagedRebalancerMetricNames.EmergencyRebalanceCounter.name());
+ CountMetric rebalanceOverwriteCounter =
+ new
RebalanceCounter(WagedRebalancerMetricNames.RebalanceOverwriteCounter.name());
// Add metrics to WagedRebalancerMetricCollector
addMetric(globalBaselineCalcLatencyGauge);
addMetric(partialRebalanceLatencyGauge);
+ addMetric(emergencyRebalanceLatencyGauge);
+ addMetric(rebalanceOverwriteLatencyGauge);
addMetric(stateReadLatencyGauge);
addMetric(stateWriteLatencyGauge);
addMetric(baselineDivergenceGauge);
addMetric(calcFailureCount);
addMetric(globalBaselineCalcCounter);
addMetric(partialRebalanceCounter);
+ addMetric(emergencyRebalanceCounter);
+ addMetric(rebalanceOverwriteCounter);
}
}
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
index a2220a5d4..fb59b8146 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
@@ -39,19 +39,30 @@ public class MockAssignmentMetadataStore extends
AssignmentMetadataStore {
return _globalBaseline == null ? Collections.emptyMap() : _globalBaseline;
}
- public boolean persistBaseline(Map<String, ResourceAssignment>
globalBaseline) {
+ public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
_globalBaseline = globalBaseline;
- return true;
}
public Map<String, ResourceAssignment> getBestPossibleAssignment() {
return _bestPossibleAssignment == null ? Collections.emptyMap() :
_bestPossibleAssignment;
}
- public boolean persistBestPossibleAssignment(
+ public void persistBestPossibleAssignment(
Map<String, ResourceAssignment> bestPossibleAssignment) {
_bestPossibleAssignment = bestPossibleAssignment;
- return true;
+ _bestPossibleVersion++;
+ }
+
+ public synchronized boolean asyncUpdateBestPossibleAssignmentCache(
+ Map<String, ResourceAssignment> bestPossibleAssignment, int newVersion) {
+ // Check if the version is stale by this point
+ if (newVersion > _bestPossibleVersion) {
+ _bestPossibleAssignment = bestPossibleAssignment;
+ _bestPossibleVersion = newVersion;
+ return true;
+ }
+
+ return false;
}
public void close() {
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
index 84bc82957..7c5740423 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
@@ -96,31 +96,7 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
Assert.assertTrue(_store.getBestPossibleAssignment().isEmpty());
}
- /**
- * Test that if the old assignment and new assignment are the same,
- */
@Test(dependsOnMethods = "testReadEmptyBaseline")
- public void testAvoidingRedundantWrite() {
- Map<String, ResourceAssignment> dummyAssignment = getDummyAssignment();
-
- // Call persist functions
- _store.persistBaseline(dummyAssignment);
- _store.persistBestPossibleAssignment(dummyAssignment);
-
- // Check that only one version exists
- Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 1);
- Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(),
1);
-
- // Call persist functions again
- _store.persistBaseline(dummyAssignment);
- _store.persistBestPossibleAssignment(dummyAssignment);
-
- // Check that only one version exists still
- Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 1);
- Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(),
1);
- }
-
- @Test(dependsOnMethods = "testAvoidingRedundantWrite")
public void testAssignmentCache() {
Map<String, ResourceAssignment> dummyAssignment = getDummyAssignment();
// Call persist functions
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index cfcbe298e..d011c5c2e 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -20,9 +20,12 @@ package org.apache.helix.controller.rebalancer.waged;
*/
import java.io.IOException;
+import java.sql.Array;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -32,8 +35,10 @@ import org.apache.helix.HelixConstants;
import org.apache.helix.HelixRebalanceException;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import
org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
import
org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -50,6 +55,7 @@ import
org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
import org.apache.helix.monitoring.metrics.model.CountMetric;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -255,8 +261,8 @@ public class TestWagedRebalancer extends
AbstractTestClusterModel {
Map<String, ResourceAssignment> testResourceAssignmentMap = new
HashMap<>();
ZNRecord mappingNode = new ZNRecord(_resourceNames.get(0));
HashMap<String, String> mapping = new HashMap<>();
- mapping.put(_partitionNames.get(0), "MASTER");
- mappingNode.setMapField(_testInstanceId, mapping);
+ mapping.put(_testInstanceId, "MASTER");
+ mappingNode.setMapField(_partitionNames.get(0), mapping);
testResourceAssignmentMap.put(_resourceNames.get(0), new
ResourceAssignment(mappingNode));
_metadataStore.reset();
@@ -373,9 +379,9 @@ public class TestWagedRebalancer extends
AbstractTestClusterModel {
clusterData.getEnabledLiveInstances(), new CurrentStateOutput(),
_algorithm);
Assert.fail("Rebalance shall fail.");
} catch (HelixRebalanceException ex) {
- Assert.assertEquals(ex.getFailureType(),
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS);
+ Assert.assertEquals(ex.getFailureType(),
HelixRebalanceException.Type.FAILED_TO_CALCULATE);
Assert.assertEquals(ex.getMessage(),
- "Failed to generate cluster model for partial rebalance. Failure
Type: INVALID_CLUSTER_STATUS");
+ "Failed to calculate for the new best possible. Failure Type:
FAILED_TO_CALCULATE");
}
// The rebalance will be done with empty mapping result since there is no
previously calculated
@@ -389,7 +395,7 @@ public class TestWagedRebalancer extends
AbstractTestClusterModel {
public void testInvalidRebalancerStatus() throws IOException {
// Mock a metadata store that will fail on all the calls.
AssignmentMetadataStore metadataStore =
Mockito.mock(AssignmentMetadataStore.class);
- when(metadataStore.getBaseline())
+ when(metadataStore.getBestPossibleAssignment())
.thenThrow(new RuntimeException("Mock Error. Metadata store fails."));
WagedRebalancer rebalancer = new WagedRebalancer(metadataStore,
_algorithm, Optional.empty());
@@ -404,7 +410,7 @@ public class TestWagedRebalancer extends
AbstractTestClusterModel {
Assert.assertEquals(ex.getFailureType(),
HelixRebalanceException.Type.INVALID_REBALANCER_STATUS);
Assert.assertEquals(ex.getMessage(),
- "Failed to get the current baseline assignment because of unexpected
error. Failure Type: INVALID_REBALANCER_STATUS");
+ "Failed to get the current best possible assignment because of
unexpected error. Failure Type: INVALID_REBALANCER_STATUS");
}
}
@@ -439,7 +445,7 @@ public class TestWagedRebalancer extends
AbstractTestClusterModel {
Assert.fail("Rebalance shall fail.");
} catch (HelixRebalanceException ex) {
Assert.assertEquals(ex.getFailureType(),
HelixRebalanceException.Type.FAILED_TO_CALCULATE);
- Assert.assertEquals(ex.getMessage(), "Algorithm fails. Failure Type:
FAILED_TO_CALCULATE");
+ Assert.assertEquals(ex.getMessage(), "Failed to calculate for the new
best possible. Failure Type: FAILED_TO_CALCULATE");
}
// But if call with the public method computeNewIdealStates(), the
rebalance will return with
// the previous rebalance result.
@@ -575,6 +581,141 @@ public class TestWagedRebalancer extends
AbstractTestClusterModel {
Assert.assertEquals(bestPossibleAssignment, newAlgorithmResult);
}
+ @Test(dependsOnMethods = "testRebalance")
+ public void testEmergencyRebalance() throws IOException,
HelixRebalanceException {
+ _metadataStore.reset();
+ ResourceControllerDataProvider clusterData = setupClusterDataCache();
+ MockRebalanceAlgorithm spyAlgorithm = Mockito.spy(new
MockRebalanceAlgorithm());
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore,
spyAlgorithm, Optional.empty());
+
+ // Cluster config change will trigger baseline to be recalculated.
+ when(clusterData.getRefreshedChangeTypes())
+
.thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+ Map<String, Resource> resourceMap =
+
clusterData.getIdealStates().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
entry -> {
+ Resource resource = new Resource(entry.getKey());
+ entry.getValue().getPartitionSet().forEach(resource::addPartition);
+ return resource;
+ }));
+ // Populate best possible assignment
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new
CurrentStateOutput());
+ // Global Rebalance once, Partial Rebalance once
+ verify(spyAlgorithm, times(2)).calculate(any());
+
+ // Artificially insert an offline node in the best possible assignment
+ Map<String, ResourceAssignment> bestPossibleAssignment =
+ _metadataStore.getBestPossibleAssignment();
+ String offlineResource = _resourceNames.get(0);
+ String offlinePartition = _partitionNames.get(0);
+ String offlineState = "MASTER";
+ String offlineInstance = "offlineInstance";
+ for (Partition partition :
bestPossibleAssignment.get(offlineResource).getMappedPartitions()) {
+ if (partition.getPartitionName().equals(offlinePartition)) {
+ bestPossibleAssignment.get(offlineResource)
+ .addReplicaMap(partition,
Collections.singletonMap(offlineInstance, offlineState));
+ }
+ }
+ _metadataStore.persistBestPossibleAssignment(bestPossibleAssignment);
+
+ // This should trigger both emergency rebalance and partial rebalance
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new
CurrentStateOutput());
+ ArgumentCaptor<ClusterModel> capturedClusterModel =
ArgumentCaptor.forClass(ClusterModel.class);
+ // 2 from previous case, Emergency + Partial from this case, 4 in total
+ verify(spyAlgorithm, times(4)).calculate(capturedClusterModel.capture());
+ // In the cluster model for Emergency rebalance, the assignableReplica is
the offline one
+ ClusterModel clusterModelForEmergencyRebalance =
capturedClusterModel.getAllValues().get(2);
+
Assert.assertEquals(clusterModelForEmergencyRebalance.getAssignableReplicaMap().size(),
1);
+
Assert.assertEquals(clusterModelForEmergencyRebalance.getAssignableReplicaMap().get(offlineResource).size(),
1);
+ AssignableReplica assignableReplica =
+
clusterModelForEmergencyRebalance.getAssignableReplicaMap().get(offlineResource).iterator().next();
+ Assert.assertEquals(assignableReplica.getPartitionName(),
offlinePartition);
+ Assert.assertEquals(assignableReplica.getReplicaState(), offlineState);
+
+ bestPossibleAssignment = _metadataStore.getBestPossibleAssignment();
+ for (Map.Entry<String, ResourceAssignment> entry :
bestPossibleAssignment.entrySet()) {
+ ResourceAssignment resourceAssignment = entry.getValue();
+ for (Partition partition : resourceAssignment.getMappedPartitions()) {
+ for (String instance:
resourceAssignment.getReplicaMap(partition).keySet()) {
+ Assert.assertNotSame(instance, offlineInstance);
+ }
+ }
+ }
+ }
+
+ @Test(dependsOnMethods = "testRebalance")
+ public void testRebalanceOverwriteTrigger() throws IOException,
HelixRebalanceException {
+ _metadataStore.reset();
+
+ ResourceControllerDataProvider clusterData = setupClusterDataCache();
+ // Enable delay rebalance
+ ClusterConfig clusterConfig = clusterData.getClusterConfig();
+ clusterConfig.setDelayRebalaceEnabled(true);
+ clusterConfig.setRebalanceDelayTime(1);
+ clusterData.setClusterConfig(clusterConfig);
+
+ // force create a fake offlineInstance that's in delay window
+ Set<String> instances = new HashSet<>(_instances);
+ String offlineInstance = "offlineInstance";
+ instances.add(offlineInstance);
+ when(clusterData.getAllInstances()).thenReturn(instances);
+ Map<String, Long> instanceOfflineTimeMap = new HashMap<>();
+ instanceOfflineTimeMap.put(offlineInstance, System.currentTimeMillis() +
Integer.MAX_VALUE);
+
when(clusterData.getInstanceOfflineTimeMap()).thenReturn(instanceOfflineTimeMap);
+ Map<String, InstanceConfig> instanceConfigMap =
clusterData.getInstanceConfigMap();
+ instanceConfigMap.put(offlineInstance,
createMockInstanceConfig(offlineInstance));
+ when(clusterData.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+
+ // Set minActiveReplica to 0 so that requireRebalanceOverwrite returns
false
+ Map<String, IdealState> isMap = new HashMap<>();
+ for (String resource : _resourceNames) {
+ IdealState idealState = clusterData.getIdealState(resource);
+ idealState.setMinActiveReplicas(0);
+ isMap.put(resource, idealState);
+ }
+ when(clusterData.getIdealState(anyString())).thenAnswer(
+ (Answer<IdealState>) invocationOnMock ->
isMap.get(invocationOnMock.getArguments()[0]));
+ when(clusterData.getIdealStates()).thenReturn(isMap);
+
+ MockRebalanceAlgorithm spyAlgorithm = Mockito.spy(new
MockRebalanceAlgorithm());
+ WagedRebalancer rebalancer = Mockito.spy(new
WagedRebalancer(_metadataStore, spyAlgorithm, Optional.empty()));
+
+ // Cluster config change will trigger baseline to be recalculated.
+ when(clusterData.getRefreshedChangeTypes())
+
.thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+ Map<String, Resource> resourceMap =
+
clusterData.getIdealStates().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
entry -> {
+ Resource resource = new Resource(entry.getKey());
+ entry.getValue().getPartitionSet().forEach(resource::addPartition);
+ return resource;
+ }));
+ // Populate best possible assignment
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new
CurrentStateOutput());
+ verify(rebalancer, times(1)).requireRebalanceOverwrite(any(), any());
+ verify(rebalancer, times(0)).applyRebalanceOverwrite(any(), any(), any(),
any(), any());
+
+ // Set minActiveReplica to 3 so that requireRebalanceOverwrite returns true
+ for (String resource : _resourceNames) {
+ IdealState idealState = clusterData.getIdealState(resource);
+ idealState.setMinActiveReplicas(3);
+ isMap.put(resource, idealState);
+ }
+ when(clusterData.getIdealState(anyString())).thenAnswer(
+ (Answer<IdealState>) invocationOnMock ->
isMap.get(invocationOnMock.getArguments()[0]));
+ when(clusterData.getIdealStates()).thenReturn(isMap);
+
+ _metadataStore.reset();
+ // Update the config so the cluster config will be marked as changed.
+ clusterConfig = clusterData.getClusterConfig();
+ Map<String, Integer> defaultCapacityMap =
+ new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
+ defaultCapacityMap.put("foobar", 0);
+ clusterConfig.setDefaultInstanceCapacityMap(defaultCapacityMap);
+ clusterData.setClusterConfig(clusterConfig);
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new
CurrentStateOutput());
+ verify(rebalancer, times(2)).requireRebalanceOverwrite(any(), any());
+ verify(rebalancer, times(1)).applyRebalanceOverwrite(any(), any(), any(),
any(), any());
+ }
+
@Test(dependsOnMethods = "testRebalance")
public void testReset() throws IOException, HelixRebalanceException {
_metadataStore.reset();