This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch replica_level_throttle in repository https://gitbox.apache.org/repos/asf/helix.git
commit b641c97dfcf39ab8f7262c2ba5764cfe3a6fa4c6 Author: Junkai Xue <[email protected]> AuthorDate: Sun May 16 14:41:34 2021 -0700 Applying per replica logic for entire stage #1724 This commit contains: 1. Per-resource looping with dynamic rebalance type computation and partition-level ordering. 2. Let the entire stage work against the per-replica throttling logic. 3. Move the intermediate state computation logic to a centralized place. --- .../helix/controller/common/PartitionStateMap.java | 9 +- .../stages/IntermediateStateCalcStage.java | 244 +++++++++++---------- .../helix/controller/stages/MessageOutput.java | 4 + 3 files changed, 137 insertions(+), 120 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/common/PartitionStateMap.java b/helix-core/src/main/java/org/apache/helix/controller/common/PartitionStateMap.java index e3c899a..7bfa6dc 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/common/PartitionStateMap.java +++ b/helix-core/src/main/java/org/apache/helix/controller/common/PartitionStateMap.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.helix.model.Partition; /** @@ -39,10 +40,12 @@ public class PartitionStateMap { _stateMap = new HashMap<>(); } - public PartitionStateMap(String resourceName, - Map<Partition, Map<String, String>> partitionStateMap) { + // Deep copy of the partitionStateMap is a safer way. 
+ public PartitionStateMap(String resourceName, Map<Partition, Map<String, String>> partitionStateMap) { _resourceName = resourceName; - _stateMap = partitionStateMap; + _stateMap = partitionStateMap.entrySet() + .stream() + .collect(Collectors.toMap(e -> e.getKey(), e -> new HashMap<>(e.getValue()))); } public Set<Partition> partitionSet() { diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java index 6a5d2f1..b91dca6 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -73,16 +73,17 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name()); ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name()); + MessageOutput messageOutput =event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); if (currentStateOutput == null || bestPossibleStateOutput == null || resourceToRebalance == null - || cache == null) { + || cache == null || messageOutput == null) { throw new StageException(String.format("Missing attributes in event: %s. 
" - + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) |RESOURCES (%s) |DataCache (%s)", - event, currentStateOutput, bestPossibleStateOutput, resourceToRebalance, cache)); + + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) |RESOURCES (%s) |MESSAGE_SELECT (%s) |DataCache (%s)", + event, currentStateOutput, bestPossibleStateOutput, resourceToRebalance, messageOutput, cache)); } IntermediateStateOutput intermediateStateOutput = - compute(event, resourceToRebalance, currentStateOutput, bestPossibleStateOutput); + compute(event, resourceToRebalance, currentStateOutput, bestPossibleStateOutput, messageOutput); event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), intermediateStateOutput); // Make sure no instance has more replicas/partitions assigned than maxPartitionPerInstance. If @@ -106,7 +107,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { * @return */ private IntermediateStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap, - CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput) { + CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput, MessageOutput messageOutput) { IntermediateStateOutput output = new IntermediateStateOutput(); ResourceControllerDataProvider dataCache = event.getAttribute(AttributeName.ControllerDataProvider.name()); @@ -173,14 +174,13 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { } try { - output.setState(resourceName, - computeIntermediatePartitionState(dataCache, clusterStatusMonitor, idealState, - resourceMap.get(resourceName), currentStateOutput, - bestPossibleStateOutput.getPartitionStateMap(resourceName), - bestPossibleStateOutput.getPreferenceLists(resourceName), throttleController)); + output.setState(resourceName, computeIntermediatePartitionState(dataCache, clusterStatusMonitor, idealState, + resourceMap.get(resourceName), currentStateOutput, + 
bestPossibleStateOutput.getPartitionStateMap(resourceName), + bestPossibleStateOutput.getPreferenceLists(resourceName), throttleController, + messageOutput.getResourceMessageMap(resourceName))); } catch (HelixException ex) { - LogUtil.logInfo(logger, _eventId, - "Failed to calculate intermediate partition states for resource " + resourceName, ex); + LogUtil.logInfo(logger, _eventId, "Failed to calculate intermediate partition states for resource " + resourceName, ex); failedResources.add(resourceName); } } @@ -294,99 +294,41 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { ClusterStatusMonitor clusterStatusMonitor, IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput, PartitionStateMap bestPossiblePartitionStateMap, Map<String, List<String>> preferenceLists, - StateTransitionThrottleController throttleController) { + StateTransitionThrottleController throttleController, Map<Partition, List<Message>> resourceMessageMap) { String resourceName = resource.getResourceName(); LogUtil.logDebug(logger, _eventId, String.format("Processing resource: %s", resourceName)); - // Throttling is applied only on FULL-AUTO mode - if (!throttleController.isThrottleEnabled() - || !IdealState.RebalanceMode.FULL_AUTO.equals(idealState.getRebalanceMode())) { + // Throttling is applied only on FULL-AUTO mode and if the resource message map is empty, no throttling needed. + if (!throttleController.isThrottleEnabled() || !IdealState.RebalanceMode.FULL_AUTO.equals( + idealState.getRebalanceMode()) || resourceMessageMap.isEmpty()) { return bestPossiblePartitionStateMap; } String stateModelDefName = idealState.getStateModelDefRef(); StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName); - PartitionStateMap intermediatePartitionStateMap = new PartitionStateMap(resourceName); + // This require a deep copy of current state map because some of the states will be overwritten by applying + // messages to it. 
- Set<Partition> partitionsNeedRecovery = new HashSet<>(); - Set<Partition> partitionsNeedLoadBalance = new HashSet<>(); Set<Partition> partitionsWithErrorStateReplica = new HashSet<>(); - for (Partition partition : resource.getPartitions()) { - Map<String, String> currentStateMap = - currentStateOutput.getCurrentStateMap(resourceName, partition); - Map<String, String> bestPossibleMap = - bestPossiblePartitionStateMap.getPartitionMap(partition); - List<String> preferenceList = preferenceLists.get(partition.getPartitionName()); - - RebalanceType rebalanceType = getRebalanceType(cache, bestPossibleMap, preferenceList, - stateModelDef, currentStateMap, idealState, partition.getPartitionName()); - - // TODO: refine getRebalanceType to return more accurate rebalance types. So the following - // logic doesn't need to check for more details. - boolean isRebalanceNeeded = false; - - // Check whether partition has any ERROR state replicas - if (currentStateMap.values().contains(HelixDefinedState.ERROR.name())) { - partitionsWithErrorStateReplica.add(partition); - } - - // Number of states required by StateModelDefinition are not satisfied, need recovery - if (rebalanceType.equals(RebalanceType.RECOVERY_BALANCE)) { - // Check if recovery is needed for this partition - if (!currentStateMap.equals(bestPossibleMap)) { - partitionsNeedRecovery.add(partition); - isRebalanceNeeded = true; - } - } else if (rebalanceType.equals(RebalanceType.LOAD_BALANCE)) { - // Number of states required by StateModelDefinition are satisfied, but to achieve - // BestPossibleState, need load balance - partitionsNeedLoadBalance.add(partition); - isRebalanceNeeded = true; - } - - // Currently at BestPossibleState, no further action necessary - if (!isRebalanceNeeded) { - Map<String, String> intermediateMap = new HashMap<>(bestPossibleMap); - intermediatePartitionStateMap.setState(partition, intermediateMap); - } - } - - if (!partitionsNeedRecovery.isEmpty()) { - LogUtil.logInfo(logger, _eventId, 
String.format( - "Recovery balance needed for %s partitions: %s", resourceName, partitionsNeedRecovery)); - } - if (!partitionsNeedLoadBalance.isEmpty()) { - LogUtil.logInfo(logger, _eventId, String.format("Load balance needed for %s partitions: %s", - resourceName, partitionsNeedLoadBalance)); - } - if (!partitionsWithErrorStateReplica.isEmpty()) { - LogUtil.logInfo(logger, _eventId, - String.format("Partition currently has an ERROR replica in %s partitions: %s", - resourceName, partitionsWithErrorStateReplica)); - } - - chargePendingTransition(resource, currentStateOutput, throttleController, cache, preferenceLists, stateModelDef, intermediatePartitionStateMap); - - // Perform recovery balance - Set<Partition> recoveryThrottledPartitions = - recoveryRebalance(resource, bestPossiblePartitionStateMap, throttleController, - intermediatePartitionStateMap, partitionsNeedRecovery, currentStateOutput, - cache.getStateModelDef(resource.getStateModelDefRef()).getTopState(), cache); - - // Perform load balance upon checking conditions below - Set<Partition> loadbalanceThrottledPartitions; + Set<String> messagesForRecovery = new HashSet<>(); + Set<String> messagesForLoad = new HashSet<>(); + Set<String> messagesThrottledForRecovery = new HashSet<>(); + Set<String> messagesThrottledForLoad = new HashSet<>(); ClusterConfig clusterConfig = cache.getClusterConfig(); // If the threshold (ErrorOrRecovery) is set, then use it, if not, then check if the old // threshold (Error) is set. If the old threshold is set, use it. If not, use the default value // for the new one. This is for backward-compatibility int threshold = 1; // Default threshold for ErrorOrRecoveryPartitionThresholdForLoadBalance - int partitionCount = partitionsWithErrorStateReplica.size(); + // Keep the error count as partition level. 
This logic only applies to downward state transition determination + int numPartitionsWithErrorReplica = (int) currentStateOutput.getCurrentStateMap(resourceName) + .values() + .stream() + .filter(i -> i.values().contains(HelixDefinedState.ERROR.name())) + .count(); if (clusterConfig.getErrorOrRecoveryPartitionThresholdForLoadBalance() != -1) { // ErrorOrRecovery is set threshold = clusterConfig.getErrorOrRecoveryPartitionThresholdForLoadBalance(); - partitionCount += partitionsNeedRecovery.size(); // Only add this count when the threshold is - // set } else { if (clusterConfig.getErrorPartitionThresholdForLoadBalance() != 0) { // 0 is the default value so the old threshold has been set @@ -396,23 +338,79 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { // Perform regular load balance only if the number of partitions in recovery and in error is // less than the threshold. Otherwise, only allow downward-transition load balance - boolean onlyDownwardLoadBalance = partitionCount > threshold; + boolean onlyDownwardLoadBalance = numPartitionsWithErrorReplica > threshold; + + chargePendingTransition(resource, currentStateOutput, throttleController, cache, preferenceLists, stateModelDef); - loadbalanceThrottledPartitions = loadRebalance(resource, currentStateOutput, - bestPossiblePartitionStateMap, throttleController, intermediatePartitionStateMap, - partitionsNeedLoadBalance, currentStateOutput.getCurrentStateMap(resourceName), - onlyDownwardLoadBalance, stateModelDef, cache); + // Sort partitions in case of urgent partition need to take the quota first. 
+ List<Partition> partitions = new ArrayList<>(resource.getPartitions()); + Collections.sort(partitions, new PartitionPriorityComparator(bestPossiblePartitionStateMap.getStateMap(), + currentStateOutput.getCurrentStateMap(resourceName), stateModelDef.getTopState())); + for (Partition partition : partitions) { + List<Message> messagesToThrottle = new ArrayList<>(resourceMessageMap.get(partition)); + if (messagesToThrottle == null || messagesToThrottle.isEmpty()) { + continue; + } + + Map<String, String> derivedCurrentStateMap = currentStateOutput.getCurrentStateMap(resourceName, partition) + .entrySet() + .stream() + .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())); + List<String> preferenceList = preferenceLists.get(partition.getPartitionName()); + Map<String, Integer> requiredState = getRequiredStates(resourceName, cache, preferenceList); + Collections.sort(messagesToThrottle, + new MessagePriorityComparator(preferenceList, stateModelDef.getStatePriorityMap())); + for (Message message : messagesToThrottle) { + RebalanceType rebalanceType = getRebalanceTypePerMessage(requiredState, message, derivedCurrentStateMap); + + // Number of states required by StateModelDefinition are not satisfied, need recovery + if (rebalanceType.equals(RebalanceType.RECOVERY_BALANCE)) { + messagesForRecovery.add(message.getId()); + recoveryRebalance(resource, partition, throttleController, message, cache, messagesThrottledForRecovery, + resourceMessageMap); + } else if (rebalanceType.equals(RebalanceType.LOAD_BALANCE)) { + messagesForLoad.add(message.getId()); + loadRebalance(resource, partition, throttleController, message, cache, onlyDownwardLoadBalance, stateModelDef, + messagesThrottledForLoad, resourceMessageMap); + } + + // Apply the message to temporary current state map + if (!messagesThrottledForRecovery.contains(message.getId()) && !messagesThrottledForLoad.contains( + message.getId())) { + derivedCurrentStateMap.put(message.getTgtName(), 
message.getToState()); + } + } + } + // TODO: We may need to optimize it to be async compute for intermediate state output. + PartitionStateMap intermediatePartitionStateMap = + new PartitionStateMap(resourceName, currentStateOutput.getCurrentStateMap(resourceName)); + computeIntermediateMap(intermediatePartitionStateMap, currentStateOutput.getPendingMessageMap(resourceName), + resourceMessageMap); + + if (!messagesForRecovery.isEmpty()) { + LogUtil.logInfo(logger, _eventId, String.format( + "Recovery balance needed for %s with messages: %s", resourceName, messagesForRecovery)); + } + if (!messagesForLoad.isEmpty()) { + LogUtil.logInfo(logger, _eventId, String.format("Load balance needed for %s with messages: %s", + resourceName, messagesForLoad)); + } + if (!partitionsWithErrorStateReplica.isEmpty()) { + LogUtil.logInfo(logger, _eventId, + String.format("Partition currently has an ERROR replica in %s partitions: %s", + resourceName, partitionsWithErrorStateReplica)); + } if (clusterStatusMonitor != null) { - clusterStatusMonitor.updateRebalancerStats(resourceName, partitionsNeedRecovery.size(), - partitionsNeedLoadBalance.size(), recoveryThrottledPartitions.size(), - loadbalanceThrottledPartitions.size()); + clusterStatusMonitor.updateRebalancerStats(resourceName, messagesForRecovery.size(), + messagesForLoad.size(), messagesThrottledForRecovery.size(), + messagesThrottledForLoad.size()); } if (logger.isDebugEnabled()) { logPartitionMapState(resourceName, new HashSet<>(resource.getPartitions()), - partitionsNeedRecovery, recoveryThrottledPartitions, partitionsNeedLoadBalance, - loadbalanceThrottledPartitions, currentStateOutput, bestPossiblePartitionStateMap, + messagesForRecovery, messagesThrottledForRecovery, messagesForLoad, + messagesThrottledForLoad, currentStateOutput, bestPossiblePartitionStateMap, intermediatePartitionStateMap); } @@ -449,8 +447,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { */ private void 
chargePendingTransition(Resource resource, CurrentStateOutput currentStateOutput, StateTransitionThrottleController throttleController, ResourceControllerDataProvider cache, - Map<String, List<String>> preferenceLists, StateModelDefinition stateModelDefinition, - PartitionStateMap intermediatePartitionStateMap) { + Map<String, List<String>> preferenceLists, StateModelDefinition stateModelDefinition) { String resourceName = resource.getResourceName(); // check and charge pending transitions for (Partition partition : resource.getPartitions()) { @@ -478,7 +475,6 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { throttleController.chargeResource(rebalanceType, resourceName); throttleController.chargeCluster(rebalanceType); } - intermediatePartitionStateMap.setState(partition, message.getTgtName(), message.getToState()); } } } @@ -489,7 +485,6 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { * @param resource the resource to throttle * @param throttleController throttle controller object * @param messageToThrottle the message to be throttled - * @param intermediatePartitionStateMap output result for this stage that intermediate state map * @param cache cache object for computational metadata from external storage * @param messagesThrottled messages that have already been throttled * @param resourceMessageMap the map for all messages from MessageSelectStage. 
Remove the message @@ -497,10 +492,10 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { */ private void recoveryRebalance(Resource resource, Partition partition, StateTransitionThrottleController throttleController, Message messageToThrottle, - PartitionStateMap intermediatePartitionStateMap, ResourceControllerDataProvider cache, - Set<Message> messagesThrottled, Map<Partition, List<Message>> resourceMessageMap) { + ResourceControllerDataProvider cache, Set<String> messagesThrottled, + Map<Partition, List<Message>> resourceMessageMap) { throttleStateTransitionsForReplica(throttleController, resource.getResourceName(), partition, messageToThrottle, - messagesThrottled, intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE, cache, resourceMessageMap); + messagesThrottled, RebalanceType.RECOVERY_BALANCE, cache, resourceMessageMap); } /** @@ -509,7 +504,6 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { * @param resource the resource to throttle * @param throttleController throttle controller object * @param messageToThrottle the message to be throttle - * @param intermediatePartitionStateMap output result for this stage that intermediate state map * @param cache cache object for computational metadata from external storage * @param onlyDownwardLoadBalance does allow only downward load balance * @param stateModelDefinition state model definition of this resource @@ -519,16 +513,15 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { */ private void loadRebalance(Resource resource, Partition partition, StateTransitionThrottleController throttleController, Message messageToThrottle, - PartitionStateMap intermediatePartitionStateMap, ResourceControllerDataProvider cache, - boolean onlyDownwardLoadBalance, StateModelDefinition stateModelDefinition, Set<Message> messagesThrottled, + ResourceControllerDataProvider cache, + boolean onlyDownwardLoadBalance, StateModelDefinition stateModelDefinition, 
Set<String> messagesThrottled, Map<Partition, List<Message>> resourceMessageMap) { if (onlyDownwardLoadBalance && isLoadBalanceDownwardStateTransition(messageToThrottle, stateModelDefinition)) { // Remove the message already allowed for downward state transitions. - intermediatePartitionStateMap.setState(partition, messageToThrottle.getTgtName(), messageToThrottle.getToState()); return; } throttleStateTransitionsForReplica(throttleController, resource.getResourceName(), partition, messageToThrottle, - messagesThrottled, intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE, cache, resourceMessageMap); + messagesThrottled, RebalanceType.LOAD_BALANCE, cache, resourceMessageMap); } /** @@ -540,17 +533,15 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { * @param messageToThrottle the message to be throttled * @param messagesThrottled the cumulative set of messages that have been throttled already. These * messages represent the replicas of this partition that have been throttled. - * @param intermediatePartitionStateMap the cumulative partition-state mapping as a result of the throttling step - * of IntermediateStateCalcStage * @param rebalanceType the rebalance type to charge quota * @param cache cached cluster metadata required by the throttle controller * @param resourceMessageMap the map for all messages from MessageSelectStage. Remove the message * if it has been throttled. 
*/ private void throttleStateTransitionsForReplica(StateTransitionThrottleController throttleController, - String resourceName, Partition partition, Message messageToThrottle, Set<Message> messagesThrottled, - PartitionStateMap intermediatePartitionStateMap, RebalanceType rebalanceType, - ResourceControllerDataProvider cache, Map<Partition, List<Message>> resourceMessageMap) { + String resourceName, Partition partition, Message messageToThrottle, Set<String> messagesThrottled, + RebalanceType rebalanceType, ResourceControllerDataProvider cache, + Map<Partition, List<Message>> resourceMessageMap) { boolean hasReachedThrottlingLimit = false; if (throttleController.shouldThrottleForResource(rebalanceType, resourceName)) { hasReachedThrottlingLimit = true; @@ -578,13 +569,12 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { if (!hasReachedThrottlingLimit) { throttleController.chargeCluster(rebalanceType); throttleController.chargeResource(rebalanceType, resourceName); - intermediatePartitionStateMap.setState(partition, messageToThrottle.getTgtName(), messageToThrottle.getToState()); } else { // Intermediate Map is based on current state // Remove the message from MessageSelection result if it has been throttled since the message will be dispatched // by next stage if it is not removed. 
resourceMessageMap.get(partition).remove(messageToThrottle); - messagesThrottled.add(messageToThrottle); + messagesThrottled.add(messageToThrottle.getId()); } } @@ -716,8 +706,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { * @param intermediateStateMap */ private void logPartitionMapState(String resource, Set<Partition> allPartitions, - Set<Partition> recoveryPartitions, Set<Partition> recoveryThrottledPartitions, - Set<Partition> loadbalancePartitions, Set<Partition> loadbalanceThrottledPartitions, + Set<String> recoveryPartitions, Set<String> recoveryThrottledPartitions, + Set<String> loadbalancePartitions, Set<String> loadbalanceThrottledPartitions, CurrentStateOutput currentStateOutput, PartitionStateMap bestPossibleStateMap, PartitionStateMap intermediateStateMap) { @@ -894,6 +884,26 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { } /** + * Generate the IntermediateStateMap from pending messages + message generated. + */ + private void computeIntermediateMap(PartitionStateMap intermediateStateMap, + Map<Partition, Map<String, Message>> pendingMessageMap, Map<Partition, List<Message>> resourceMessageMap) { + for (Map.Entry<Partition, Map<String, Message>> entry : pendingMessageMap.entrySet()) { + entry.getValue() + .entrySet() + .stream() + .forEach( + e -> intermediateStateMap.setState(entry.getKey(), e.getValue().getTgtName(), e.getValue().getToState())); + } + + for (Map.Entry<Partition, List<Message>> entry : resourceMessageMap.entrySet()) { + entry.getValue() + .stream() + .forEach(e -> intermediateStateMap.setState(entry.getKey(), e.getTgtName(), e.getToState())); + } + } + + /** * Handle a partition with a pending message so that the partition will not be double-charged or double-assigned during recovery and load balance. 
* @param partition * @param partitionsNeedRecovery diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java index dd545c3..ad7e6c8 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java @@ -64,6 +64,10 @@ public class MessageOutput { return Collections.emptyList(); } + public Map<Partition, List<Message>> getResourceMessageMap(String resourceName) { + return _messagesMap.getOrDefault(resourceName, Collections.emptyMap()); + } + @Override public String toString() { return _messagesMap.toString();
