Add Pipeline types to differentiate logs

Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c7b250a8
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c7b250a8
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c7b250a8

Branch: refs/heads/master
Commit: c7b250a891ada90d98e1828425b2caf1cb60b956
Parents: 7571d22
Author: Junkai Xue <j...@linkedin.com>
Authored: Mon Aug 28 14:50:55 2017 -0700
Committer: Junkai Xue <j...@linkedin.com>
Committed: Mon Nov 6 17:07:07 2017 -0800

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      | 42 ++++++++++++++------
 .../helix/controller/pipeline/Pipeline.java     | 19 ++++++---
 .../stages/BestPossibleStateCalcStage.java      |  6 ++-
 .../controller/stages/ClusterDataCache.java     |  2 +-
 .../stages/CurrentStateComputationStage.java    |  4 +-
 .../stages/ExternalViewComputeStage.java        |  6 ++-
 .../stages/IntermediateStateCalcStage.java      |  7 ++--
 .../controller/stages/ReadClusterDataStage.java |  4 +-
 .../stages/ResourceComputationStage.java        |  6 ++-
 .../controller/stages/TaskAssignmentStage.java  |  7 ++--
 10 files changed, 70 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index c0886ba..e89d159 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -137,17 +137,24 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
 
   private String _clusterName;
 
+  enum PipelineTypes {
+    DEFAULT,
+    TASK
+  }
+
   /**
    * Default constructor that creates a default pipeline registry. This is 
sufficient in most cases,
    * but if there is a some thing specific needed use another constructor 
where in you can pass a
    * pipeline registry
    */
   public GenericHelixController() {
-    this(createDefaultRegistry(), createDefaultRegistry());
+    this(createDefaultRegistry(PipelineTypes.DEFAULT.name()),
+        createDefaultRegistry(PipelineTypes.TASK.name()));
   }
 
   public GenericHelixController(String clusterName) {
-    this(createDefaultRegistry(), createDefaultRegistry(), clusterName);
+    this(createDefaultRegistry(PipelineTypes.DEFAULT.name()),
+        createDefaultRegistry(PipelineTypes.TASK.name()), clusterName);
   }
 
   class RebalanceTask extends TimerTask {
@@ -218,17 +225,17 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
     _timerPeriod = Integer.MAX_VALUE;
   }
 
-  private static PipelineRegistry createDefaultRegistry() {
+  private static PipelineRegistry createDefaultRegistry(String pipelineName) {
     logger.info("createDefaultRegistry");
     synchronized (GenericHelixController.class) {
       PipelineRegistry registry = new PipelineRegistry();
 
       // cluster data cache refresh
-      Pipeline dataRefresh = new Pipeline();
+      Pipeline dataRefresh = new Pipeline(pipelineName);
       dataRefresh.addStage(new ReadClusterDataStage());
 
       // rebalance pipeline
-      Pipeline rebalancePipeline = new Pipeline();
+      Pipeline rebalancePipeline = new Pipeline(pipelineName);
       rebalancePipeline.addStage(new ResourceComputationStage());
       rebalancePipeline.addStage(new ResourceValidationStage());
       rebalancePipeline.addStage(new CurrentStateComputationStage());
@@ -241,11 +248,11 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
       rebalancePipeline.addStage(new PersistAssignmentStage());
 
       // external view generation
-      Pipeline externalViewPipeline = new Pipeline();
+      Pipeline externalViewPipeline = new Pipeline(pipelineName);
       externalViewPipeline.addStage(new ExternalViewComputeStage());
 
       // backward compatibility check
-      Pipeline liveInstancePipeline = new Pipeline();
+      Pipeline liveInstancePipeline = new Pipeline(pipelineName);
       liveInstancePipeline.addStage(new CompatibilityCheckStage());
 
       registry.register(ClusterEventType.IdealStateChange, dataRefresh, 
rebalancePipeline);
@@ -349,26 +356,31 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
         ? _registry.getPipelinesForEvent(event.getEventType())
         : _taskRegistry.getPipelinesForEvent(event.getEventType());
     if (pipelines == null || pipelines.size() == 0) {
-      logger.info("No pipeline to run for event:" + event.getEventType());
+      logger.info(
+          "No " + getPipelineType(cache.isTaskCache()) + " pipeline to run for 
event:" + event
+              .getEventType());
       return;
     }
 
-    logger.info("START: Invoking controller pipeline for event: " + 
event.getEventType());
+    logger.info(String.format("START: Invoking %s controller pipeline for 
event: %s",
+        getPipelineType(cache.isTaskCache()), event.getEventType()));
     long startTime = System.currentTimeMillis();
     for (Pipeline pipeline : pipelines) {
       try {
         pipeline.handle(event);
         pipeline.finish();
       } catch (Exception e) {
-        logger.error("Exception while executing pipeline: " + pipeline
-            + ". Will not continue to next pipeline", e);
+        logger.error(
+            "Exception while executing " + 
getPipelineType(cache.isTaskCache()) + "pipeline: "
+                + pipeline + ". Will not continue to next pipeline", e);
         break;
       }
     }
     long endTime = System.currentTimeMillis();
     logger.info(
-        "END: Invoking controller pipeline for event: " + event.getEventType() 
+ " for cluster "
-            + manager.getClusterName() + ", took " + (endTime - startTime) + " 
ms");
+        "END: Invoking " + getPipelineType(cache.isTaskCache()) + " controller 
pipeline for event: "
+            + event.getEventType() + " for cluster " + 
manager.getClusterName() + ", took " + (
+            endTime - startTime) + " ms");
 
     if (!cache.isTaskCache()) {
       // report event process durations
@@ -773,4 +785,8 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
     eventThread.setDaemon(true);
     eventThread.start();
   }
+
+  public static String getPipelineType(boolean isTask) {
+    return isTask ? PipelineTypes.TASK.name() : PipelineTypes.DEFAULT.name();
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java 
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
index 649a086..ebe42b9 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
@@ -29,10 +29,16 @@ import org.apache.log4j.Logger;
 
 public class Pipeline {
   private static final Logger logger = 
Logger.getLogger(Pipeline.class.getName());
+  private final String _pipelineType;
   List<Stage> _stages;
 
   public Pipeline() {
-    _stages = new ArrayList<Stage>();
+    this("");
+  }
+
+  public Pipeline(String pipelineType) {
+    _stages = new ArrayList<>();
+    _pipelineType = pipelineType;
   }
 
   public void addStage(Stage stage) {
@@ -47,7 +53,8 @@ public class Pipeline {
     }
     for (Stage stage : _stages) {
       long startTime = System.currentTimeMillis();
-      logger.info("START " + stage.getStageName());
+      logger.info(
+          String.format("START %s pipeline for stage %s", _pipelineType, 
stage.getStageName()));
 
       stage.preProcess();
       stage.process(event);
@@ -55,10 +62,12 @@ public class Pipeline {
 
       long endTime = System.currentTimeMillis();
       long duration = endTime - startTime;
-      logger.info(
-          String.format("END %s for cluster %s. took: %d ms ", 
stage.getStageName(), event.getClusterName(), duration));
+      logger.info(String
+          .format("END %s for %s pipeline cluster %s. took: %d ms ", 
stage.getStageName(),
+              _pipelineType, event.getClusterName(), duration));
 
-      ClusterStatusMonitor clusterStatusMonitor = 
event.getAttribute(AttributeName.clusterStatusMonitor.name());
+      ClusterStatusMonitor clusterStatusMonitor =
+          event.getAttribute(AttributeName.clusterStatusMonitor.name());
       if (clusterStatusMonitor != null) {
         clusterStatusMonitor.updateClusterEventDuration(stage.getStageName(), 
duration);
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 555559f..3c30a4d 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
 
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
+import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.controller.rebalancer.AutoRebalancer;
@@ -104,8 +105,9 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
     }
 
     long endTime = System.currentTimeMillis();
-    logger.info("END BestPossibleStateCalcStage.process() for cluster " + 
cache.getClusterName()
-        + ". took: " + (endTime - startTime) + " ms");
+    logger.info("END " + 
GenericHelixController.getPipelineType(cache.isTaskCache())
+        + " BestPossibleStateCalcStage.process() for cluster " + 
cache.getClusterName() + ". took: "
+        + (endTime - startTime) + " ms");
   }
 
   private BestPossibleStateOutput compute(ClusterEvent event, Map<String, 
Resource> resourceMap,

http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 3178cc6..0d62cad 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -854,7 +854,7 @@ public class ClusterDataCache {
         _contextMap.put(context.getSimpleField(NAME), context);
       } else {
         _contextMap.put(childNames.get(i), context);
-        LOG.warn(
+        LOG.info(
             String.format("Context for %s is null or miss the context NAME!", 
childNames.get((i))));
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 2385bd4..88ddf20 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.CurrentState;
@@ -148,7 +149,8 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
     long endTime = System.currentTimeMillis();
-    LOG.info("END CurrentStateComputationStage.process() for cluster " + 
cache.getClusterName()
+    LOG.info("END " + 
GenericHelixController.getPipelineType(cache.isTaskCache())
+        + " CurrentStateComputationStage.process() for cluster " + 
cache.getClusterName()
         + ". took: " + (endTime - startTime) + " ms");
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 7950e8f..8fbe904 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -35,6 +35,7 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZNRecordDelta;
 import org.apache.helix.ZNRecordDelta.MergeOperation;
+import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
@@ -184,8 +185,9 @@ public class ExternalViewComputeStage extends 
AbstractBaseStage {
     }
 
     long endTime = System.currentTimeMillis();
-    LOG.info("END ExternalViewComputeStage.process() for cluster " + 
cache.getClusterName()
-        + ". took: " + (endTime - startTime) + " ms");
+    LOG.info("END " + 
GenericHelixController.getPipelineType(cache.isTaskCache())
+        + " ExternalViewComputeStage.process() for cluster " + 
cache.getClusterName() + ". took: "
+        + (endTime - startTime) + " ms");
   }
 
   private void updateScheduledTaskStatus(ExternalView ev, HelixManager manager,

http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 8b5b93a..1b4d3f3 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -34,6 +34,7 @@ import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
 import org.apache.helix.api.config.StateTransitionThrottleConfig.RebalanceType;
+import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.controller.common.PartitionStateMap;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
@@ -83,9 +84,9 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     }
 
     long endTime = System.currentTimeMillis();
-    logger.info(
-        "END ImmediateStateCalcStage.process() for cluster " + 
cache.getClusterName() + ". took: "
-            + (endTime - startTime) + " ms");
+    logger.info("END " + 
GenericHelixController.getPipelineType(cache.isTaskCache())
+        + " ImmediateStateCalcStage.process() for cluster " + 
cache.getClusterName() + ". took: "
+        + (endTime - startTime) + " ms");
   }
 
   private IntermediateStateOutput compute(ClusterEvent event, Map<String, 
Resource> resourceMap,

http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index 0ec5840..b9ae60e 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.InstanceConfig;
@@ -105,6 +106,7 @@ public class ReadClusterDataStage extends AbstractBaseStage 
{
     event.addAttribute(AttributeName.ClusterDataCache.name(), _cache);
 
     long endTime = System.currentTimeMillis();
-    logger.info("END ReadClusterDataStage.process(). took: " + (endTime - 
startTime) + " ms");
+    logger.info("END " + 
GenericHelixController.getPipelineType(_cache.isTaskCache())
+        + " ReadClusterDataStage.process(). took: " + (endTime - startTime) + 
" ms");
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 6064e85..0f67bbd 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -23,6 +23,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.ClusterConfig;
@@ -160,8 +161,9 @@ public class ResourceComputationStage extends 
AbstractBaseStage {
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), 
resourceToRebalance);
 
     long endTime = System.currentTimeMillis();
-    LOG.info("END ResourceComputationStage.process() for cluster " + 
cache.getClusterName()
-        + ". took: " + (endTime - startTime) + " ms");
+    LOG.info("END " + 
GenericHelixController.getPipelineType(cache.isTaskCache())
+        + " ResourceComputationStage.process() for cluster " + 
cache.getClusterName() + ". took: "
+        + (endTime - startTime) + " ms");
   }
 
   private void addResource(String resource, Map<String, Resource> resourceMap) 
{

http://git-wip-us.apache.org/repos/asf/helix/blob/c7b250a8/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index c774d4f..39a6e76 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -30,6 +30,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerProperties;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.LiveInstance;
@@ -87,9 +88,9 @@ public class TaskAssignmentStage extends AbstractBaseStage {
     logger.debug("Caching messages took " + (cacheEnd - cacheStart) + " ms");
 
     long endTime = System.currentTimeMillis();
-    logger.info(
-        "END TaskAssignmentStage.process() for cluster " + 
cache.getClusterName() + ". took: " + (
-            endTime - startTime) + " ms");
+    logger.info("END " + 
GenericHelixController.getPipelineType(cache.isTaskCache())
+        + " TaskAssignmentStage.process() for cluster " + 
cache.getClusterName() + ". took: " + (
+        endTime - startTime) + " ms");
   }
 
   List<Message> batchMessage(Builder keyBuilder, List<Message> messages,

Reply via email to