Wrap these high frequent called debug log statements with debug level check to 
reduce memory footprint.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/13fac7c7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/13fac7c7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/13fac7c7

Branch: refs/heads/master
Commit: 13fac7c77bb570631ff93e8ff83c5cdd2bcd903f
Parents: f8fcd0b
Author: Lei Xia <l...@linkedin.com>
Authored: Mon Apr 2 14:33:29 2018 -0700
Committer: Lei Xia <l...@linkedin.com>
Committed: Mon Apr 16 11:22:40 2018 -0700

----------------------------------------------------------------------
 .../helix/common/ClusterEventBlockingQueue.java |  6 +-
 .../rebalancer/DelayedAutoRebalancer.java       | 22 ++++--
 .../strategy/AutoRebalanceStrategy.java         |  4 +-
 .../stages/BestPossibleStateCalcStage.java      |  8 +-
 .../stages/IntermediateStateCalcStage.java      | 41 ++++++----
 .../helix/manager/zk/CallbackHandler.java       | 65 +++++++++-------
 .../helix/manager/zk/ZkCacheEventThread.java    | 12 ++-
 .../helix/manager/zk/zookeeper/ZkClient.java    | 22 ++++--
 .../manager/zk/zookeeper/ZkEventThread.java     | 12 ++-
 .../org/apache/helix/task/JobRebalancer.java    | 81 +++++++++++++-------
 .../org/apache/helix/task/TaskRebalancer.java   | 24 ++++--
 .../apache/helix/task/WorkflowRebalancer.java   | 20 +++--
 12 files changed, 204 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java
 
b/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java
index 437fea0..7f831a2 100644
--- 
a/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java
+++ 
b/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java
@@ -59,8 +59,10 @@ public class ClusterEventBlockingQueue {
    */
   public void put(ClusterEvent event) {
     _eventQueue.put(event.getEventType(), event);
-    LOG.debug("Putting event " + event.getEventType());
-    LOG.debug("Event queue size: " + _eventQueue.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Putting event " + event.getEventType());
+      LOG.debug("Event queue size: " + _eventQueue.size());
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index dc1517a..a7e5f50 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -62,7 +62,9 @@ public class DelayedAutoRebalancer extends AbstractRebalancer 
{
 
     IdealState cachedIdealState = getCachedIdealState(resourceName, 
clusterData);
     if (cachedIdealState != null) {
-      LOG.debug("Use cached IdealState for " + resourceName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Use cached IdealState for " + resourceName);
+      }
       return cachedIdealState;
     }
 
@@ -176,7 +178,9 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer {
       finalMapping =
           getFinalDelayedMapping(currentIdealState, newIdealMapping, 
newActiveMapping, liveEnabledNodes,
               replicaCount, minActiveReplicas);
-      LOG.debug("newActiveMapping: " + newActiveMapping);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("newActiveMapping: " + newActiveMapping);
+      }
     }
 
     finalMapping.getListFields().putAll(userDefinedPreferenceList);
@@ -258,15 +262,19 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer {
 
     if (nextRebalanceTime == Long.MAX_VALUE) {
       long startTime = 
_rebalanceScheduler.removeScheduledRebalance(resourceName);
-      LOG.debug(String
-          .format("Remove exist rebalance timer for resource %s at %d\n", 
resourceName, startTime));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String
+            .format("Remove exist rebalance timer for resource %s at %d\n", 
resourceName, startTime));
+      }
     } else {
       long currentScheduledTime = 
_rebalanceScheduler.getRebalanceTime(resourceName);
       if (currentScheduledTime < 0 || currentScheduledTime > 
nextRebalanceTime) {
         _rebalanceScheduler.scheduleRebalance(_manager, resourceName, 
nextRebalanceTime);
-        LOG.debug(String
-            .format("Set next rebalance time for resource %s at time %d\n", 
resourceName,
-                nextRebalanceTime));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String
+              .format("Set next rebalance time for resource %s at time %d\n", 
resourceName,
+                  nextRebalanceTime));
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
index ed510d3..0c5c0ed 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
@@ -289,7 +289,9 @@ public class AutoRebalanceStrategy implements 
RebalanceStrategy {
           }
         }
         if (donor.capacity < donor.currentlyAssigned) {
-          logger.debug("Could not take partitions out of node:" + donor.id);
+          if (logger.isDebugEnabled()) {
+            logger.debug("Could not take partitions out of node:" + donor.id);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 1dad4f0..a465f05 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -78,8 +78,7 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
       final Map<String, InstanceConfig> instanceConfigMap = 
cache.getInstanceConfigMap();
       final Map<String, StateModelDefinition> stateModelDefMap = 
cache.getStateModelDefMap();
       asyncExecute(cache.getAsyncTasksThreadPool(), new Callable<Object>() {
-        @Override
-        public Object call() {
+        @Override public Object call() {
           try {
             if (clusterStatusMonitor != null) {
               clusterStatusMonitor
@@ -289,7 +288,10 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
     Rebalancer customizedRebalancer = null;
     String rebalancerClassName = idealState.getRebalancerClassName();
     if (rebalancerClassName != null) {
-      logger.debug("resource " + resourceName + " use idealStateRebalancer " + 
rebalancerClassName);
+      if (logger.isDebugEnabled()) {
+        logger
+            .debug("resource " + resourceName + " use idealStateRebalancer " + 
rebalancerClassName);
+      }
       try {
         customizedRebalancer = Rebalancer.class
             .cast(HelixUtil.loadClass(getClass(), 
rebalancerClassName).newInstance());

http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index e4324e4..166cc63 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -444,8 +444,10 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     boolean throttled = false;
     if (throttleController.throttleforResource(rebalanceType, resourceName)) {
       throttled = true;
-      logger
-          .debug("Throttled on resource for " + resourceName + " " + 
partition.getPartitionName());
+      if (logger.isDebugEnabled()) {
+        logger
+            .debug("Throttled on resource for " + resourceName + " " + 
partition.getPartitionName());
+      }
     } else {
       // throttle if any of the instance can not handle the state transition
       for (String ins : allInstances) {
@@ -454,9 +456,10 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
         if (bestPossibleState != null && 
!bestPossibleState.equals(currentState)) {
           if (throttleController.throttleForInstance(rebalanceType, ins)) {
             throttled = true;
-            logger.debug(
-                "Throttled because instance " + ins + " for " + resourceName + 
" " + partition
-                    .getPartitionName());
+            if (logger.isDebugEnabled()) {
+              logger.debug("Throttled because instance " + ins + " for " + 
resourceName + " " + partition
+                      .getPartitionName());
+            }
           }
         }
       }
@@ -536,20 +539,24 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
       PartitionStateMap bestPossibleStateMap,
       PartitionStateMap intermediateStateMap) {
 
-    logger.debug("Partitions need recovery: " + recoveryPartitions
-        + "\nPartitions get throttled on recovery: " + 
recoveryThrottledPartitions);
-    logger.debug("Partitions need loadbalance: " + loadbalancePartitions
-        + "\nPartitions get throttled on load-balance: " + 
loadbalanceThrottledPartitions);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Partitions need recovery: " + recoveryPartitions
+          + "\nPartitions get throttled on recovery: " + 
recoveryThrottledPartitions);
+      logger.debug("Partitions need loadbalance: " + loadbalancePartitions
+          + "\nPartitions get throttled on load-balance: " + 
loadbalanceThrottledPartitions);
+    }
 
     for (Partition partition : allPartitions) {
-      logger.debug(
-          partition + ": Best possible map: " + 
bestPossibleStateMap.getPartitionMap(partition));
-      logger.debug(partition + ": Current State: " + currentStateOutput
-          .getCurrentStateMap(resource, partition));
-      logger.debug(partition + ": Pending state: " + currentStateOutput
-          .getPendingMessageMap(resource, partition));
-      logger.debug(
-          partition + ": Intermediate state: " + 
intermediateStateMap.getPartitionMap(partition));
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            partition + ": Best possible map: " + 
bestPossibleStateMap.getPartitionMap(partition));
+        logger.debug(partition + ": Current State: " + currentStateOutput
+            .getCurrentStateMap(resource, partition));
+        logger.debug(partition + ": Pending state: " + currentStateOutput
+            .getPendingMessageMap(resource, partition));
+        logger.debug(
+            partition + ": Intermediate state: " + 
intermediateStateMap.getPartitionMap(partition));
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index c3b8a4c..22a1a46 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -458,13 +458,17 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
   private void subscribeChildChange(String path, NotificationContext.Type 
callbackType) {
     if (callbackType == NotificationContext.Type.INIT
         || callbackType == NotificationContext.Type.CALLBACK) {
-      logger.info(
-          _manager.getInstanceName() + " subscribes child-change. path: " + 
path + ", listener: " + _listener);
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            _manager.getInstanceName() + " subscribes child-change. path: " + 
path + ", listener: " + _listener);
+      }
       _zkClient.subscribeChildChanges(path, this);
     } else if (callbackType == NotificationContext.Type.FINALIZE) {
-      logger.info(
-          _manager.getInstanceName() + " unsubscribe child-change. path: " + 
path + ", listener: " + _listener);
-
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            _manager.getInstanceName() + " unsubscribe child-change. path: " + 
path + ", listener: "
+                + _listener);
+      }
       _zkClient.unsubscribeChildChanges(path, this);
     }
   }
@@ -472,15 +476,18 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
   private void subscribeDataChange(String path, NotificationContext.Type 
callbackType) {
     if (callbackType == NotificationContext.Type.INIT
         || callbackType == NotificationContext.Type.CALLBACK) {
-      logger.info(
-          _manager.getInstanceName() + " subscribe data-change. path: " + path 
+ ", listener: "
-              + _listener);
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            _manager.getInstanceName() + " subscribe data-change. path: " + 
path + ", listener: "
+                + _listener);
+      }
       _zkClient.subscribeDataChanges(path, this);
     } else if (callbackType == NotificationContext.Type.FINALIZE) {
-      logger.info(
-          _manager.getInstanceName() + " unsubscribe data-change. path: " + 
path + ", listener: "
-              + _listener);
-
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            _manager.getInstanceName() + " unsubscribe data-change. path: " + 
path + ", listener: "
+                + _listener);
+      }
       _zkClient.unsubscribeDataChanges(path, this);
     }
   }
@@ -497,22 +504,18 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
     long start = System.currentTimeMillis();
     if (_eventTypes.contains(EventType.NodeDataChanged) || _eventTypes
         .contains(EventType.NodeCreated) || 
_eventTypes.contains(EventType.NodeDeleted)) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Subscribing data change listener to path:" + path);
-      }
+      logger.info("Subscribing data change listener to path:" + path + " for 
listener: " + _listener);
       subscribeDataChange(path, callbackType);
     }
 
     if (_eventTypes.contains(EventType.NodeChildrenChanged)) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Subscribing child change listener to path:" + path);
-      }
+      logger.info(
+          "Subscribing child change listener to path:" + path + " for 
listener: " + _listener);
       subscribeChildChange(path, callbackType);
       if (watchChild) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Subscribing data change listener to all children for 
path:" + path);
-        }
-
+        logger.info(
+            "Subscribing data change listener to all children for path:" + 
path + " for listener: "
+                + _listener);
         try {
           switch (_changeType) {
           case CURRENT_STATE:
@@ -625,14 +628,18 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
     try {
       updateNotificationTime(System.nanoTime());
       if (dataPath != null && dataPath.startsWith(_path)) {
-        logger.info(_manager.getInstanceName() + " unsubscribe data-change. 
path: " + dataPath
-            + ", listener: " + _listener);
+        if (logger.isDebugEnabled()) {
+          logger.info(_manager.getInstanceName() + " unsubscribe data-change. 
path: " + dataPath
+              + ", listener: " + _listener);
+        }
         _zkClient.unsubscribeDataChanges(dataPath, this);
 
         // only needed for bucketized parent, but OK if we don't have 
child-change
         // watch on the bucketized parent path
-        logger.info(_manager.getInstanceName() + " unsubscribe child-change. 
path: " + dataPath
-            + ", listener: " + _listener);
+        if (logger.isDebugEnabled()) {
+          logger.info(_manager.getInstanceName() + " unsubscribe child-change. 
path: " + dataPath
+              + ", listener: " + _listener);
+        }
         _zkClient.unsubscribeChildChanges(dataPath, this);
         // No need to invoke() since this event will handled by child-change 
on parent-node
       }
@@ -647,8 +654,10 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
   public void handleChildChange(String parentPath, List<String> currentChilds) 
{
     if (logger.isDebugEnabled()) {
       logger.debug(
-          "Data change callback: child changed, path: " + parentPath + ", 
current child count: "
-              + currentChilds.size());
+          "Data change callback: child changed, path: " + parentPath + ", 
current child count: " + (
+              currentChilds != null
+                  ? currentChilds.size()
+                  : 0));
     }
 
     try {

http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheEventThread.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheEventThread.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheEventThread.java
index d04f26e..9c208d3 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheEventThread.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheEventThread.java
@@ -62,7 +62,9 @@ public class ZkCacheEventThread extends Thread {
       while (!isInterrupted()) {
         ZkCacheEvent zkEvent = _events.take();
         int eventId = _eventId.incrementAndGet();
-        LOG.debug("Delivering event #" + eventId + " " + zkEvent);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Delivering event #" + eventId + " " + zkEvent);
+        }
         try {
           zkEvent.run();
         } catch (InterruptedException e) {
@@ -74,7 +76,9 @@ public class ZkCacheEventThread extends Thread {
         } catch (Throwable e) {
           LOG.error("Error handling event " + zkEvent, e);
         }
-        LOG.debug("Delivering event #" + eventId + " done");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Delivering event #" + eventId + " done");
+        }
       }
     } catch (InterruptedException e) {
       LOG.info("Terminate ZkClient event thread.");
@@ -83,7 +87,9 @@ public class ZkCacheEventThread extends Thread {
 
   public void send(ZkCacheEvent event) {
     if (!isInterrupted()) {
-      LOG.debug("New event: " + event);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("New event: " + event);
+      }
       _events.add(event);
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index edf1cd1..0aa6587 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -189,9 +189,11 @@ public class ZkClient implements Watcher {
       IZkDataListenerEntry listenerEntry = new IZkDataListenerEntry(listener, 
prefetchEnabled);
       listenerEntries.add(listenerEntry);
       if (prefetchEnabled) {
-        LOG.debug(
-            "Subscribed data changes for " + path + ", listener: " + listener 
+ ", prefetch data: "
-                + prefetchEnabled);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Subscribed data changes for " + path + ", listener: " + 
listener + ", prefetch data: "
+                  + prefetchEnabled);
+        }
       }
     }
     watchForData(path);
@@ -618,8 +620,10 @@ public class ZkClient implements Watcher {
 
 
     if (event.getType() == Event.EventType.NodeDeleted) {
-      String path = event.getPath();
-      LOG.debug(path);
+      if (LOG.isDebugEnabled()) {
+        String path = event.getPath();
+        LOG.debug(path);
+      }
     }
 
     getEventLock().lock();
@@ -665,7 +669,9 @@ public class ZkClient implements Watcher {
       // update state change counter.
       recordStateChange(stateChanged, dataChanged);
 
-      LOG.debug("Leaving process event");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Leaving process event");
+      }
     }
   }
 
@@ -914,7 +920,9 @@ public class ZkClient implements Watcher {
             try {
               Object data = null;
               if (listener.isPrefetchData()) {
-                LOG.debug("Prefetch data for path: " + path);
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Prefetch data for path: " + path);
+                }
                 data = readData(path, null, true);
               }
               listener.getDataListener().handleDataChange(path, data);

http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
 
b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
index 21d5400..a16f27c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
@@ -62,7 +62,9 @@ public class ZkEventThread extends Thread {
       while (!isInterrupted()) {
         ZkEvent zkEvent = _events.take();
         int eventId = _eventId.incrementAndGet();
-        LOG.debug("Delivering event #" + eventId + " " + zkEvent);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Delivering event #" + eventId + " " + zkEvent);
+        }
         try {
           zkEvent.run();
           _totalEventCountHandled ++;
@@ -73,7 +75,9 @@ public class ZkEventThread extends Thread {
         } catch (Throwable e) {
           LOG.error("Error handling event " + zkEvent, e);
         }
-        LOG.debug("Delivering event #" + eventId + " done");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Delivering event #" + eventId + " done");
+        }
       }
     } catch (InterruptedException e) {
       LOG.info("Terminate ZkClient event thread.");
@@ -82,7 +86,9 @@ public class ZkEventThread extends Thread {
 
   public void send(ZkEvent event) {
     if (!isInterrupted()) {
-      LOG.debug("New event: " + event);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("New event: " + event);
+      }
       _events.add(event);
       _totalEventCount ++;
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index d00ffaf..1620238 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -271,8 +271,11 @@ public class JobRebalancer extends TaskRebalancer {
         getPrevInstanceToTaskAssignments(liveInstances, 
prevTaskToInstanceStateAssignment, allPartitions);
     long currentTime = System.currentTimeMillis();
 
-    LOG.debug("All partitions: " + allPartitions + " taskAssignment: " + 
prevInstanceToTaskAssignments
-        + " excludedInstances: " + excludedInstances);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "All partitions: " + allPartitions + " taskAssignment: " + 
prevInstanceToTaskAssignments
+              + " excludedInstances: " + excludedInstances);
+    }
 
     // Iterate through all instances
     for (String instance : prevInstanceToTaskAssignments.keySet()) {
@@ -311,9 +314,11 @@ public class JobRebalancer extends TaskRebalancer {
 
           paMap.put(pId, new PartitionAssignment(instance, 
requestedState.name()));
           assignedPartitions.add(pId);
-          LOG.debug(String.format(
-              "Instance %s requested a state transition to %s for partition 
%s.", instance,
-              requestedState, pName));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Instance %s requested a state transition to %s for partition 
%s.", instance,
+                requestedState, pName));
+          }
           continue;
         }
 
@@ -328,8 +333,11 @@ public class JobRebalancer extends TaskRebalancer {
 
           paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
           assignedPartitions.add(pId);
-          LOG.debug(String.format("Setting task partition %s state to %s on 
instance %s.", pName,
-              nextState, instance));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String
+                .format("Setting task partition %s state to %s on instance 
%s.", pName, nextState,
+                    instance));
+          }
         }
           break;
         case STOPPED: {
@@ -342,17 +350,21 @@ public class JobRebalancer extends TaskRebalancer {
 
           paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
           assignedPartitions.add(pId);
-          LOG.debug(String.format("Setting task partition %s state to %s on 
instance %s.", pName,
-              nextState, instance));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String
+                .format("Setting task partition %s state to %s on instance 
%s.", pName, nextState,
+                    instance));
+          }
         }
           break;
         case COMPLETED: {
           // The task has completed on this partition. Mark as such in the 
context object.
           donePartitions.add(pId);
-          LOG.debug(String
-              .format(
-                  "Task partition %s has completed with state %s. Marking as 
such in rebalancer context.",
-                  pName, currState));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has completed with state %s. Marking as 
such in rebalancer context.",
+                pName, currState));
+          }
           partitionsToDropFromIs.add(pId);
           markPartitionCompleted(jobCtx, pId);
         }
@@ -362,9 +374,11 @@ public class JobRebalancer extends TaskRebalancer {
         case TASK_ABORTED:
         case ERROR: {
           donePartitions.add(pId); // The task may be rescheduled on a 
different instance.
-          LOG.debug(String.format(
-              "Task partition %s has error state %s with msg %s. Marking as 
such in rebalancer context.", pName,
-              currState, jobCtx.getPartitionInfo(pId)));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has error state %s with msg %s. Marking as 
such in rebalancer context.",
+                pName, currState, jobCtx.getPartitionInfo(pId)));
+          }
           markPartitionError(jobCtx, pId, currState, true);
           // The error policy is to fail the task as soon a single partition 
fails for a specified
           // maximum number of attempts or task is in ABORTED state.
@@ -376,7 +390,9 @@ public class JobRebalancer extends TaskRebalancer {
                 || currState.equals(TaskPartitionState.ERROR)) {
               skippedPartitions.add(pId);
               partitionsToDropFromIs.add(pId);
-              LOG.debug("skippedPartitions:" + skippedPartitions);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("skippedPartitions:" + skippedPartitions);
+              }
             } else {
               // Mark the task to be started at some later time (if enabled)
               markPartitionDelayed(jobCfg, jobCtx, pId);
@@ -388,9 +404,11 @@ public class JobRebalancer extends TaskRebalancer {
         case DROPPED: {
           // currState in [INIT, DROPPED]. Do nothing, the partition is 
eligible to be reassigned.
           donePartitions.add(pId);
-          LOG.debug(String.format(
-              "Task partition %s has state %s. It will be dropped from the 
current ideal state.",
-              pName, currState));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has state %s. It will be dropped from the 
current ideal state.",
+                pName, currState));
+          }
         }
           break;
         default:
@@ -501,10 +519,12 @@ public class JobRebalancer extends TaskRebalancer {
         int participantLimitation = participantCapacity - 
cache.getParticipantActiveTaskCount(instance);
         // New tasks to be assigned
         int numToAssign = Math.min(jobCfgLimitation, participantLimitation);
-        LOG.debug(String.format(
-            "Throttle tasks to be assigned to instance %s using limitation: 
Job Concurrent Task(%d), "
-                + "Participant Max Task(%d). Remaining capacity %d.", 
instance, jobCfgLimitation, participantCapacity,
-            numToAssign));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format(
+              "Throttle tasks to be assigned to instance %s using limitation: 
Job Concurrent Task(%d), "
+                  + "Participant Max Task(%d). Remaining capacity %d.", 
instance, jobCfgLimitation,
+              participantCapacity, numToAssign));
+        }
         if (numToAssign > 0) {
           Set<Integer> throttledSet = new HashSet<Integer>();
           List<Integer> nextPartitions =
@@ -516,12 +536,17 @@ public class JobRebalancer extends TaskRebalancer {
             jobCtx.setAssignedParticipant(pId, instance);
             jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
             jobCtx.setPartitionStartTime(pId, System.currentTimeMillis());
-            LOG.debug(String.format("Setting task partition %s state to %s on 
instance %s.", pName,
-                TaskPartitionState.RUNNING, instance));
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(String
+                  .format("Setting task partition %s state to %s on instance 
%s.", pName,
+                      TaskPartitionState.RUNNING, instance));
+            }
           }
-          cache.setParticipantActiveTaskCount(instance, 
cache.getParticipantActiveTaskCount(instance) + nextPartitions.size());
+          cache.setParticipantActiveTaskCount(instance,
+              cache.getParticipantActiveTaskCount(instance) + 
nextPartitions.size());
           if (!throttledSet.isEmpty()) {
-            LOG.debug(throttledSet.size() + "tasks are ready but throttled 
when assigned to participant.");
+            LOG.debug(
+                throttledSet.size() + "tasks are ready but throttled when 
assigned to participant.");
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index e0e9903..ece1935 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -174,8 +174,10 @@ public abstract class TaskRebalancer implements 
Rebalancer, MappingCalculator {
 
     // If there is any parent job not started, this job should not be scheduled
     if (notStartedCount > 0) {
-      LOG.debug(String
-          .format("Job %s is not ready to start, notStartedParent(s)=%d.", 
job, notStartedCount));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String
+            .format("Job %s is not ready to start, notStartedParent(s)=%d.", 
job, notStartedCount));
+      }
       return false;
     }
 
@@ -188,24 +190,30 @@ public abstract class TaskRebalancer implements 
Rebalancer, MappingCalculator {
     }
     if (failedOrTimeoutCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) {
       markJobFailed(job, null, workflowCfg, workflowCtx, jobConfigMap);
-      LOG.debug(
-          String.format("Job %s is not ready to start, failedCount(s)=%d.", 
job, failedOrTimeoutCount));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String
+            .format("Job %s is not ready to start, failedCount(s)=%d.", job, 
failedOrTimeoutCount));
+      }
       return false;
     }
 
     if (workflowCfg.isJobQueue()) {
       // If job comes from a JobQueue, it should apply the parallel job logics
       if (incompleteAllCount >= workflowCfg.getParallelJobs()) {
-        LOG.debug(String.format("Job %s is not ready to schedule, 
inCompleteJobs(s)=%d.", job,
-            incompleteAllCount));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Job %s is not ready to schedule, 
inCompleteJobs(s)=%d.", job,
+              incompleteAllCount));
+        }
         return false;
       }
     } else {
       // If this job comes from a generic workflow, job will not be scheduled 
until
       // all the direct parent jobs finished
       if (incompleteParentCount > 0) {
-        LOG.debug(String.format("Job %s is not ready to start, 
notFinishedParent(s)=%d.", job,
-            incompleteParentCount));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Job %s is not ready to start, 
notFinishedParent(s)=%d.", job,
+              incompleteParentCount));
+        }
         return false;
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 04f4b09..540ea13 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -183,13 +183,17 @@ public class WorkflowRebalancer extends TaskRebalancer {
     for (String job : workflowCfg.getJobDag().getAllNodes()) {
       TaskState jobState = workflowCtx.getJobState(job);
       if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
-        LOG.debug("Job " + job + " is already started or completed.");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Job " + job + " is already started or completed.");
+        }
         continue;
       }
 
       if (workflowCfg.isJobQueue() && scheduledJobs >= 
workflowCfg.getParallelJobs()) {
-        LOG.debug(String.format("Workflow %s already have enough job in 
progress, "
-            + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, 
scheduledJobs));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Workflow %s already have enough job in 
progress, "
+              + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, 
scheduledJobs));
+        }
         break;
       }
 
@@ -330,7 +334,9 @@ public class WorkflowRebalancer extends TaskRebalancer {
       if (scheduleConfig.isRecurring()) {
         // Skip scheduling this workflow if it's not in a start state
         if (!workflowCfg.getTargetState().equals(TargetState.START)) {
-          LOG.debug("Skip scheduling since the workflow has not been started " 
+ workflow);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Skip scheduling since the workflow has not been started 
" + workflow);
+          }
           return false;
         }
 
@@ -357,7 +363,9 @@ public class WorkflowRebalancer extends TaskRebalancer {
         DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
         df.setTimeZone(TimeZone.getTimeZone("UTC"));
         String newWorkflowName = workflow + "_" + df.format(new 
Date(timeToSchedule));
-        LOG.debug("Ready to start workflow " + newWorkflowName);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ready to start workflow " + newWorkflowName);
+        }
         if (!newWorkflowName.equals(lastScheduled)) {
           Workflow clonedWf =
               cloneWorkflow(_manager, workflow, newWorkflowName, new 
Date(timeToSchedule));
@@ -555,4 +563,4 @@ public class WorkflowRebalancer extends TaskRebalancer {
     // Nothing to do here with workflow resource.
     return currentIdealState;
   }
-}
\ No newline at end of file
+}

Reply via email to