Support event-differentiated logging. Existing task pipeline logs are combined with regular pipeline logs, which makes debugging hard. Tying each log entry to an event ID will make future debugging easier.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/092b3f10 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/092b3f10 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/092b3f10 Branch: refs/heads/master Commit: 092b3f1097bf7d3a97d2308db0e9da510d1bcb4c Parents: 68fe74e Author: Junkai Xue <j...@linkedin.com> Authored: Thu Jul 12 17:01:27 2018 -0700 Committer: Junkai Xue <j...@linkedin.com> Committed: Thu Jul 12 17:01:27 2018 -0700 ---------------------------------------------------------------------- .../helix/common/caches/AbstractDataCache.java | 10 +++ .../helix/common/caches/CurrentStateCache.java | 16 +++-- .../helix/common/caches/IdealStateCache.java | 3 +- .../controller/GenericHelixController.java | 36 ++++++---- .../org/apache/helix/controller/LogUtil.java | 37 ++++++++++ .../controller/pipeline/AbstractBaseStage.java | 3 + .../helix/controller/pipeline/Pipeline.java | 6 +- .../stages/BestPossibleStateCalcStage.java | 36 +++++----- .../helix/controller/stages/ClusterEvent.java | 23 ++++++- .../stages/CompatibilityCheckStage.java | 3 +- .../stages/CurrentStateComputationStage.java | 7 +- .../stages/ExternalViewComputeStage.java | 21 +++--- .../stages/IntermediateStateCalcStage.java | 71 +++++++++++--------- .../stages/MessageGenerationPhase.java | 44 ++++++------ .../stages/MessageSelectionStage.java | 6 +- .../controller/stages/MessageThrottleStage.java | 15 +++-- .../stages/PersistAssignmentStage.java | 6 +- .../controller/stages/ReadClusterDataStage.java | 6 +- .../stages/ResourceComputationStage.java | 9 ++- .../stages/ResourceValidationStage.java | 10 ++- .../controller/stages/TaskAssignmentStage.java | 16 +++-- 21 files changed, 259 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java 
---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java index 56b4aa4..1c5924e 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java @@ -23,6 +23,7 @@ import com.google.common.collect.Maps; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixProperty; import org.apache.helix.PropertyKey; @@ -31,7 +32,15 @@ import org.slf4j.LoggerFactory; public abstract class AbstractDataCache { private static Logger LOG = LoggerFactory.getLogger(AbstractDataCache.class.getName()); + private String _eventId = "NO_ID"; + public String getEventId() { + return _eventId; + } + + public void setEventId(String eventId) { + _eventId = eventId; + } /** * Selectively fetch Helix Properties from ZK by comparing the version of local cached one with the one on ZK. 
@@ -54,6 +63,7 @@ public abstract class AbstractDataCache { HelixProperty.Stat stat = stats.get(i); if (stat != null) { T property = cachedPropertyMap.get(key); + if (property != null && property.getBucketSize() == 0 && property.getStat().equals(stat)) { refreshedPropertyMap.put(key, property); } else { http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java index 4c6ecb8..9b1fb99 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Set; import org.apache.helix.HelixDataAccessor; import org.apache.helix.PropertyKey; +import org.apache.helix.controller.LogUtil; import org.apache.helix.model.CurrentState; import org.apache.helix.model.LiveInstance; import org.slf4j.Logger; @@ -93,10 +94,12 @@ public class CurrentStateCache extends AbstractDataCache { _currentStateMap = Collections.unmodifiableMap(allCurStateMap); long endTime = System.currentTimeMillis(); - LOG.info("END: CurrentStateCache.refresh() for cluster " + _clusterName + ", took " + (endTime - - startTime) + " ms"); + LogUtil.logInfo(LOG, getEventId(), + "END: CurrentStateCache.refresh() for cluster " + _clusterName + ", took " + (endTime + - startTime) + " ms"); if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Current State freshed : ", _currentStateMap.toString())); + LogUtil.logDebug(LOG, getEventId(), + String.format("Current State freshed : ", _currentStateMap.toString())); } return true; } @@ -129,9 +132,10 @@ public class CurrentStateCache extends AbstractDataCache { _currentStateCache)); if 
(LOG.isDebugEnabled()) { - LOG.debug("# of CurrentStates reload: " + reloadKeys.size() + ", skipped:" + ( - currentStateKeys.size() - reloadKeys.size()) + ". took " + (System.currentTimeMillis() - - start) + " ms to reload new current states for cluster: " + _clusterName); + LogUtil.logDebug(LOG, getEventId(), + "# of CurrentStates reload: " + reloadKeys.size() + ", skipped:" + ( + currentStateKeys.size() - reloadKeys.size()) + ". took " + (System.currentTimeMillis() + - start) + " ms to reload new current states for cluster: " + _clusterName); } } http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java index 12056e4..99de0bc 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Set; import org.apache.helix.HelixDataAccessor; import org.apache.helix.PropertyKey; +import org.apache.helix.controller.LogUtil; import org.apache.helix.model.IdealState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,7 +93,7 @@ public class IdealStateCache extends AbstractDataCache { _idealStateMap = new HashMap<>(newIdealStateMap); long endTime = System.currentTimeMillis(); - LOG.info( + LogUtil.logInfo(LOG, getEventId(), "Refresh " + _idealStateMap.size() + " idealStates for cluster " + _clusterName + ", took " + (endTime - startTime) + " ms"); } http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 474d621..8d1e44b 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Timer; import java.util.TimerTask; +import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; @@ -406,21 +407,23 @@ public class GenericHelixController implements IdealStateChangeListener, } // add the cache + _cache.setEventId(event.getEventId()); event.addAttribute(AttributeName.ClusterDataCache.name(), cache); List<Pipeline> pipelines = cache.isTaskCache() ? - _taskRegistry.getPipelinesForEvent(event.getEventType()) : - _registry.getPipelinesForEvent(event.getEventType()); + _taskRegistry.getPipelinesForEvent(event.getEventType()) : _registry + .getPipelinesForEvent(event.getEventType()); if (pipelines == null || pipelines.size() == 0) { - logger.info( - "No " + getPipelineType(cache.isTaskCache()) + " pipeline to run for event:" + event - .getEventType()); + logger.info(String + .format("No % pipeline to run for event: %s %s", getPipelineType(cache.isTaskCache()), + event.getEventType(), event.getEventId())); return; } - logger.info(String.format("START: Invoking %s controller pipeline for cluster %s event: %s", - manager.getClusterName(), getPipelineType(cache.isTaskCache()), event.getEventType())); + logger.info(String.format("START: Invoking %s controller pipeline for cluster %s event: %s %s", + manager.getClusterName(), getPipelineType(cache.isTaskCache()), event.getEventType(), + event.getEventId())); long startTime = System.currentTimeMillis(); boolean rebalanceFail = false; for (Pipeline pipeline : pipelines) { @@ 
-463,9 +466,9 @@ public class GenericHelixController implements IdealStateChangeListener, } long endTime = System.currentTimeMillis(); logger.info(String - .format("END: Invoking %s controller pipeline for event: %s for cluster %s, took %d ms", - getPipelineType(cache.isTaskCache()), event.getEventType(), manager.getClusterName(), - (endTime - startTime))); + .format("END: Invoking %s controller pipeline for event: %s %s for cluster %s, took %d ms", + getPipelineType(cache.isTaskCache()), event.getEventType(), event.getEventId(), + manager.getClusterName(), (endTime - startTime))); if (!cache.isTaskCache()) { // report event process durations @@ -685,7 +688,10 @@ public class GenericHelixController implements IdealStateChangeListener, private void pushToEventQueues(ClusterEventType eventType, NotificationContext changeContext, Map<String, Object> eventAttributes) { - ClusterEvent event = new ClusterEvent(_clusterName, eventType); + // No need for completed UUID, prefixed should be fine + String uid = UUID.randomUUID().toString().substring(0, 8); + ClusterEvent event = new ClusterEvent(_clusterName, eventType, + String.format("%s_%s", uid, PipelineTypes.DEFAULT.name())); event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager()); event.addAttribute(AttributeName.changeContext.name(), changeContext); event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool); @@ -693,7 +699,7 @@ public class GenericHelixController implements IdealStateChangeListener, event.addAttribute(attr.getKey(), attr.getValue()); } _eventQueue.put(event); - _taskEventQueue.put(event.clone()); + _taskEventQueue.put(event.clone(String.format("%s_%s", uid, PipelineTypes.TASK.name()))); } @Override @@ -887,13 +893,15 @@ public class GenericHelixController implements IdealStateChangeListener, if (statusFlag) { statusFlag = false; logger.info("controller is now resumed from paused state"); - ClusterEvent event = new ClusterEvent(_clusterName, 
ClusterEventType.Resume); + String uid = UUID.randomUUID().toString().substring(0, 8); + ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.Resume, + String.format("%s_%s", uid, PipelineTypes.DEFAULT.name())); event.addAttribute(AttributeName.changeContext.name(), changeContext); event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager()); event.addAttribute(AttributeName.eventData.name(), signal); event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool); _eventQueue.put(event); - _taskEventQueue.put(event.clone()); + _taskEventQueue.put(event.clone(String.format("%s_%s", uid, PipelineTypes.TASK.name()))); } } return statusFlag; http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/LogUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/LogUtil.java b/helix-core/src/main/java/org/apache/helix/controller/LogUtil.java new file mode 100644 index 0000000..4d3ae05 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/LogUtil.java @@ -0,0 +1,37 @@ +package org.apache.helix.controller; + +import org.slf4j.Logger; + +public class LogUtil { + public static void logInfo(Logger logger, String eventInfo, String message) { + logger.info(String.format("Event %s : %s", eventInfo, message)); + } + + public static void logWarn(Logger logger, String eventInfo, String message) { + logger.warn(String.format("Event %s : %s", eventInfo, message)); + } + + public static void logError(Logger logger, String eventInfo, String message) { + logger.error(String.format("Event %s %s", eventInfo, message)); + } + + public static void logDebug(Logger logger, String eventInfo, String message) { + logger.debug(String.format("Event %s %s", eventInfo, message)); + } + + public static void logInfo(Logger logger, String eventInfo, String message, Exception e) { + 
logger.info(String.format("Event %s : %s", eventInfo, message), e); + } + + public static void logWarn(Logger logger, String eventInfo, String message, Exception e) { + logger.warn(String.format("Event %s : %s", eventInfo, message), e); + } + + public static void logError(Logger logger, String eventInfo, String message, Exception e) { + logger.error(String.format("Event %s %s", eventInfo, message), e); + } + + public static void logDebug(Logger logger, String eventInfo, String message, Exception e) { + logger.debug(String.format("Event %s %s", eventInfo, message), e); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java index 324ed02..37259a8 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java @@ -20,6 +20,7 @@ package org.apache.helix.controller.pipeline; */ import java.util.Map; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import org.apache.helix.common.DedupEventProcessor; @@ -27,6 +28,8 @@ import org.apache.helix.controller.stages.AttributeName; import org.apache.helix.controller.stages.ClusterEvent; public class AbstractBaseStage implements Stage { + protected String _eventId; + @Override public void init(StageContext context) { http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java 
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java index 2946129..7f00759 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java +++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java @@ -65,9 +65,9 @@ public class Pipeline { long endTime = System.currentTimeMillis(); long duration = endTime - startTime; - logger.info(String - .format("END %s for %s pipeline for cluster %s. took: %d ms ", stage.getStageName(), - _pipelineType, event.getClusterName(), duration)); + logger.info(String.format("END %s for %s pipeline for cluster %s. took: %d ms for event %s", + stage.getStageName(), _pipelineType, event.getClusterName(), duration, + event.getEventId())); ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name()); http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index a465f05..a6ba5b8 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -20,6 +20,7 @@ package org.apache.helix.controller.stages; */ import org.apache.helix.HelixManager; +import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.controller.rebalancer.AutoRebalancer; @@ -49,6 +50,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { @Override public void process(ClusterEvent event) throws Exception { + _eventId 
= event.getEventId(); CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name()); final Map<String, Resource> resourceMap = @@ -78,7 +80,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { final Map<String, InstanceConfig> instanceConfigMap = cache.getInstanceConfigMap(); final Map<String, StateModelDefinition> stateModelDefMap = cache.getStateModelDefMap(); asyncExecute(cache.getAsyncTasksThreadPool(), new Callable<Object>() { - @Override public Object call() { + @Override + public Object call() { try { if (clusterStatusMonitor != null) { clusterStatusMonitor @@ -86,7 +89,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { resourceMap, stateModelDefMap); } } catch (Exception e) { - logger.error("Could not update cluster status metrics!", e); + LogUtil + .logError(logger, _eventId, "Could not update cluster status metrics!", e); } return null; } @@ -116,7 +120,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { Resource resource = itr.next().getResource(); if (!computeResourceBestPossibleState(event, cache, currentStateOutput, resource, output)) { failureResources.add(resource.getResourceName()); - logger.warn("Failed to calculate best possible states for " + resource.getResourceName()); + LogUtil.logWarn(logger, _eventId, "Failed to calculate best possible states for " + resource.getResourceName()); } } @@ -150,7 +154,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { clusterStatusMonitor.setRebalanceFailureGauge(hasFailure); } } catch (Exception e) { - logger.error("Could not update cluster status!", e); + LogUtil.logError(logger, _eventId, "Could not update cluster status!", e); } return null; } @@ -175,7 +179,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { .enableMaintenanceMode(manager.getClusterName(), true, errMsg); } } else { - logger.error("Failed to pause cluster, HelixManager is not set!"); + 
LogUtil.logError(logger, _eventId, "Failed to pause cluster, HelixManager is not set!"); } if (!cache.isTaskCache()) { updateRebalanceStatus(true, manager, cache, clusterStatusMonitor, errMsg); @@ -193,13 +197,13 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { // for each instanceName check if its alive then assign a state String resourceName = resource.getResourceName(); - logger.debug("Processing resource:" + resourceName); + LogUtil.logDebug(logger, _eventId, "Processing resource:" + resourceName); // Ideal state may be gone. In that case we need to get the state model name // from the current state IdealState idealState = cache.getIdealState(resourceName); if (idealState == null) { // if ideal state is deleted, use an empty one - logger.info("resource:" + resourceName + " does not exist anymore"); + LogUtil.logInfo(logger, _eventId, "resource:" + resourceName + " does not exist anymore"); idealState = new IdealState(resourceName); idealState.setStateModelDefRef(resource.getStateModelDefRef()); } @@ -209,7 +213,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { MappingCalculator mappingCalculator = getMappingCalculator(rebalancer, resourceName); if (rebalancer == null || mappingCalculator == null) { - logger.error( + LogUtil.logError(logger, _eventId, "Error computing assignment for resource " + resourceName + ". no rebalancer found. rebalancer: " + rebalancer + " mappingCaculator: " + mappingCalculator); } @@ -241,7 +245,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { // Check if calculation is done successfully return checkBestPossibleStateCalculation(idealState); } catch (Exception e) { - logger.error("Error computing assignment for resource " + resourceName + ". Skipping.", e); + LogUtil + .logError(logger, _eventId, "Error computing assignment for resource " + resourceName + ". 
Skipping.", e); // TODO : remove this part after debugging NPE StringBuilder sb = new StringBuilder(); @@ -254,7 +259,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { String.format("PartitionAssignment is null : %s\n", partitionStateAssignment == null)); sb.append(String.format("Output is null : %s\n", output == null)); - logger.error(sb.toString()); + LogUtil.logError(logger, _eventId, sb.toString()); } } // Exception or rebalancer is not found @@ -289,14 +294,15 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { String rebalancerClassName = idealState.getRebalancerClassName(); if (rebalancerClassName != null) { if (logger.isDebugEnabled()) { - logger - .debug("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName); + LogUtil.logDebug(logger, _eventId, + "resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName); } try { customizedRebalancer = Rebalancer.class .cast(HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance()); } catch (Exception e) { - logger.error("Exception while invoking custom rebalancer class:" + rebalancerClassName, e); + LogUtil.logError(logger, _eventId, + "Exception while invoking custom rebalancer class:" + rebalancerClassName, e); } } @@ -324,7 +330,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { rebalancer = customizedRebalancer; break; default: - logger.error( + LogUtil.logError(logger, _eventId, "Fail to find the rebalancer, invalid rebalance mode " + idealState.getRebalanceMode()); break; } @@ -339,7 +345,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { try { mappingCalculator = MappingCalculator.class.cast(rebalancer); } catch (ClassCastException e) { - logger.warn( + LogUtil.logWarn(logger, _eventId, "Rebalancer does not have a mapping calculator, defaulting to SEMI_AUTO, resource: " + resourceName); } 
http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java index 26f4f17..e82b0e4 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java @@ -22,6 +22,7 @@ package org.apache.helix.controller.stages; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,19 +32,27 @@ public class ClusterEvent { private final Map<String, Object> _eventAttributeMap; private long _creationTime; private String _clusterName; + private String _eventId; @Deprecated public ClusterEvent(ClusterEventType eventType) { _eventType = eventType; _eventAttributeMap = new HashMap<>(); _creationTime = System.currentTimeMillis(); + _eventId = UUID.randomUUID().toString(); } public ClusterEvent(String clusterName, ClusterEventType eventType) { + this(clusterName, eventType, UUID.randomUUID().toString()); + } + + public ClusterEvent(String clusterName, ClusterEventType eventType, String eventId) { _clusterName = clusterName; _eventType = eventType; + _eventAttributeMap = new HashMap<>(); _creationTime = System.currentTimeMillis(); + _eventId = eventId; } public void addAttribute(String attrName, Object attrValue) { @@ -73,6 +82,15 @@ public class ClusterEvent { _clusterName = clusterName; } + public void setEventId(String eventId) { + _eventId = eventId; + } + + public String getEventId() { + return _eventId; + } + + @SuppressWarnings("unchecked") public <T extends Object> T getAttribute(String attrName) { Object ret = _eventAttributeMap.get(attrName); @@ -85,6 +103,7 @@ public class 
ClusterEvent { @Override public String toString() { StringBuilder sb = new StringBuilder(); + sb.append(String.format("Event id : %s", _eventId.toString())); sb.append("name:" + _eventType.name()).append("\n"); for (String key : _eventAttributeMap.keySet()) { sb.append(key).append(":").append(_eventAttributeMap.get(key)).append("\n"); @@ -92,8 +111,8 @@ public class ClusterEvent { return sb.toString(); } - public ClusterEvent clone() { - ClusterEvent newEvent = new ClusterEvent(_clusterName, _eventType); + public ClusterEvent clone(String eventId) { + ClusterEvent newEvent = new ClusterEvent(_clusterName, _eventType, eventId); newEvent.setCreationTime(_creationTime); for (String attributeName : _eventAttributeMap.keySet()) { newEvent.addAttribute(attributeName, _eventAttributeMap.get(attributeName)); http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java index 28a5a9e..8a0a719 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerProperties; +import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.model.LiveInstance; @@ -56,7 +57,7 @@ public class CompatibilityCheckStage extends AbstractBaseStage { + properties.getProperty("miminum_supported_version.participant") + ", participant: " + liveInstance.getInstanceName() + ", 
participantVersion: " + participantVersion; - LOG.error(errorMsg); + LogUtil.logError(LOG, event.getEventId(), errorMsg); throw new StageException(errorMsg); } } http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index 64cd4f4..a56d194 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -24,6 +24,7 @@ import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.model.*; import org.apache.helix.model.Message.MessageType; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; +import org.apache.helix.controller.LogUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +48,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage { @Override public void process(ClusterEvent event) throws Exception { + _eventId = event.getEventId(); ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); final Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); @@ -303,7 +305,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage { // 3. if no clue about previous topstate or any related pending message, use the current system time. if (startTime == NOT_RECORDED) { - LOG.warn("Cannot confirm top state missing start time. Use the current system time as the start time."); + LogUtil.logWarn(LOG, _eventId, + "Cannot confirm top state missing start time. 
Use the current system time as the start time."); startTime = System.currentTimeMillis(); } @@ -343,7 +346,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage { } if (handOffStartTime > 0 && handOffEndTime - handOffStartTime <= threshold) { - LOG.info(String.format("Missing topstate duration is %d for partition %s", + LogUtil.logInfo(LOG, _eventId, String.format("Missing topstate duration is %d for partition %s", handOffEndTime - handOffStartTime, partition.getPartitionName())); if (clusterStatusMonitor != null) { clusterStatusMonitor http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java index 1f455cb..bf7be01 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java @@ -22,6 +22,7 @@ package org.apache.helix.controller.stages; import org.apache.helix.*; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.ZNRecordDelta.MergeOperation; +import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; import org.apache.helix.controller.pipeline.AsyncWorkerType; import org.apache.helix.controller.pipeline.StageException; @@ -44,6 +45,7 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage { @Override public void execute(final ClusterEvent event) throws Exception { + _eventId = event.getEventId(); HelixManager manager = event.getAttribute(AttributeName.helixmanager.name()); Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); 
ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); @@ -158,7 +160,8 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage { it.remove(); // remove the external view if the external view exists if (curExtViews.containsKey(resourceName)) { - LOG.info("Remove externalView for resource: " + resourceName); + LogUtil + .logInfo(LOG, _eventId, "Remove externalView for resource: " + resourceName); dataAccessor.removeProperty(keyBuilder.externalView(resourceName)); externalviewsToRemove.add(resourceName); } @@ -176,7 +179,7 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage { // remove dead external-views for (String resourceName : curExtViews.keySet()) { if (!resourceMap.keySet().contains(resourceName)) { - LOG.info("Remove externalView for resource: " + resourceName); + LogUtil.logInfo(LOG, _eventId, "Remove externalView for resource: " + resourceName); dataAccessor.removeProperty(keyBuilder.externalView(resourceName)); externalviewsToRemove.add(resourceName); } @@ -201,19 +204,19 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage { for (String taskPartitionName : ev.getPartitionSet()) { for (String taskState : ev.getStateMap(taskPartitionName).values()) { - if (taskState.equalsIgnoreCase(HelixDefinedState.ERROR.toString()) - || taskState.equalsIgnoreCase("COMPLETED")) { - LOG.info(taskPartitionName + " finished as " + taskState); + if (taskState.equalsIgnoreCase(HelixDefinedState.ERROR.toString()) || taskState + .equalsIgnoreCase("COMPLETED")) { + LogUtil.logInfo(LOG, _eventId, taskPartitionName + " finished as " + taskState); finishedTasks.getListFields().put(taskPartitionName, emptyList); finishedTasks.getMapFields().put(taskPartitionName, emptyMap); // Update original scheduler message status update if (taskQueueIdealState.getRecord().getMapField(taskPartitionName) != null) { - String controllerMsgId = - taskQueueIdealState.getRecord().getMapField(taskPartitionName) - 
.get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID); + String controllerMsgId = taskQueueIdealState.getRecord().getMapField(taskPartitionName) + .get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID); if (controllerMsgId != null) { - LOG.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId); + LogUtil.logInfo(LOG, _eventId, + taskPartitionName + " finished with controllerMsg " + controllerMsgId); if (!controllerMsgUpdates.containsKey(controllerMsgId)) { controllerMsgUpdates.put(controllerMsgId, new HashMap<String, String>()); } http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java index 0f11ecd..3bede5e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -24,6 +24,7 @@ import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.api.config.StateTransitionThrottleConfig; import org.apache.helix.api.config.StateTransitionThrottleConfig.RebalanceType; +import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.common.PartitionStateMap; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; @@ -44,6 +45,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { @Override public void process(ClusterEvent event) throws Exception { + _eventId = event.getEventId(); CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name()); BestPossibleStateOutput 
bestPossibleStateOutput = @@ -132,8 +134,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { IdealState idealState = dataCache.getIdealState(resourceName); if (idealState == null) { // If IdealState is null, use an empty one - logger.info("IdealState for resource {} does not exist; resource may not exist anymore", - resourceName); + LogUtil.logInfo(logger, _eventId, String + .format("IdealState for resource %s does not exist; resource may not exist anymore", + resourceName)); idealState = new IdealState(resourceName); idealState.setStateModelDefRef(resource.getStateModelDefRef()); } @@ -202,7 +205,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { manager.getClusterManagmentTool().enableMaintenanceMode(manager.getClusterName(), true, errMsg); } else { - logger.error( + LogUtil.logError(logger, _eventId, "HelixManager is not set/null! Failed to pause this cluster/enable maintenance" + " mode due to an instance being assigned more replicas/partitions than " + "the limit."); @@ -233,7 +236,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { Map<String, List<String>> preferenceLists, StateTransitionThrottleController throttleController) { String resourceName = resource.getResourceName(); - logger.debug("Processing resource: {}", resourceName); + LogUtil.logDebug(logger, _eventId, String.format("Processing resource: %s", resourceName)); // Throttling is applied only on FULL-AUTO mode if (!throttleController.isThrottleEnabled() @@ -288,16 +291,19 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { } if (!partitionsNeedRecovery.isEmpty()) { - logger.info("Recovery balance needed for {} partitions: {}", resourceName, - partitionsNeedRecovery); + LogUtil.logInfo(logger, _eventId, String + .format("Recovery balance needed for %s partitions: %s", resourceName, + partitionsNeedRecovery)); } if (!partitionsNeedLoadBalance.isEmpty()) { - logger.info("Load balance needed for partitions: {}", 
resourceName, - partitionsNeedLoadBalance); + LogUtil.logInfo(logger, _eventId, String + .format("Load balance needed for %s partitions: %s", resourceName, + partitionsNeedLoadBalance)); } if (!partitionsWithErrorStateReplica.isEmpty()) { - logger.info("Partition currently has an ERROR replica in {} partitions: {}", resourceName, - partitionsWithErrorStateReplica); + LogUtil.logInfo(logger, _eventId, String + .format("Partition currently has an ERROR replica in %s partitions: %s", resourceName, + partitionsWithErrorStateReplica)); } chargePendingTransition(resource, currentStateOutput, throttleController, @@ -352,7 +358,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { intermediatePartitionStateMap); } - logger.debug("End processing resource: {}", resourceName); + LogUtil + .logDebug(logger, _eventId, String.format("End processing resource: %s", resourceName)); return intermediatePartitionStateMap; } @@ -474,7 +481,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { currentStateOutput, bestPossiblePartitionStateMap, partitionRecoveryBalanceThrottled, intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE); } - logger.info(String.format( + LogUtil.logInfo(logger, _eventId, String.format( "For resource %s: Num of partitions needing recovery: %d, Num of partitions needing recovery" + " but throttled (not recovered): %d", resourceName, partitionsNeedRecovery.size(), partitionRecoveryBalanceThrottled.size())); @@ -542,7 +549,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { currentStateOutput, bestPossiblePartitionStateMap, partitionsLoadbalanceThrottled, intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE); } - logger.info(String.format( + LogUtil.logInfo(logger, _eventId, String.format( "For resource %s: Num of partitions needing load-balance: %d, Num of partitions needing" + " load-balance but throttled (not load-balanced): %d", resourceName, partitionsNeedLoadbalance.size(), 
partitionsLoadbalanceThrottled.size())); @@ -578,8 +585,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { if (throttleController.shouldThrottleForResource(rebalanceType, resourceName)) { hasReachedThrottlingLimit = true; if (logger.isDebugEnabled()) { - logger.debug("Throttled on partition: {} in resource: {}", partition.getPartitionName(), - resourceName); + LogUtil.logDebug(logger, _eventId, String + .format("Throttled on partition: %s in resource: %s", partition.getPartitionName(), + resourceName)); } } else { // throttle if any of the instances are not able to accept state transitions @@ -590,9 +598,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { if (throttleController.shouldThrottleForInstance(rebalanceType, instance)) { hasReachedThrottlingLimit = true; if (logger.isDebugEnabled()) { - logger.debug( - "Throttled because of instance: {} for partition: {} in resource: {}", instance, - partition.getPartitionName(), resourceName); + LogUtil.logDebug(logger, _eventId, String + .format("Throttled because of instance: %s for partition: %s in resource: %s", + instance, partition.getPartitionName(), resourceName)); } break; } @@ -698,22 +706,25 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { PartitionStateMap intermediateStateMap) { if (logger.isDebugEnabled()) { - logger.debug("Partitions need recovery: {}\nPartitions get throttled on recovery: {}", - recoveryPartitions, recoveryThrottledPartitions); - logger.debug("Partitions need loadbalance: {}\nPartitions get throttled on load-balance: {}", - loadbalancePartitions, loadbalanceThrottledPartitions); + LogUtil.logDebug(logger, _eventId, String + .format("Partitions need recovery: %s\nPartitions get throttled on recovery: %s", + recoveryPartitions, recoveryThrottledPartitions)); + LogUtil.logDebug(logger, _eventId, String + .format("Partitions need loadbalance: %s\nPartitions get throttled on load-balance: %s", + loadbalancePartitions, 
loadbalanceThrottledPartitions)); } for (Partition partition : allPartitions) { if (logger.isDebugEnabled()) { - logger.debug(partition + ": Best possible map: {}", - bestPossibleStateMap.getPartitionMap(partition)); - logger.debug(partition + ": Current State: {}", - currentStateOutput.getCurrentStateMap(resource, partition)); - logger.debug(partition + ": Pending state: {}", - currentStateOutput.getPendingMessageMap(resource, partition)); - logger.debug(partition + ": Intermediate state: {}", - intermediateStateMap.getPartitionMap(partition)); + LogUtil.logDebug(logger, _eventId, String + .format("%s : Best possible map: %s", partition, + bestPossibleStateMap.getPartitionMap(partition))); + LogUtil.logDebug(logger, _eventId, String.format("%s : Current State: %s", partition, + currentStateOutput.getCurrentStateMap(resource, partition))); + LogUtil.logDebug(logger, _eventId, String.format("%s: Pending state: %s", partition, + currentStateOutput.getPendingMessageMap(resource, partition))); + LogUtil.logDebug(logger, _eventId, String + .format("%s: Intermediate state: %s", partition, intermediateStateMap.getPartitionMap(partition))); } } } http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java index 5a4f039..e21c607 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java @@ -30,6 +30,7 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.SystemPropertyKeys; import org.apache.helix.api.config.StateTransitionTimeoutConfig; +import 
org.apache.helix.controller.LogUtil; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory; @@ -65,6 +66,7 @@ public class MessageGenerationPhase extends AbstractBaseStage { @Override public void process(ClusterEvent event) throws Exception { + _eventId = event.getEventId(); HelixManager manager = event.getAttribute(AttributeName.helixmanager.name()); ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name()); @@ -92,7 +94,7 @@ public class MessageGenerationPhase extends AbstractBaseStage { StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef()); if (stateModelDef == null) { - logger.error( + LogUtil.logError(logger, _eventId, "State Model Definition null, skip generating messages for resource: " + resourceName); continue; } @@ -137,10 +139,10 @@ public class MessageGenerationPhase extends AbstractBaseStage { if (shouldCleanUpPendingMessage(pendingMessage, currentState, currentStateOutput.getEndTime(resourceName, partition, instanceName))) { - logger.info( - "Adding pending message {} on instance {} to clean up. Msg: {}->{}, current state of resource {}:{} is {}", + LogUtil.logInfo(logger, _eventId, String.format( + "Adding pending message %s on instance %s to clean up. 
Msg: %s->%s, current state of resource %s:%s is %s", pendingMessage.getMsgId(), instanceName, pendingMessage.getFromState(), - pendingMessage.getToState(), resourceName, partition, currentState); + pendingMessage.getToState(), resourceName, partition, currentState)); if (!pendingMessagesToCleanUp.containsKey(instanceName)) { pendingMessagesToCleanUp.put(instanceName, new HashMap<String, Message>()); } @@ -158,29 +160,31 @@ public class MessageGenerationPhase extends AbstractBaseStage { } } else { if (nextState == null) { - logger.error("Unable to find a next state for resource: " + resource.getResourceName() - + " partition: " + partition.getPartitionName() + " from stateModelDefinition" - + stateModelDef.getClass() + " from:" + currentState + " to:" + desiredState); + LogUtil.logError(logger, _eventId, + "Unable to find a next state for resource: " + resource.getResourceName() + + " partition: " + partition.getPartitionName() + " from stateModelDefinition" + + stateModelDef.getClass() + " from:" + currentState + " to:" + desiredState); continue; } if (pendingMessage != null) { String pendingState = pendingMessage.getToState(); if (nextState.equalsIgnoreCase(pendingState)) { - logger.debug( + LogUtil.logDebug(logger, _eventId, "Message already exists for " + instanceName + " to transit " + resource .getResourceName() + "." + partition.getPartitionName() + " from " + currentState + " to " + nextState); } else if (currentState.equalsIgnoreCase(pendingState)) { - logger.info( + LogUtil.logInfo(logger, _eventId, "Message hasn't been removed for " + instanceName + " to transit " + resource .getResourceName() + "." + partition.getPartitionName() + " to " + pendingState + ", desiredState: " + desiredState); } else { - logger.info("IdealState changed before state transition completes for " + resource - .getResourceName() + "." 
+ partition.getPartitionName() + " on " + instanceName - + ", pendingState: " + pendingState + ", currentState: " + currentState - + ", nextState: " + nextState); + LogUtil.logInfo(logger, _eventId, + "IdealState changed before state transition completes for " + resource + .getResourceName() + "." + partition.getPartitionName() + " on " + + instanceName + ", pendingState: " + pendingState + ", currentState: " + + currentState + ", nextState: " + nextState); message = createStateTransitionCancellationMessage(manager, resource, partition.getPartitionName(), instanceName, sessionIdMap.get(instanceName), @@ -195,7 +199,7 @@ public class MessageGenerationPhase extends AbstractBaseStage { stateModelDef.getId()); if (logger.isDebugEnabled()) { - logger.debug(String.format( + LogUtil.logDebug(logger, _eventId, String.format( "Resource %s partition %s for instance %s with currentState %s and nextState %s", resource, partition.getPartitionName(), instanceName, currentState, nextState)); } @@ -268,7 +272,8 @@ public class MessageGenerationPhase extends AbstractBaseStage { String instanceName = entry.getKey(); for (Message msg : entry.getValue().values()) { if (accessor.removeProperty(msg.getKey(accessor.keyBuilder(), instanceName))) { - logger.info("Deleted message {} from instance {}", msg.getMsgId(), instanceName); + LogUtil.logInfo(logger, _eventId, String + .format("Deleted message %s from instance %s", msg.getMsgId(), instanceName)); } } } @@ -330,9 +335,10 @@ public class MessageGenerationPhase extends AbstractBaseStage { String currentState) { if (isCancellationEnabled && cancellationMessage == null) { - logger.info("Send cancellation message of the state transition for " + resource.getResourceName() + "." - + partitionName + " on " + instanceName + ", currentState: " + currentState + ", nextState: " - + (nextState == null ? 
"N/A" : nextState)); + LogUtil.logInfo(logger, _eventId, + "Send cancellation message of the state transition for " + resource.getResourceName() + + "." + partitionName + " on " + instanceName + ", currentState: " + currentState + + ", nextState: " + (nextState == null ? "N/A" : nextState)); String uuid = UUID.randomUUID().toString(); Message message = new Message(MessageType.STATE_TRANSITION_CANCELLATION, uuid); @@ -382,7 +388,7 @@ public class MessageGenerationPhase extends AbstractBaseStage { try { timeout = Integer.parseInt(timeOutStr); } catch (Exception e) { - logger.error("", e); + LogUtil.logError(logger, _eventId, "", e); } } http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java index d683899..a061598 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.model.IdealState; @@ -57,6 +58,7 @@ public class MessageSelectionStage extends AbstractBaseStage { @Override public void process(ClusterEvent event) throws Exception { + _eventId = event.getEventId(); ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); CurrentStateOutput currentStateOutput = @@ -200,7 +202,9 @@ public class 
MessageSelectionStage extends AbstractBaseStage { } } else { // reach upper-bound of message for the topState, will not send the message - LOG.info("Reach upper_bound: " + stateConstraints.get(toState).getUpperBound() + ", not send message: " + message); + LogUtil.logInfo(LOG, _eventId, + "Reach upper_bound: " + stateConstraints.get(toState).getUpperBound() + + ", not send message: " + message); } continue; } http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java index e918cd7..c504199 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.model.ClusterConstraints; @@ -52,16 +53,18 @@ public class MessageThrottleStage extends AbstractBaseStage { value = Integer.MAX_VALUE; break; default: - LOG.error("Invalid constraintValue token:" + valueStr + ". Use default value:" - + Integer.MAX_VALUE); + LogUtil.logError(LOG, _eventId, + "Invalid constraintValue token:" + valueStr + ". Use default value:" + + Integer.MAX_VALUE); break; } } catch (Exception e) { try { value = Integer.parseInt(valueStr); } catch (NumberFormatException ne) { - LOG.error("Invalid constraintValue string:" + valueStr + ". Use default value:" - + Integer.MAX_VALUE); + LogUtil.logError(LOG, _eventId, + "Invalid constraintValue string:" + valueStr + ". 
Use default value:" + + Integer.MAX_VALUE); } } return value; @@ -113,6 +116,7 @@ public class MessageThrottleStage extends AbstractBaseStage { @Override public void process(ClusterEvent event) throws Exception { + _eventId = event.getEventId(); ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); MessageSelectionStageOutput msgSelectionOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); @@ -176,7 +180,8 @@ public class MessageThrottleStage extends AbstractBaseStage { if (LOG.isDebugEnabled()) { // TODO: printout constraint item that throttles the message - LOG.debug("message: " + message + " is throttled by constraint: " + item); + LogUtil.logDebug(LOG, _eventId, + "message: " + message + " is throttled by constraint: " + item); } } } http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java index ca83445..f909c9e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java @@ -30,10 +30,8 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey; import org.apache.helix.ZNRecord; -import org.apache.helix.common.DedupEventProcessor; import org.apache.helix.controller.common.PartitionStateMap; import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; -import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.AsyncWorkerType; import org.apache.helix.model.BuiltInStateModelDefinitions; import 
org.apache.helix.model.ClusterConfig; @@ -41,6 +39,7 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.MasterSlaveSMD; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; +import org.apache.helix.controller.LogUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +77,8 @@ public class PersistAssignmentStage extends AbstractAsyncBaseStage { if (resource != null) { final IdealState idealState = cache.getIdealState(resourceId); if (idealState == null) { - LOG.warn("IdealState not found for resource " + resourceId); + LogUtil + .logWarn(LOG, event.getEventId(), "IdealState not found for resource " + resourceId); continue; } IdealState.RebalanceMode mode = idealState.getRebalanceMode(); http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java index 036a913..2a9e4c3 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java @@ -26,6 +26,7 @@ import java.util.concurrent.Callable; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; +import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.model.ClusterConfig; @@ -45,6 +46,7 @@ public class ReadClusterDataStage extends AbstractBaseStage { @Override public void process(ClusterEvent event) throws Exception { + _eventId = event.getEventId(); HelixManager manager = event.getAttribute(AttributeName.helixmanager.name()); if 
(manager == null) { throw new StageException("HelixManager attribute value is null"); @@ -66,7 +68,7 @@ public class ReadClusterDataStage extends AbstractBaseStage { @Override public Object call() { // Update the cluster status gauges if (clusterStatusMonitor != null) { - logger.debug("Update cluster status monitors"); + LogUtil.logDebug(logger, _eventId, "Update cluster status monitors"); Set<String> instanceSet = Sets.newHashSet(); Set<String> liveInstanceSet = Sets.newHashSet(); @@ -97,7 +99,7 @@ public class ReadClusterDataStage extends AbstractBaseStage { clusterStatusMonitor .setClusterInstanceStatus(liveInstanceSet, instanceSet, disabledInstanceSet, disabledPartitions, oldDisabledPartitions, tags); - logger.debug("Complete cluster status monitors update."); + LogUtil.logDebug(logger, _eventId, "Complete cluster status monitors update."); } return null; } http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java index 6c2e916..02a175a 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java @@ -31,6 +31,7 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Resource; import org.apache.helix.task.TaskConstants; +import org.apache.helix.controller.LogUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +46,7 @@ public class ResourceComputationStage extends AbstractBaseStage { @Override public void process(ClusterEvent event) throws Exception { + _eventId = event.getEventId(); 
ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); if (cache == null) { throw new StageException("Missing attributes in event:" + event + ". Requires DataCache"); @@ -142,9 +144,10 @@ public class ResourceComputationStage extends AbstractBaseStage { } if (currentState.getStateModelDefRef() == null) { - LOG.error("state model def is null." + "resource:" + currentState.getResourceName() - + ", partitions: " + currentState.getPartitionStateMap().keySet() + ", states: " - + currentState.getPartitionStateMap().values()); + LogUtil.logError(LOG, _eventId, + "state model def is null." + "resource:" + currentState.getResourceName() + + ", partitions: " + currentState.getPartitionStateMap().keySet() + ", states: " + + currentState.getPartitionStateMap().values()); throw new StageException("State model def is null for resource:" + currentState.getResourceName()); } http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java index d60efe4..d4e76c6 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java @@ -21,6 +21,7 @@ package org.apache.helix.controller.stages; import java.util.Map; +import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.model.IdealState; @@ -34,6 +35,7 @@ public class ResourceValidationStage extends AbstractBaseStage { @Override public void process(ClusterEvent event) throws Exception { + _eventId = 
event.getEventId(); ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); if (cache == null) { throw new StageException("Missing attributes in event:" + event + ". Requires DataCache"); @@ -60,7 +62,8 @@ public class ResourceValidationStage extends AbstractBaseStage { } } if (!hasMatchingRule) { - LOG.warn("Resource " + resourceName + " does not have a valid ideal state!"); + LogUtil.logWarn(LOG, _eventId, + "Resource " + resourceName + " does not have a valid ideal state!"); resourceMap.remove(resourceName); } } @@ -69,8 +72,9 @@ public class ResourceValidationStage extends AbstractBaseStage { String stateModelDefRef = idealState.getStateModelDefRef(); StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefRef); if (stateModelDef == null) { - LOG.warn("Resource " + resourceName + " uses state model " + stateModelDefRef - + ", but it is not on the cluster!"); + LogUtil.logWarn(LOG, _eventId, + "Resource " + resourceName + " uses state model " + stateModelDefRef + + ", but it is not on the cluster!"); resourceMap.remove(resourceName); } } http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java index d8ccd0f..c196a26 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java @@ -30,6 +30,7 @@ import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerProperties; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.controller.LogUtil; import 
org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.model.LiveInstance; @@ -45,6 +46,7 @@ public class TaskAssignmentStage extends AbstractBaseStage { @Override public void process(ClusterEvent event) throws Exception { + _eventId = event.getEventId(); HelixManager manager = event.getAttribute(AttributeName.helixmanager.name()); Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name()); @@ -84,7 +86,7 @@ public class TaskAssignmentStage extends AbstractBaseStage { long cacheStart = System.currentTimeMillis(); cache.cacheMessages(outputMessages); long cacheEnd = System.currentTimeMillis(); - logger.debug("Caching messages took " + (cacheEnd - cacheStart) + " ms"); + LogUtil.logDebug(logger, _eventId, "Caching messages took " + (cacheEnd - cacheStart) + " ms"); } List<Message> batchMessage(Builder keyBuilder, List<Message> messages, @@ -139,17 +141,19 @@ public class TaskAssignmentStage extends AbstractBaseStage { List<PropertyKey> keys = new ArrayList<PropertyKey>(); for (Message message : messages) { - logger.info( + LogUtil.logInfo(logger, _eventId, "Sending Message " + message.getMsgId() + " to " + message.getTgtName() + " transit " + message.getResourceName() + "." + message.getPartitionName() + "|" + message .getPartitionNames() + " from:" + message.getFromState() + " to:" + message .getToState() + ", relayMessages: " + message.getRelayMessages().size()); if (message.hasRelayMessages()) { for (Message msg : message.getRelayMessages().values()) { - logger.info("Sending Relay Message " + msg.getMsgId() + " to " + msg.getTgtName() + " transit " - + msg.getResourceName() + "." 
+ msg.getPartitionName() + "|" + msg.getPartitionNames() + " from:" - + msg.getFromState() + " to:" + msg.getToState() + ", relayFrom: " + msg.getRelaySrcHost() - + ", attached to message: " + message.getMsgId()); + LogUtil.logInfo(logger, _eventId, + "Sending Relay Message " + msg.getMsgId() + " to " + msg.getTgtName() + " transit " + + msg.getResourceName() + "." + msg.getPartitionName() + "|" + msg + .getPartitionNames() + " from:" + msg.getFromState() + " to:" + msg.getToState() + + ", relayFrom: " + msg.getRelaySrcHost() + ", attached to message: " + message + .getMsgId()); } }