Wrap these high frequent called debug log statements with debug level check to reduce memory footprint.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/13fac7c7 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/13fac7c7 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/13fac7c7 Branch: refs/heads/master Commit: 13fac7c77bb570631ff93e8ff83c5cdd2bcd903f Parents: f8fcd0b Author: Lei Xia <l...@linkedin.com> Authored: Mon Apr 2 14:33:29 2018 -0700 Committer: Lei Xia <l...@linkedin.com> Committed: Mon Apr 16 11:22:40 2018 -0700 ---------------------------------------------------------------------- .../helix/common/ClusterEventBlockingQueue.java | 6 +- .../rebalancer/DelayedAutoRebalancer.java | 22 ++++-- .../strategy/AutoRebalanceStrategy.java | 4 +- .../stages/BestPossibleStateCalcStage.java | 8 +- .../stages/IntermediateStateCalcStage.java | 41 ++++++---- .../helix/manager/zk/CallbackHandler.java | 65 +++++++++------- .../helix/manager/zk/ZkCacheEventThread.java | 12 ++- .../helix/manager/zk/zookeeper/ZkClient.java | 22 ++++-- .../manager/zk/zookeeper/ZkEventThread.java | 12 ++- .../org/apache/helix/task/JobRebalancer.java | 81 +++++++++++++------- .../org/apache/helix/task/TaskRebalancer.java | 24 ++++-- .../apache/helix/task/WorkflowRebalancer.java | 20 +++-- 12 files changed, 204 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java b/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java index 437fea0..7f831a2 100644 --- a/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java +++ b/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java @@ -59,8 +59,10 @@ public class ClusterEventBlockingQueue { */ public void put(ClusterEvent event) { _eventQueue.put(event.getEventType(), event); - LOG.debug("Putting event " + event.getEventType()); - LOG.debug("Event queue size: " + _eventQueue.size()); + if (LOG.isDebugEnabled()) { + LOG.debug("Putting event " + event.getEventType()); + LOG.debug("Event queue size: " + _eventQueue.size()); + } } /** http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java index dc1517a..a7e5f50 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java @@ -62,7 +62,9 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { IdealState cachedIdealState = getCachedIdealState(resourceName, clusterData); if (cachedIdealState != null) { - LOG.debug("Use cached IdealState for " + resourceName); + if (LOG.isDebugEnabled()) { + LOG.debug("Use cached IdealState for " + resourceName); + } return cachedIdealState; } @@ -176,7 +178,9 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { finalMapping = getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping, liveEnabledNodes, replicaCount, minActiveReplicas); - LOG.debug("newActiveMapping: " + newActiveMapping); + if (LOG.isDebugEnabled()) { + LOG.debug("newActiveMapping: " + newActiveMapping); + } } finalMapping.getListFields().putAll(userDefinedPreferenceList); @@ -258,15 +262,19 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { if (nextRebalanceTime == Long.MAX_VALUE) { long startTime = _rebalanceScheduler.removeScheduledRebalance(resourceName); - LOG.debug(String - .format("Remove exist rebalance timer for resource %s at %d\n", resourceName, startTime)); + if (LOG.isDebugEnabled()) { + LOG.debug(String + .format("Remove exist rebalance timer for resource %s at %d\n", resourceName, startTime)); + } } else { long currentScheduledTime = _rebalanceScheduler.getRebalanceTime(resourceName); if (currentScheduledTime < 0 || currentScheduledTime > nextRebalanceTime) { _rebalanceScheduler.scheduleRebalance(_manager, resourceName, nextRebalanceTime); - LOG.debug(String - .format("Set next rebalance time for resource %s at time %d\n", resourceName, - nextRebalanceTime)); + if (LOG.isDebugEnabled()) { + LOG.debug(String + .format("Set next rebalance time for resource %s at time %d\n", resourceName, + nextRebalanceTime)); + } } } } http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java index ed510d3..0c5c0ed 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java @@ -289,7 +289,9 @@ public class AutoRebalanceStrategy implements RebalanceStrategy { } } if (donor.capacity < donor.currentlyAssigned) { - logger.debug("Could not take partitions out of node:" + donor.id); + if (logger.isDebugEnabled()) { + logger.debug("Could not take partitions out of node:" + donor.id); + } } } } http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index 1dad4f0..a465f05 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -78,8 +78,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { final Map<String, InstanceConfig> instanceConfigMap = cache.getInstanceConfigMap(); final Map<String, StateModelDefinition> stateModelDefMap = cache.getStateModelDefMap(); asyncExecute(cache.getAsyncTasksThreadPool(), new Callable<Object>() { - @Override - public Object call() { + @Override public Object call() { try { if (clusterStatusMonitor != null) { clusterStatusMonitor @@ -289,7 +288,10 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { Rebalancer customizedRebalancer = null; String rebalancerClassName = idealState.getRebalancerClassName(); if (rebalancerClassName != null) { - logger.debug("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName); + if (logger.isDebugEnabled()) { + logger + .debug("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName); + } try { customizedRebalancer = Rebalancer.class .cast(HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance()); http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java index e4324e4..166cc63 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -444,8 +444,10 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { boolean throttled = false; if (throttleController.throttleforResource(rebalanceType, resourceName)) { throttled = true; - logger - .debug("Throttled on resource for " + resourceName + " " + partition.getPartitionName()); + if (logger.isDebugEnabled()) { + logger + .debug("Throttled on resource for " + resourceName + " " + partition.getPartitionName()); + } } else { // throttle if any of the instance can not handle the state transition for (String ins : allInstances) { @@ -454,9 +456,10 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { if (bestPossibleState != null && !bestPossibleState.equals(currentState)) { if (throttleController.throttleForInstance(rebalanceType, ins)) { throttled = true; - logger.debug( - "Throttled because instance " + ins + " for " + resourceName + " " + partition - .getPartitionName()); + if (logger.isDebugEnabled()) { + logger.debug("Throttled because instance " + ins + " for " + resourceName + " " + partition + .getPartitionName()); + } } } } @@ -536,20 +539,24 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { PartitionStateMap bestPossibleStateMap, PartitionStateMap intermediateStateMap) { - logger.debug("Partitions need recovery: " + recoveryPartitions - + "\nPartitions get throttled on recovery: " + recoveryThrottledPartitions); - logger.debug("Partitions need loadbalance: " + loadbalancePartitions - + "\nPartitions get throttled on load-balance: " + loadbalanceThrottledPartitions); + if (logger.isDebugEnabled()) { + logger.debug("Partitions need recovery: " + recoveryPartitions + + "\nPartitions get throttled on recovery: " + recoveryThrottledPartitions); + logger.debug("Partitions need loadbalance: " + loadbalancePartitions + + "\nPartitions get throttled on load-balance: " + loadbalanceThrottledPartitions); + } for (Partition partition : allPartitions) { - logger.debug( - partition + ": Best possible map: " + bestPossibleStateMap.getPartitionMap(partition)); - logger.debug(partition + ": Current State: " + currentStateOutput - .getCurrentStateMap(resource, partition)); - logger.debug(partition + ": Pending state: " + currentStateOutput - .getPendingMessageMap(resource, partition)); - logger.debug( - partition + ": Intermediate state: " + intermediateStateMap.getPartitionMap(partition)); + if (logger.isDebugEnabled()) { + logger.debug( + partition + ": Best possible map: " + bestPossibleStateMap.getPartitionMap(partition)); + logger.debug(partition + ": Current State: " + currentStateOutput + .getCurrentStateMap(resource, partition)); + logger.debug(partition + ": Pending state: " + currentStateOutput + .getPendingMessageMap(resource, partition)); + logger.debug( + partition + ": Intermediate state: " + intermediateStateMap.getPartitionMap(partition)); + } } } http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java index c3b8a4c..22a1a46 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java @@ -458,13 +458,17 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { private void subscribeChildChange(String path, NotificationContext.Type callbackType) { if (callbackType == NotificationContext.Type.INIT || callbackType == NotificationContext.Type.CALLBACK) { - logger.info( - _manager.getInstanceName() + " subscribes child-change. path: " + path + ", listener: " + _listener); + if (logger.isDebugEnabled()) { + logger.debug( + _manager.getInstanceName() + " subscribes child-change. path: " + path + ", listener: " + _listener); + } _zkClient.subscribeChildChanges(path, this); } else if (callbackType == NotificationContext.Type.FINALIZE) { - logger.info( - _manager.getInstanceName() + " unsubscribe child-change. path: " + path + ", listener: " + _listener); - + if (logger.isDebugEnabled()) { + logger.debug( + _manager.getInstanceName() + " unsubscribe child-change. path: " + path + ", listener: " + + _listener); + } _zkClient.unsubscribeChildChanges(path, this); } } @@ -472,15 +476,18 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { private void subscribeDataChange(String path, NotificationContext.Type callbackType) { if (callbackType == NotificationContext.Type.INIT || callbackType == NotificationContext.Type.CALLBACK) { - logger.info( - _manager.getInstanceName() + " subscribe data-change. path: " + path + ", listener: " - + _listener); + if (logger.isDebugEnabled()) { + logger.debug( + _manager.getInstanceName() + " subscribe data-change. path: " + path + ", listener: " + + _listener); + } _zkClient.subscribeDataChanges(path, this); } else if (callbackType == NotificationContext.Type.FINALIZE) { - logger.info( - _manager.getInstanceName() + " unsubscribe data-change. path: " + path + ", listener: " - + _listener); - + if (logger.isDebugEnabled()) { + logger.debug( + _manager.getInstanceName() + " unsubscribe data-change. path: " + path + ", listener: " + + _listener); + } _zkClient.unsubscribeDataChanges(path, this); } } @@ -497,22 +504,18 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { long start = System.currentTimeMillis(); if (_eventTypes.contains(EventType.NodeDataChanged) || _eventTypes .contains(EventType.NodeCreated) || _eventTypes.contains(EventType.NodeDeleted)) { - if (logger.isDebugEnabled()) { - logger.debug("Subscribing data change listener to path:" + path); - } + logger.info("Subscribing data change listener to path:" + path + " for listener: " + _listener); subscribeDataChange(path, callbackType); } if (_eventTypes.contains(EventType.NodeChildrenChanged)) { - if (logger.isDebugEnabled()) { - logger.debug("Subscribing child change listener to path:" + path); - } + logger.info( + "Subscribing child change listener to path:" + path + " for listener: " + _listener); subscribeChildChange(path, callbackType); if (watchChild) { - if (logger.isDebugEnabled()) { - logger.debug("Subscribing data change listener to all children for path:" + path); - } - + logger.info( + "Subscribing data change listener to all children for path:" + path + " for listener: " + + _listener); try { switch (_changeType) { case CURRENT_STATE: @@ -625,14 +628,18 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { try { updateNotificationTime(System.nanoTime()); if (dataPath != null && dataPath.startsWith(_path)) { - logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + dataPath - + ", listener: " + _listener); + if (logger.isDebugEnabled()) { + logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + dataPath + + ", listener: " + _listener); + } _zkClient.unsubscribeDataChanges(dataPath, this); // only needed for bucketized parent, but OK if we don't have child-change // watch on the bucketized parent path - logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + dataPath - + ", listener: " + _listener); + if (logger.isDebugEnabled()) { + logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + dataPath + + ", listener: " + _listener); + } _zkClient.unsubscribeChildChanges(dataPath, this); // No need to invoke() since this event will handled by child-change on parent-node } @@ -647,8 +654,10 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { public void handleChildChange(String parentPath, List<String> currentChilds) { if (logger.isDebugEnabled()) { logger.debug( - "Data change callback: child changed, path: " + parentPath + ", current child count: " - + currentChilds.size()); + "Data change callback: child changed, path: " + parentPath + ", current child count: " + ( + currentChilds != null + ? currentChilds.size() + : 0)); } try { http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheEventThread.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheEventThread.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheEventThread.java index d04f26e..9c208d3 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheEventThread.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheEventThread.java @@ -62,7 +62,9 @@ public class ZkCacheEventThread extends Thread { while (!isInterrupted()) { ZkCacheEvent zkEvent = _events.take(); int eventId = _eventId.incrementAndGet(); - LOG.debug("Delivering event #" + eventId + " " + zkEvent); + if (LOG.isDebugEnabled()) { + LOG.debug("Delivering event #" + eventId + " " + zkEvent); + } try { zkEvent.run(); } catch (InterruptedException e) { @@ -74,7 +76,9 @@ public class ZkCacheEventThread extends Thread { } catch (Throwable e) { LOG.error("Error handling event " + zkEvent, e); } - LOG.debug("Delivering event #" + eventId + " done"); + if (LOG.isDebugEnabled()) { + LOG.debug("Delivering event #" + eventId + " done"); + } } } catch (InterruptedException e) { LOG.info("Terminate ZkClient event thread."); @@ -83,7 +87,9 @@ public class ZkCacheEventThread extends Thread { public void send(ZkCacheEvent event) { if (!isInterrupted()) { - LOG.debug("New event: " + event); + if (LOG.isDebugEnabled()) { + LOG.debug("New event: " + event); + } _events.add(event); } } http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java index edf1cd1..0aa6587 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java @@ -189,9 +189,11 @@ public class ZkClient implements Watcher { IZkDataListenerEntry listenerEntry = new IZkDataListenerEntry(listener, prefetchEnabled); listenerEntries.add(listenerEntry); if (prefetchEnabled) { - LOG.debug( - "Subscribed data changes for " + path + ", listener: " + listener + ", prefetch data: " - + prefetchEnabled); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Subscribed data changes for " + path + ", listener: " + listener + ", prefetch data: " + + prefetchEnabled); + } } } watchForData(path); @@ -618,8 +620,10 @@ public class ZkClient implements Watcher { if (event.getType() == Event.EventType.NodeDeleted) { - String path = event.getPath(); - LOG.debug(path); + if (LOG.isDebugEnabled()) { + String path = event.getPath(); + LOG.debug(path); + } } getEventLock().lock(); @@ -665,7 +669,9 @@ public class ZkClient implements Watcher { // update state change counter. recordStateChange(stateChanged, dataChanged); - LOG.debug("Leaving process event"); + if (LOG.isDebugEnabled()) { + LOG.debug("Leaving process event"); + } } } @@ -914,7 +920,9 @@ public class ZkClient implements Watcher { try { Object data = null; if (listener.isPrefetchData()) { - LOG.debug("Prefetch data for path: " + path); + if (LOG.isDebugEnabled()) { + LOG.debug("Prefetch data for path: " + path); + } data = readData(path, null, true); } listener.getDataListener().handleDataChange(path, data); http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java index 21d5400..a16f27c 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java @@ -62,7 +62,9 @@ public class ZkEventThread extends Thread { while (!isInterrupted()) { ZkEvent zkEvent = _events.take(); int eventId = _eventId.incrementAndGet(); - LOG.debug("Delivering event #" + eventId + " " + zkEvent); + if (LOG.isDebugEnabled()) { + LOG.debug("Delivering event #" + eventId + " " + zkEvent); + } try { zkEvent.run(); _totalEventCountHandled ++; @@ -73,7 +75,9 @@ public class ZkEventThread extends Thread { } catch (Throwable e) { LOG.error("Error handling event " + zkEvent, e); } - LOG.debug("Delivering event #" + eventId + " done"); + if (LOG.isDebugEnabled()) { + LOG.debug("Delivering event #" + eventId + " done"); + } } } catch (InterruptedException e) { LOG.info("Terminate ZkClient event thread."); @@ -82,7 +86,9 @@ public class ZkEventThread extends Thread { public void send(ZkEvent event) { if (!isInterrupted()) { - LOG.debug("New event: " + event); + if (LOG.isDebugEnabled()) { + LOG.debug("New event: " + event); + } _events.add(event); _totalEventCount ++; } http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index d00ffaf..1620238 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -271,8 +271,11 @@ public class JobRebalancer extends TaskRebalancer { getPrevInstanceToTaskAssignments(liveInstances, prevTaskToInstanceStateAssignment, allPartitions); long currentTime = System.currentTimeMillis(); - LOG.debug("All partitions: " + allPartitions + " taskAssignment: " + prevInstanceToTaskAssignments - + " excludedInstances: " + excludedInstances); + if (LOG.isDebugEnabled()) { + LOG.debug( + "All partitions: " + allPartitions + " taskAssignment: " + prevInstanceToTaskAssignments + + " excludedInstances: " + excludedInstances); + } // Iterate through all instances for (String instance : prevInstanceToTaskAssignments.keySet()) { @@ -311,9 +314,11 @@ public class JobRebalancer extends TaskRebalancer { paMap.put(pId, new PartitionAssignment(instance, requestedState.name())); assignedPartitions.add(pId); - LOG.debug(String.format( - "Instance %s requested a state transition to %s for partition %s.", instance, - requestedState, pName)); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Instance %s requested a state transition to %s for partition %s.", instance, + requestedState, pName)); + } continue; } @@ -328,8 +333,11 @@ public class JobRebalancer extends TaskRebalancer { paMap.put(pId, new PartitionAssignment(instance, nextState.name())); assignedPartitions.add(pId); - LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, - nextState, instance)); + if (LOG.isDebugEnabled()) { + LOG.debug(String + .format("Setting task partition %s state to %s on instance %s.", pName, nextState, + instance)); + } } break; case STOPPED: { @@ -342,17 +350,21 @@ public class JobRebalancer extends TaskRebalancer { paMap.put(pId, new PartitionAssignment(instance, nextState.name())); assignedPartitions.add(pId); - LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, - nextState, instance)); + if (LOG.isDebugEnabled()) { + LOG.debug(String + .format("Setting task partition %s state to %s on instance %s.", pName, nextState, + instance)); + } } break; case COMPLETED: { // The task has completed on this partition. Mark as such in the context object. donePartitions.add(pId); - LOG.debug(String - .format( - "Task partition %s has completed with state %s. Marking as such in rebalancer context.", - pName, currState)); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Task partition %s has completed with state %s. Marking as such in rebalancer context.", + pName, currState)); + } partitionsToDropFromIs.add(pId); markPartitionCompleted(jobCtx, pId); } @@ -362,9 +374,11 @@ public class JobRebalancer extends TaskRebalancer { case TASK_ABORTED: case ERROR: { donePartitions.add(pId); // The task may be rescheduled on a different instance. - LOG.debug(String.format( - "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.", pName, - currState, jobCtx.getPartitionInfo(pId))); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.", + pName, currState, jobCtx.getPartitionInfo(pId))); + } markPartitionError(jobCtx, pId, currState, true); // The error policy is to fail the task as soon a single partition fails for a specified // maximum number of attempts or task is in ABORTED state. @@ -376,7 +390,9 @@ public class JobRebalancer extends TaskRebalancer { || currState.equals(TaskPartitionState.ERROR)) { skippedPartitions.add(pId); partitionsToDropFromIs.add(pId); - LOG.debug("skippedPartitions:" + skippedPartitions); + if (LOG.isDebugEnabled()) { + LOG.debug("skippedPartitions:" + skippedPartitions); + } } else { // Mark the task to be started at some later time (if enabled) markPartitionDelayed(jobCfg, jobCtx, pId); @@ -388,9 +404,11 @@ public class JobRebalancer extends TaskRebalancer { case DROPPED: { // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned. donePartitions.add(pId); - LOG.debug(String.format( - "Task partition %s has state %s. It will be dropped from the current ideal state.", - pName, currState)); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Task partition %s has state %s. It will be dropped from the current ideal state.", + pName, currState)); + } } break; default: @@ -501,10 +519,12 @@ public class JobRebalancer extends TaskRebalancer { int participantLimitation = participantCapacity - cache.getParticipantActiveTaskCount(instance); // New tasks to be assigned int numToAssign = Math.min(jobCfgLimitation, participantLimitation); - LOG.debug(String.format( - "Throttle tasks to be assigned to instance %s using limitation: Job Concurrent Task(%d), " - + "Participant Max Task(%d). Remaining capacity %d.", instance, jobCfgLimitation, participantCapacity, - numToAssign)); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Throttle tasks to be assigned to instance %s using limitation: Job Concurrent Task(%d), " + + "Participant Max Task(%d). Remaining capacity %d.", instance, jobCfgLimitation, + participantCapacity, numToAssign)); + } if (numToAssign > 0) { Set<Integer> throttledSet = new HashSet<Integer>(); List<Integer> nextPartitions = @@ -516,12 +536,17 @@ public class JobRebalancer extends TaskRebalancer { jobCtx.setAssignedParticipant(pId, instance); jobCtx.setPartitionState(pId, TaskPartitionState.INIT); jobCtx.setPartitionStartTime(pId, System.currentTimeMillis()); - LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, - TaskPartitionState.RUNNING, instance)); + if (LOG.isDebugEnabled()) { + LOG.debug(String + .format("Setting task partition %s state to %s on instance %s.", pName, + TaskPartitionState.RUNNING, instance)); + } } - cache.setParticipantActiveTaskCount(instance, cache.getParticipantActiveTaskCount(instance) + nextPartitions.size()); + cache.setParticipantActiveTaskCount(instance, + cache.getParticipantActiveTaskCount(instance) + nextPartitions.size()); if (!throttledSet.isEmpty()) { - LOG.debug(throttledSet.size() + "tasks are ready but throttled when assigned to participant."); + LOG.debug( + throttledSet.size() + "tasks are ready but throttled when assigned to participant."); } } } http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index e0e9903..ece1935 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -174,8 +174,10 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { // If there is any parent job not started, this job should not be scheduled if (notStartedCount > 0) { - LOG.debug(String - .format("Job %s is not ready to start, notStartedParent(s)=%d.", job, notStartedCount)); + if (LOG.isDebugEnabled()) { + LOG.debug(String + .format("Job %s is not ready to start, notStartedParent(s)=%d.", job, notStartedCount)); + } return false; } @@ -188,24 +190,30 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { } if (failedOrTimeoutCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) { markJobFailed(job, null, workflowCfg, workflowCtx, jobConfigMap); - LOG.debug( - String.format("Job %s is not ready to start, failedCount(s)=%d.", job, failedOrTimeoutCount)); + if (LOG.isDebugEnabled()) { + LOG.debug(String + .format("Job %s is not ready to start, failedCount(s)=%d.", job, failedOrTimeoutCount)); + } return false; } if (workflowCfg.isJobQueue()) { // If job comes from a JobQueue, it should apply the parallel job logics if (incompleteAllCount >= workflowCfg.getParallelJobs()) { - LOG.debug(String.format("Job %s is not ready to schedule, inCompleteJobs(s)=%d.", job, - incompleteAllCount)); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Job %s is not ready to schedule, inCompleteJobs(s)=%d.", job, + incompleteAllCount)); + } return false; } } else { // If this job comes from a generic workflow, job will not be scheduled until // all the direct parent jobs finished if (incompleteParentCount > 0) { - LOG.debug(String.format("Job %s is not ready to start, notFinishedParent(s)=%d.", job, - incompleteParentCount)); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Job %s is not ready to start, notFinishedParent(s)=%d.", job, + incompleteParentCount)); + } return false; } } http://git-wip-us.apache.org/repos/asf/helix/blob/13fac7c7/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java index 04f4b09..540ea13 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java @@ -183,13 +183,17 @@ public class WorkflowRebalancer extends TaskRebalancer { for (String job : workflowCfg.getJobDag().getAllNodes()) { TaskState jobState = workflowCtx.getJobState(job); if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) { - LOG.debug("Job " + job + " is already started or completed."); + if (LOG.isDebugEnabled()) { + LOG.debug("Job " + job + " is already started or completed."); + } continue; } if (workflowCfg.isJobQueue() && scheduledJobs >= workflowCfg.getParallelJobs()) { - LOG.debug(String.format("Workflow %s already have enough job in progress, " - + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs)); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Workflow %s already have enough job in progress, " + + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs)); + } break; } @@ -330,7 +334,9 @@ public class WorkflowRebalancer extends TaskRebalancer { if (scheduleConfig.isRecurring()) { // Skip scheduling this workflow if it's not in a start state if (!workflowCfg.getTargetState().equals(TargetState.START)) { - LOG.debug("Skip scheduling since the workflow has not been started " + workflow); + if (LOG.isDebugEnabled()) { + LOG.debug("Skip scheduling since the workflow has not been started " + workflow); + } return false; } @@ -357,7 +363,9 @@ public class WorkflowRebalancer extends TaskRebalancer { DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss"); df.setTimeZone(TimeZone.getTimeZone("UTC")); String newWorkflowName = workflow + "_" + df.format(new Date(timeToSchedule)); - LOG.debug("Ready to start workflow " + newWorkflowName); + if (LOG.isDebugEnabled()) { + LOG.debug("Ready to start workflow " + newWorkflowName); + } if (!newWorkflowName.equals(lastScheduled)) { Workflow clonedWf = cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule)); @@ -555,4 +563,4 @@ public class WorkflowRebalancer extends TaskRebalancer { // Nothing to do here with workflow resource. return currentIdealState; } -} \ No newline at end of file +}