Add Pipeline types to differentiate logs
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c7b250a8 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c7b250a8 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c7b250a8 Branch: refs/heads/master Commit: c7b250a891ada90d98e1828425b2caf1cb60b956 Parents: 7571d22 Author: Junkai Xue <j...@linkedin.com> Authored: Mon Aug 28 14:50:55 2017 -0700 Committer: Junkai Xue <j...@linkedin.com> Committed: Mon Nov 6 17:07:07 2017 -0800 ---------------------------------------------------------------------- .../controller/GenericHelixController.java | 42 ++++++++++++++------ .../helix/controller/pipeline/Pipeline.java | 19 ++++++--- .../stages/BestPossibleStateCalcStage.java | 6 ++- .../controller/stages/ClusterDataCache.java | 2 +- .../stages/CurrentStateComputationStage.java | 4 +- .../stages/ExternalViewComputeStage.java | 6 ++- .../stages/IntermediateStateCalcStage.java | 7 ++-- .../controller/stages/ReadClusterDataStage.java | 4 +- .../stages/ResourceComputationStage.java | 6 ++- .../controller/stages/TaskAssignmentStage.java | 7 ++-- 10 files changed, 70 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index c0886ba..e89d159 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -137,17 +137,24 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC private String _clusterName; + enum PipelineTypes { + DEFAULT, + TASK + } + /** * Default constructor that creates a default pipeline registry. This is sufficient in most cases, * but if there is a some thing specific needed use another constructor where in you can pass a * pipeline registry */ public GenericHelixController() { - this(createDefaultRegistry(), createDefaultRegistry()); + this(createDefaultRegistry(PipelineTypes.DEFAULT.name()), + createDefaultRegistry(PipelineTypes.TASK.name())); } public GenericHelixController(String clusterName) { - this(createDefaultRegistry(), createDefaultRegistry(), clusterName); + this(createDefaultRegistry(PipelineTypes.DEFAULT.name()), + createDefaultRegistry(PipelineTypes.TASK.name()), clusterName); } class RebalanceTask extends TimerTask { @@ -218,17 +225,17 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC _timerPeriod = Integer.MAX_VALUE; } - private static PipelineRegistry createDefaultRegistry() { + private static PipelineRegistry createDefaultRegistry(String pipelineName) { logger.info("createDefaultRegistry"); synchronized (GenericHelixController.class) { PipelineRegistry registry = new PipelineRegistry(); // cluster data cache refresh - Pipeline dataRefresh = new Pipeline(); + Pipeline dataRefresh = new Pipeline(pipelineName); dataRefresh.addStage(new ReadClusterDataStage()); // rebalance pipeline - Pipeline rebalancePipeline = new Pipeline(); + Pipeline rebalancePipeline = new Pipeline(pipelineName); rebalancePipeline.addStage(new ResourceComputationStage()); rebalancePipeline.addStage(new ResourceValidationStage()); rebalancePipeline.addStage(new CurrentStateComputationStage()); @@ -241,11 +248,11 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC rebalancePipeline.addStage(new PersistAssignmentStage()); // external view generation - Pipeline externalViewPipeline = new Pipeline(); + Pipeline externalViewPipeline = new Pipeline(pipelineName); externalViewPipeline.addStage(new ExternalViewComputeStage()); // backward compatibility check - Pipeline liveInstancePipeline = new Pipeline(); + Pipeline liveInstancePipeline = new Pipeline(pipelineName); liveInstancePipeline.addStage(new CompatibilityCheckStage()); registry.register(ClusterEventType.IdealStateChange, dataRefresh, rebalancePipeline); @@ -349,26 +356,31 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC ? _registry.getPipelinesForEvent(event.getEventType()) : _taskRegistry.getPipelinesForEvent(event.getEventType()); if (pipelines == null || pipelines.size() == 0) { - logger.info("No pipeline to run for event:" + event.getEventType()); + logger.info( + "No " + getPipelineType(cache.isTaskCache()) + " pipeline to run for event:" + event + .getEventType()); return; } - logger.info("START: Invoking controller pipeline for event: " + event.getEventType()); + logger.info(String.format("START: Invoking %s controller pipeline for event: %s", + getPipelineType(cache.isTaskCache()), event.getEventType())); long startTime = System.currentTimeMillis(); for (Pipeline pipeline : pipelines) { try { pipeline.handle(event); pipeline.finish(); } catch (Exception e) { - logger.error("Exception while executing pipeline: " + pipeline - + ". Will not continue to next pipeline", e); + logger.error( + "Exception while executing " + getPipelineType(cache.isTaskCache()) + "pipeline: " + + pipeline + ". Will not continue to next pipeline", e); break; } } long endTime = System.currentTimeMillis(); logger.info( - "END: Invoking controller pipeline for event: " + event.getEventType() + " for cluster " - + manager.getClusterName() + ", took " + (endTime - startTime) + " ms"); + "END: Invoking " + getPipelineType(cache.isTaskCache()) + " controller pipeline for event: " + + event.getEventType() + " for cluster " + manager.getClusterName() + ", took " + ( + endTime - startTime) + " ms"); if (!cache.isTaskCache()) { // report event process durations @@ -773,4 +785,8 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC eventThread.setDaemon(true); eventThread.start(); } + + public static String getPipelineType(boolean isTask) { + return isTask ? PipelineTypes.TASK.name() : PipelineTypes.DEFAULT.name(); + } } http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java index 649a086..ebe42b9 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java +++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java @@ -29,10 +29,16 @@ import org.apache.log4j.Logger; public class Pipeline { private static final Logger logger = Logger.getLogger(Pipeline.class.getName()); + private final String _pipelineType; List<Stage> _stages; public Pipeline() { - _stages = new ArrayList<Stage>(); + this(""); + } + + public Pipeline(String pipelineType) { + _stages = new ArrayList<>(); + _pipelineType = pipelineType; } public void addStage(Stage stage) { @@ -47,7 +53,8 @@ public class Pipeline { } for (Stage stage : _stages) { long startTime = System.currentTimeMillis(); - logger.info("START " + stage.getStageName()); + logger.info( + String.format("START %s pipeline for stage %s", _pipelineType, stage.getStageName())); stage.preProcess(); stage.process(event); @@ -55,10 +62,12 @@ public class Pipeline { long endTime = System.currentTimeMillis(); long duration = endTime - startTime; - logger.info( - String.format("END %s for cluster %s. took: %d ms ", stage.getStageName(), event.getClusterName(), duration)); + logger.info(String + .format("END %s for %s pipeline cluster %s. took: %d ms ", stage.getStageName(), + _pipelineType, event.getClusterName(), duration)); - ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name()); + ClusterStatusMonitor clusterStatusMonitor = + event.getAttribute(AttributeName.clusterStatusMonitor.name()); if (clusterStatusMonitor != null) { clusterStatusMonitor.updateClusterEventDuration(stage.getStageName(), duration); } http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index 555559f..3c30a4d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -26,6 +26,7 @@ import java.util.concurrent.Callable; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; +import org.apache.helix.controller.GenericHelixController; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.controller.rebalancer.AutoRebalancer; @@ -104,8 +105,9 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { } long endTime = System.currentTimeMillis(); - logger.info("END BestPossibleStateCalcStage.process() for cluster " + cache.getClusterName() - + ". took: " + (endTime - startTime) + " ms"); + logger.info("END " + GenericHelixController.getPipelineType(cache.isTaskCache()) + + " BestPossibleStateCalcStage.process() for cluster " + cache.getClusterName() + ". took: " + + (endTime - startTime) + " ms"); } private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap, http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index 3178cc6..0d62cad 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -854,7 +854,7 @@ public class ClusterDataCache { _contextMap.put(context.getSimpleField(NAME), context); } else { _contextMap.put(childNames.get(i), context); - LOG.warn( + LOG.info( String.format("Context for %s is null or miss the context NAME!", childNames.get((i)))); } } http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index 2385bd4..88ddf20 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.helix.controller.GenericHelixController; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.model.CurrentState; @@ -148,7 +149,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage { event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); long endTime = System.currentTimeMillis(); - LOG.info("END CurrentStateComputationStage.process() for cluster " + cache.getClusterName() + LOG.info("END " + GenericHelixController.getPipelineType(cache.isTaskCache()) + + " CurrentStateComputationStage.process() for cluster " + cache.getClusterName() + ". took: " + (endTime - startTime) + " ms"); } http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java index 7950e8f..8fbe904 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java @@ -35,6 +35,7 @@ import org.apache.helix.PropertyKey.Builder; import org.apache.helix.ZNRecord; import org.apache.helix.ZNRecordDelta; import org.apache.helix.ZNRecordDelta.MergeOperation; +import org.apache.helix.controller.GenericHelixController; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory; @@ -184,8 +185,9 @@ public class ExternalViewComputeStage extends AbstractBaseStage { } long endTime = System.currentTimeMillis(); - LOG.info("END ExternalViewComputeStage.process() for cluster " + cache.getClusterName() - + ". took: " + (endTime - startTime) + " ms"); + LOG.info("END " + GenericHelixController.getPipelineType(cache.isTaskCache()) + + " ExternalViewComputeStage.process() for cluster " + cache.getClusterName() + ". took: " + + (endTime - startTime) + " ms"); } private void updateScheduledTaskStatus(ExternalView ev, HelixManager manager, http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java index 8b5b93a..1b4d3f3 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -34,6 +34,7 @@ import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.api.config.StateTransitionThrottleConfig; import org.apache.helix.api.config.StateTransitionThrottleConfig.RebalanceType; +import org.apache.helix.controller.GenericHelixController; import org.apache.helix.controller.common.PartitionStateMap; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; @@ -83,9 +84,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { } long endTime = System.currentTimeMillis(); - logger.info( - "END ImmediateStateCalcStage.process() for cluster " + cache.getClusterName() + ". took: " - + (endTime - startTime) + " ms"); + logger.info("END " + GenericHelixController.getPipelineType(cache.isTaskCache()) + + " ImmediateStateCalcStage.process() for cluster " + cache.getClusterName() + ". took: " + + (endTime - startTime) + " ms"); } private IntermediateStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap, http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java index 0ec5840..b9ae60e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java @@ -26,6 +26,7 @@ import java.util.concurrent.Callable; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; +import org.apache.helix.controller.GenericHelixController; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.model.InstanceConfig; @@ -105,6 +106,7 @@ public class ReadClusterDataStage extends AbstractBaseStage { event.addAttribute(AttributeName.ClusterDataCache.name(), _cache); long endTime = System.currentTimeMillis(); - logger.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms"); + logger.info("END " + GenericHelixController.getPipelineType(_cache.isTaskCache()) + + " ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms"); } } http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java index 6064e85..0f67bbd 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java @@ -23,6 +23,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; +import org.apache.helix.controller.GenericHelixController; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.model.ClusterConfig; @@ -160,8 +161,9 @@ public class ResourceComputationStage extends AbstractBaseStage { event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceToRebalance); long endTime = System.currentTimeMillis(); - LOG.info("END ResourceComputationStage.process() for cluster " + cache.getClusterName() - + ". took: " + (endTime - startTime) + " ms"); + LOG.info("END " + GenericHelixController.getPipelineType(cache.isTaskCache()) + + " ResourceComputationStage.process() for cluster " + cache.getClusterName() + ". took: " + + (endTime - startTime) + " ms"); } private void addResource(String resource, Map<String, Resource> resourceMap) { http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java index c774d4f..39a6e76 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java @@ -30,6 +30,7 @@ import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerProperties; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.controller.GenericHelixController; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.model.LiveInstance; @@ -87,9 +88,9 @@ public class TaskAssignmentStage extends AbstractBaseStage { logger.debug("Caching messages took " + (cacheEnd - cacheStart) + " ms"); long endTime = System.currentTimeMillis(); - logger.info( - "END TaskAssignmentStage.process() for cluster " + cache.getClusterName() + ". took: " + ( - endTime - startTime) + " ms"); + logger.info("END " + GenericHelixController.getPipelineType(cache.isTaskCache()) + + " TaskAssignmentStage.process() for cluster " + cache.getClusterName() + ". took: " + ( + endTime - startTime) + " ms"); } List<Message> batchMessage(Builder keyBuilder, List<Message> messages,