Repository: helix Updated Branches: refs/heads/master a6863937c -> 53a6791e7
Log improvements Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c3297ae4 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c3297ae4 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c3297ae4 Branch: refs/heads/master Commit: c3297ae435cabac6c650ad649e2b72b48ea16dc7 Parents: a686393 Author: Junkai Xue <[email protected]> Authored: Fri Sep 7 12:15:03 2018 -0700 Committer: Junkai Xue <[email protected]> Committed: Mon Oct 29 14:29:21 2018 -0700 ---------------------------------------------------------------------- .../controller/GenericHelixController.java | 9 ++-- .../controller/stages/ClusterDataCache.java | 32 +++++--------- .../stages/MessageGenerationPhase.java | 44 ++++++++++---------- 3 files changed, 39 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/c3297ae4/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 6a8fedd..6b1244b 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -199,7 +199,7 @@ public class GenericHelixController implements IdealStateChangeListener, forceRebalance(_manager, _clusterEventType); } catch (Throwable ex) { logger.error("Time task failed. Rebalance task type: " + _clusterEventType + ", cluster: " - + _clusterName); + + _clusterName, ex); } } } @@ -208,7 +208,8 @@ public class GenericHelixController implements IdealStateChangeListener, private void forceRebalance(HelixManager manager, ClusterEventType eventType) { NotificationContext changeContext = new NotificationContext(manager); changeContext.setType(NotificationContext.Type.CALLBACK); - ClusterEvent event = new ClusterEvent(_clusterName, eventType); + String uid = UUID.randomUUID().toString().substring(0, 8); + ClusterEvent event = new ClusterEvent(_clusterName, eventType, uid); event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager()); event.addAttribute(AttributeName.changeContext.name(), changeContext); event.addAttribute(AttributeName.eventData.name(), new ArrayList<>()); @@ -217,7 +218,9 @@ public class GenericHelixController implements IdealStateChangeListener, _taskEventQueue.put(event); _eventQueue.put(event); - logger.info("Controller rebalance event triggered with event type: " + eventType); + logger.info(String + .format("Controller rebalance event triggered with event type: %s for cluster %s", + eventType, _clusterName)); } // TODO who should stop this timer http://git-wip-us.apache.org/repos/asf/helix/blob/c3297ae4/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index 2299f92..e1a374e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -181,8 +181,9 @@ public class ClusterDataCache extends AbstractDataCache { _propertyDataChangedMap.put(ChangeType.INSTANCE_CONFIG, false); clearCachedResourceAssignments(); _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs(), true); - LogUtil.logInfo(LOG, _eventId, "Reload InstanceConfig: " + _instanceConfigCacheMap.keySet() - + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline"); + LogUtil.logInfo(LOG, _eventId, + "Reload InstanceConfig for cluster " + _clusterName + " : " + _instanceConfigCacheMap + .keySet() + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline"); } if (_propertyDataChangedMap.get(ChangeType.RESOURCE_CONFIG)) { @@ -190,8 +191,9 @@ public class ClusterDataCache extends AbstractDataCache { clearCachedResourceAssignments(); _resourceConfigCacheMap = refreshResourceConfigs(accessor); - LogUtil.logInfo(LOG, _eventId, "Reload ResourceConfigs: " + _resourceConfigCacheMap.keySet() - + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline"); + LogUtil.logInfo(LOG, _eventId, + "Reload ResourceConfigs for cluster " + _clusterName + " : " + _resourceConfigCacheMap + .keySet() + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline"); } // This is for targeted jobs' task assignment. It needs to watch for current state changes for @@ -230,23 +232,11 @@ public class ClusterDataCache extends AbstractDataCache { AssignableInstanceManager assignableInstanceManager = _taskDataCache.getAssignableInstanceManager(); // Build from scratch every time - assignableInstanceManager.buildAssignableInstances(_clusterConfig, _taskDataCache, - _liveInstanceMap, _instanceConfigMap); - /** - * TODO: (Hunter) Consider this for optimization after fixing the problem of quotas not being - * properly released for targeted tasks - * if (_existsClusterConfigChange) { - * // Update both flags since buildAssignableInstances includes updateAssignableInstances - * _existsClusterConfigChange = false; - * _existsInstanceChange = false; - * assignableInstanceManager.buildAssignableInstances(_clusterConfig, _taskDataCache, - * _liveInstanceMap, _instanceConfigMap); - * } else if (_existsInstanceChange) { - * _existsInstanceChange = false; - * assignableInstanceManager.updateAssignableInstances(_clusterConfig, _liveInstanceMap, - * _instanceConfigMap); - * } - **/ + assignableInstanceManager + .buildAssignableInstances(_clusterConfig, _taskDataCache, _liveInstanceMap, + _instanceConfigMap); + // TODO: (Hunter) Consider this for optimization after fixing the problem of quotas not being + assignableInstanceManager.logQuotaProfileJSON(false); } http://git-wip-us.apache.org/repos/asf/helix/blob/c3297ae4/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java index cc098f6..ae0c93f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java @@ -99,8 +99,8 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage { } for (Partition partition : resource.getPartitions()) { - Map<String, String> instanceStateMap = new HashMap<>( - resourcesStateMap.getInstanceStateMap(resourceName, partition)); + Map<String, String> instanceStateMap = + new HashMap<>(resourcesStateMap.getInstanceStateMap(resourceName, partition)); Map<String, String> pendingStateMap = currentStateOutput.getPendingStateMap(resourceName, partition); @@ -114,7 +114,6 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage { } } - // we should generate message based on the desired-state priority // so keep generated messages in a temp map keyed by state // desired-state->list of generated-messages @@ -123,17 +122,17 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage { for (String instanceName : instanceStateMap.keySet()) { String desiredState = instanceStateMap.get(instanceName); - String currentState = currentStateOutput.getCurrentState(resourceName, partition, - instanceName); + String currentState = + currentStateOutput.getCurrentState(resourceName, partition, instanceName); if (currentState == null) { currentState = stateModelDef.getInitialState(); } - Message pendingMessage = currentStateOutput.getPendingMessage(resourceName, partition, - instanceName); + Message pendingMessage = + currentStateOutput.getPendingMessage(resourceName, partition, instanceName); boolean isCancellationEnabled = cache.getClusterConfig().isStateTransitionCancelEnabled(); - Message cancellationMessage = currentStateOutput.getCancellationMessage(resourceName, - partition, instanceName); + Message cancellationMessage = + currentStateOutput.getCancellationMessage(resourceName, partition, instanceName); String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState); Message message = null; @@ -151,7 +150,8 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage { .put(pendingMessage.getMsgId(), pendingMessage); } - if (desiredState.equals(NO_DESIRED_STATE) || desiredState.equalsIgnoreCase(currentState)) { + if (desiredState.equals(NO_DESIRED_STATE) || desiredState + .equalsIgnoreCase(currentState)) { if (desiredState.equals(NO_DESIRED_STATE) || pendingMessage != null && !currentState .equalsIgnoreCase(pendingMessage.getToState())) { message = createStateTransitionCancellationMessage(manager, resource, @@ -202,7 +202,8 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage { if (logger.isDebugEnabled()) { LogUtil.logDebug(logger, _eventId, String.format( "Resource %s partition %s for instance %s with currentState %s and nextState %s", - resource, partition.getPartitionName(), instanceName, currentState, nextState)); + resource.getResourceName(), partition.getPartitionName(), instanceName, + currentState, nextState)); } } } @@ -256,19 +257,18 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage { /** * Start a job in worker pool that asynchronously clean up pending message. Since it is possible - * that participant failed to clean up message after processing, it is important for controller - * to try to clean them up as well to unblock further rebalance + * that participant failed to clean up message after processing, it is important for controller to + * try to clean them up as well to unblock further rebalance * * @param pendingMessagesToPurge key: instance name, value: list of pending message to cleanup - * @param workerPool ExecutorService that job can be submitted to - * @param accessor Data accessor used to clean up message + * @param workerPool ExecutorService that job can be submitted to + * @param accessor Data accessor used to clean up message */ private void schedulePendingMessageCleanUp( final Map<String, Map<String, Message>> pendingMessagesToPurge, ExecutorService workerPool, final HelixDataAccessor accessor) { workerPool.submit(new Callable<Object>() { - @Override - public Object call() { + @Override public Object call() { for (Map.Entry<String, Map<String, Message>> entry : pendingMessagesToPurge.entrySet()) { String instanceName = entry.getKey(); for (Message msg : entry.getValue().values()) { @@ -302,9 +302,9 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage { } } - private Message createStateTransitionMessage(HelixManager manager, Resource resource, String partitionName, - String instanceName, String currentState, String nextState, String sessionId, - String stateModelDefName) { + private Message createStateTransitionMessage(HelixManager manager, Resource resource, + String partitionName, String instanceName, String currentState, String nextState, + String sessionId, String stateModelDefName) { String uuid = UUID.randomUUID().toString(); Message message = new Message(MessageType.STATE_TRANSITION, uuid); message.setSrcName(manager.getInstanceName()); @@ -332,8 +332,8 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage { private Message createStateTransitionCancellationMessage(HelixManager manager, Resource resource, String partitionName, String instanceName, String sessionId, String stateModelDefName, - String fromState, String toState, String nextState, Message cancellationMessage, boolean isCancellationEnabled, - String currentState) { + String fromState, String toState, String nextState, Message cancellationMessage, + boolean isCancellationEnabled, String currentState) { if (isCancellationEnabled && cancellationMessage == null) { LogUtil.logInfo(logger, _eventId,
