This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 402dd6d0b4fb25625b111318dfbd0c4892ff790b Author: Hunter Lee <[email protected]> AuthorDate: Fri Mar 29 12:13:48 2019 -0700 IntermediateStateCalcStage style change This diff includes code style fixes and refactor using Java 8 features. RB=1613452 BUG=HELIX-1742 G=helix-reviewers A=jjwang Signed-off-by: Hunter Lee <[email protected]> --- .../stages/IntermediateStateCalcStage.java | 106 ++++++++++----------- 1 file changed, 48 insertions(+), 58 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java index 888bd12..b6ab425 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -68,7 +68,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); Map<String, Resource> resourceToRebalance = event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name()); - ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name()); + ResourceControllerDataProvider cache = + event.getAttribute(AttributeName.ControllerDataProvider.name()); if (currentStateOutput == null || bestPossibleStateOutput == null || resourceToRebalance == null || cache == null) { @@ -137,7 +138,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { dataCache.getIdealState(resourceName).getRecord().getSimpleField(priorityField)); } } - Collections.sort(prioritizedResourceList, new ResourcePriorityComparator()); + prioritizedResourceList.sort(new ResourcePriorityComparator()); } ClusterStatusMonitor clusterStatusMonitor = @@ -160,8 +161,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { IdealState idealState = dataCache.getIdealState(resourceName); if (idealState == null) { // If IdealState is null, use an empty one - LogUtil.logInfo(logger, _eventId, String - .format("IdealState for resource %s does not exist; resource may not exist anymore", + LogUtil.logInfo(logger, _eventId, + String.format( + "IdealState for resource %s does not exist; resource may not exist anymore", resourceName)); idealState = new IdealState(resourceName); idealState.setStateModelDefRef(resource.getStateModelDefRef()); @@ -183,8 +185,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { if (clusterStatusMonitor != null) { clusterStatusMonitor.setResourceRebalanceStates(failedResources, ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED); - clusterStatusMonitor - .setResourceRebalanceStates(output.resourceSet(), ResourceMonitor.RebalanceStatus.NORMAL); + clusterStatusMonitor.setResourceRebalanceStates(output.resourceSet(), + ResourceMonitor.RebalanceStatus.NORMAL); } return output; @@ -202,8 +204,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { * @param intermediateStateOutput * @param maxPartitionPerInstance */ - private void validateMaxPartitionsPerInstance(ClusterEvent event, ResourceControllerDataProvider cache, - IntermediateStateOutput intermediateStateOutput, int maxPartitionPerInstance) { + private void validateMaxPartitionsPerInstance(ClusterEvent event, + ResourceControllerDataProvider cache, IntermediateStateOutput intermediateStateOutput, + int maxPartitionPerInstance) { Map<String, PartitionStateMap> resourceStatesMap = intermediateStateOutput.getResourceStatesMap(); Map<String, Integer> instancePartitionCounts = new HashMap<>(); @@ -232,8 +235,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { instancePartitionCounts.put(instance, 0); } int partitionCount = instancePartitionCounts.get(instance); // Number of replicas (from - // different partitions) held - // in this instance + // different partitions) held + // in this instance partitionCount++; if (partitionCount > maxPartitionPerInstance) { HelixManager manager = event.getAttribute(AttributeName.helixmanager.name()); @@ -346,19 +349,17 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { } if (!partitionsNeedRecovery.isEmpty()) { - LogUtil.logInfo(logger, _eventId, String - .format("Recovery balance needed for %s partitions: %s", resourceName, - partitionsNeedRecovery)); + LogUtil.logInfo(logger, _eventId, String.format( + "Recovery balance needed for %s partitions: %s", resourceName, partitionsNeedRecovery)); } if (!partitionsNeedLoadBalance.isEmpty()) { - LogUtil.logInfo(logger, _eventId, String - .format("Load balance needed for %s partitions: %s", resourceName, - partitionsNeedLoadBalance)); + LogUtil.logInfo(logger, _eventId, String.format("Load balance needed for %s partitions: %s", + resourceName, partitionsNeedLoadBalance)); } if (!partitionsWithErrorStateReplica.isEmpty()) { - LogUtil.logInfo(logger, _eventId, String - .format("Partition currently has an ERROR replica in %s partitions: %s", resourceName, - partitionsWithErrorStateReplica)); + LogUtil.logInfo(logger, _eventId, + String.format("Partition currently has an ERROR replica in %s partitions: %s", + resourceName, partitionsWithErrorStateReplica)); } chargePendingTransition(resource, currentStateOutput, throttleController, @@ -383,7 +384,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { // ErrorOrRecovery is set threshold = clusterConfig.getErrorOrRecoveryPartitionThresholdForLoadBalance(); partitionCount += partitionsNeedRecovery.size(); // Only add this count when the threshold is - // set + // set } else { if (clusterConfig.getErrorPartitionThresholdForLoadBalance() != 0) { // 0 is the default value so the old threshold has been set @@ -413,8 +414,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { intermediatePartitionStateMap); } - LogUtil - .logDebug(logger, _eventId, String.format("End processing resource: %s", resourceName)); + LogUtil.logDebug(logger, _eventId, String.format("End processing resource: %s", resourceName)); return intermediatePartitionStateMap; } @@ -530,17 +530,11 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { currentStateOutput.getCurrentStateMap(resourceName); List<Partition> partitionsNeedRecoveryPrioritized = new ArrayList<>(partitionsNeedRecovery); - // TODO: Remove this sort by partition name when Java 1.8 is used // We want the result of the intermediate state calculation to be deterministic. We sort here by // partition name to ensure that the order is consistent for inputs fed into // PartitionPriorityComparator sort - Collections.sort(partitionsNeedRecoveryPrioritized, new Comparator<Partition>() { - @Override - public int compare(Partition partition1, Partition partition2) { - return partition1.getPartitionName().compareTo(partition2.getPartitionName()); - } - }); - Collections.sort(partitionsNeedRecoveryPrioritized, new PartitionPriorityComparator( + partitionsNeedRecoveryPrioritized.sort(Comparator.comparing(Partition::getPartitionName)); + partitionsNeedRecoveryPrioritized.sort(new PartitionPriorityComparator( bestPossiblePartitionStateMap.getStateMap(), currentStateMap, topState, true)); // For each partition, apply throttling if needed. @@ -585,17 +579,11 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { List<Partition> partitionsNeedLoadRebalancePrioritized = new ArrayList<>(partitionsNeedLoadbalance); - // TODO: Remove this sort by partition name when Java 1.8 is used // We want the result of the intermediate state calculation to be deterministic. We sort here by // partition name to ensure that the order is consistent for inputs fed into // PartitionPriorityComparator sort - Collections.sort(partitionsNeedLoadRebalancePrioritized, new Comparator<Partition>() { - @Override - public int compare(Partition partition1, Partition partition2) { - return partition1.getPartitionName().compareTo(partition2.getPartitionName()); - } - }); - Collections.sort(partitionsNeedLoadRebalancePrioritized, new PartitionPriorityComparator( + partitionsNeedLoadRebalancePrioritized.sort(Comparator.comparing(Partition::getPartitionName)); + partitionsNeedLoadRebalancePrioritized.sort(new PartitionPriorityComparator( bestPossiblePartitionStateMap.getStateMap(), currentStateMap, "", false)); for (Partition partition : partitionsNeedLoadRebalancePrioritized) { @@ -618,10 +606,11 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { currentStateOutput, bestPossiblePartitionStateMap, partitionsLoadbalanceThrottled, intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE, cache); } - LogUtil.logInfo(logger, _eventId, String.format( - "For resource %s: Num of partitions needing load-balance: %d, Num of partitions needing" - + " load-balance but throttled (not load-balanced): %d", - resourceName, partitionsNeedLoadbalance.size(), partitionsLoadbalanceThrottled.size())); + LogUtil.logInfo(logger, _eventId, + String.format( + "For resource %s: Num of partitions needing load-balance: %d, Num of partitions needing" + + " load-balance but throttled (not load-balanced): %d", + resourceName, partitionsNeedLoadbalance.size(), partitionsLoadbalanceThrottled.size())); return partitionsLoadbalanceThrottled; } @@ -656,9 +645,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { if (throttleController.shouldThrottleForResource(rebalanceType, resourceName)) { hasReachedThrottlingLimit = true; if (logger.isDebugEnabled()) { - LogUtil.logDebug(logger, _eventId, String - .format("Throttled on partition: %s in resource: %s", partition.getPartitionName(), - resourceName)); + LogUtil.logDebug(logger, _eventId, + String.format("Throttled on partition: %s in resource: %s", + partition.getPartitionName(), resourceName)); } } else { // throttle if any of the instances are not able to accept state transitions @@ -671,8 +660,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { if (throttleController.shouldThrottleForInstance(rebalanceType, instance)) { hasReachedThrottlingLimit = true; if (logger.isDebugEnabled()) { - LogUtil.logDebug(logger, _eventId, String - .format("Throttled because of instance: %s for partition: %s in resource: %s", + LogUtil.logDebug(logger, _eventId, + String.format( + "Throttled because of instance: %s for partition: %s in resource: %s", instance, partition.getPartitionName(), resourceName)); } break; @@ -782,8 +772,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { return RebalanceType.NONE; // No further action required } else { return RebalanceType.LOAD_BALANCE; // Required state counts are satisfied, but in order to - // achieve BestPossibleState, load balance may be required - // to shift replicas around + // achieve BestPossibleState, load balance may be required + // to shift replicas around } } @@ -806,25 +796,25 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { PartitionStateMap intermediateStateMap) { if (logger.isDebugEnabled()) { - LogUtil.logDebug(logger, _eventId, String - .format("Partitions need recovery: %s\nPartitions get throttled on recovery: %s", + LogUtil.logDebug(logger, _eventId, + String.format("Partitions need recovery: %s\nPartitions get throttled on recovery: %s", recoveryPartitions, recoveryThrottledPartitions)); - LogUtil.logDebug(logger, _eventId, String - .format("Partitions need loadbalance: %s\nPartitions get throttled on load-balance: %s", + LogUtil.logDebug(logger, _eventId, + String.format( + "Partitions need loadbalance: %s\nPartitions get throttled on load-balance: %s", loadbalancePartitions, loadbalanceThrottledPartitions)); } for (Partition partition : allPartitions) { if (logger.isDebugEnabled()) { - LogUtil.logDebug(logger, _eventId, String - .format("%s : Best possible map: %s", partition, - bestPossibleStateMap.getPartitionMap(partition))); + LogUtil.logDebug(logger, _eventId, String.format("%s : Best possible map: %s", partition, + bestPossibleStateMap.getPartitionMap(partition))); LogUtil.logDebug(logger, _eventId, String.format("%s : Current State: %s", partition, currentStateOutput.getCurrentStateMap(resource, partition))); LogUtil.logDebug(logger, _eventId, String.format("%s: Pending state: %s", partition, currentStateOutput.getPendingMessageMap(resource, partition))); - LogUtil.logDebug(logger, _eventId, String - .format("%s: Intermediate state: %s", partition, intermediateStateMap.getPartitionMap(partition))); + LogUtil.logDebug(logger, _eventId, String.format("%s: Intermediate state: %s", partition, + intermediateStateMap.getPartitionMap(partition))); } } }
