Repository: helix
Updated Branches:
  refs/heads/master a6863937c -> 53a6791e7


Log improvements


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c3297ae4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c3297ae4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c3297ae4

Branch: refs/heads/master
Commit: c3297ae435cabac6c650ad649e2b72b48ea16dc7
Parents: a686393
Author: Junkai Xue <[email protected]>
Authored: Fri Sep 7 12:15:03 2018 -0700
Committer: Junkai Xue <[email protected]>
Committed: Mon Oct 29 14:29:21 2018 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |  9 ++--
 .../controller/stages/ClusterDataCache.java     | 32 +++++---------
 .../stages/MessageGenerationPhase.java          | 44 ++++++++++----------
 3 files changed, 39 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c3297ae4/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 6a8fedd..6b1244b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -199,7 +199,7 @@ public class GenericHelixController implements 
IdealStateChangeListener,
         forceRebalance(_manager, _clusterEventType);
       } catch (Throwable ex) {
         logger.error("Time task failed. Rebalance task type: " + 
_clusterEventType + ", cluster: "
-            + _clusterName);
+            + _clusterName, ex);
       }
     }
   }
@@ -208,7 +208,8 @@ public class GenericHelixController implements 
IdealStateChangeListener,
   private void forceRebalance(HelixManager manager, ClusterEventType 
eventType) {
     NotificationContext changeContext = new NotificationContext(manager);
     changeContext.setType(NotificationContext.Type.CALLBACK);
-    ClusterEvent event = new ClusterEvent(_clusterName, eventType);
+    String uid = UUID.randomUUID().toString().substring(0, 8);
+    ClusterEvent event = new ClusterEvent(_clusterName, eventType, uid);
     event.addAttribute(AttributeName.helixmanager.name(), 
changeContext.getManager());
     event.addAttribute(AttributeName.changeContext.name(), changeContext);
     event.addAttribute(AttributeName.eventData.name(), new ArrayList<>());
@@ -217,7 +218,9 @@ public class GenericHelixController implements 
IdealStateChangeListener,
     _taskEventQueue.put(event);
     _eventQueue.put(event);
 
-    logger.info("Controller rebalance event triggered with event type: " + 
eventType);
+    logger.info(String
+        .format("Controller rebalance event triggered with event type: %s for 
cluster %s",
+            eventType, _clusterName));
   }
 
   // TODO who should stop this timer

http://git-wip-us.apache.org/repos/asf/helix/blob/c3297ae4/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 2299f92..e1a374e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -181,8 +181,9 @@ public class ClusterDataCache extends AbstractDataCache {
       _propertyDataChangedMap.put(ChangeType.INSTANCE_CONFIG, false);
       clearCachedResourceAssignments();
       _instanceConfigCacheMap = 
accessor.getChildValuesMap(keyBuilder.instanceConfigs(), true);
-      LogUtil.logInfo(LOG, _eventId, "Reload InstanceConfig: " + 
_instanceConfigCacheMap.keySet()
-          + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
+      LogUtil.logInfo(LOG, _eventId,
+          "Reload InstanceConfig for cluster " + _clusterName + " : " + 
_instanceConfigCacheMap
+              .keySet() + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + 
"pipeline");
     }
 
     if (_propertyDataChangedMap.get(ChangeType.RESOURCE_CONFIG)) {
@@ -190,8 +191,9 @@ public class ClusterDataCache extends AbstractDataCache {
       clearCachedResourceAssignments();
 
       _resourceConfigCacheMap = refreshResourceConfigs(accessor);
-      LogUtil.logInfo(LOG, _eventId, "Reload ResourceConfigs: " + 
_resourceConfigCacheMap.keySet()
-          + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
+      LogUtil.logInfo(LOG, _eventId,
+          "Reload ResourceConfigs for cluster " + _clusterName + " : " + 
_resourceConfigCacheMap
+              .keySet() + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + 
"pipeline");
     }
 
     // This is for targeted jobs' task assignment. It needs to watch for 
current state changes for
@@ -230,23 +232,11 @@ public class ClusterDataCache extends AbstractDataCache {
       AssignableInstanceManager assignableInstanceManager =
           _taskDataCache.getAssignableInstanceManager();
       // Build from scratch every time
-      assignableInstanceManager.buildAssignableInstances(_clusterConfig, 
_taskDataCache,
-          _liveInstanceMap, _instanceConfigMap);
-      /**
-       * TODO: (Hunter) Consider this for optimization after fixing the 
problem of quotas not being
-       * properly released for targeted tasks
-       * if (_existsClusterConfigChange) {
-       * // Update both flags since buildAssignableInstances includes 
updateAssignableInstances
-       * _existsClusterConfigChange = false;
-       * _existsInstanceChange = false;
-       * assignableInstanceManager.buildAssignableInstances(_clusterConfig, 
_taskDataCache,
-       * _liveInstanceMap, _instanceConfigMap);
-       * } else if (_existsInstanceChange) {
-       * _existsInstanceChange = false;
-       * assignableInstanceManager.updateAssignableInstances(_clusterConfig, 
_liveInstanceMap,
-       * _instanceConfigMap);
-       * }
-       **/
+      assignableInstanceManager
+          .buildAssignableInstances(_clusterConfig, _taskDataCache, 
_liveInstanceMap,
+              _instanceConfigMap);
+      // TODO: (Hunter) Consider this for optimization after fixing the 
problem of quotas not being
+
       assignableInstanceManager.logQuotaProfileJSON(false);
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c3297ae4/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index cc098f6..ae0c93f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -99,8 +99,8 @@ public abstract class MessageGenerationPhase extends 
AbstractBaseStage {
       }
 
       for (Partition partition : resource.getPartitions()) {
-        Map<String, String> instanceStateMap = new HashMap<>(
-            resourcesStateMap.getInstanceStateMap(resourceName, partition));
+        Map<String, String> instanceStateMap =
+            new HashMap<>(resourcesStateMap.getInstanceStateMap(resourceName, 
partition));
         Map<String, String> pendingStateMap =
             currentStateOutput.getPendingStateMap(resourceName, partition);
 
@@ -114,7 +114,6 @@ public abstract class MessageGenerationPhase extends 
AbstractBaseStage {
           }
         }
 
-
         // we should generate message based on the desired-state priority
         // so keep generated messages in a temp map keyed by state
         // desired-state->list of generated-messages
@@ -123,17 +122,17 @@ public abstract class MessageGenerationPhase extends 
AbstractBaseStage {
         for (String instanceName : instanceStateMap.keySet()) {
           String desiredState = instanceStateMap.get(instanceName);
 
-          String currentState = 
currentStateOutput.getCurrentState(resourceName, partition,
-              instanceName);
+          String currentState =
+              currentStateOutput.getCurrentState(resourceName, partition, 
instanceName);
           if (currentState == null) {
             currentState = stateModelDef.getInitialState();
           }
 
-          Message pendingMessage = 
currentStateOutput.getPendingMessage(resourceName, partition,
-              instanceName);
+          Message pendingMessage =
+              currentStateOutput.getPendingMessage(resourceName, partition, 
instanceName);
           boolean isCancellationEnabled = 
cache.getClusterConfig().isStateTransitionCancelEnabled();
-          Message cancellationMessage = 
currentStateOutput.getCancellationMessage(resourceName,
-              partition, instanceName);
+          Message cancellationMessage =
+              currentStateOutput.getCancellationMessage(resourceName, 
partition, instanceName);
           String nextState = 
stateModelDef.getNextStateForTransition(currentState, desiredState);
 
           Message message = null;
@@ -151,7 +150,8 @@ public abstract class MessageGenerationPhase extends 
AbstractBaseStage {
                 .put(pendingMessage.getMsgId(), pendingMessage);
           }
 
-          if (desiredState.equals(NO_DESIRED_STATE) || 
desiredState.equalsIgnoreCase(currentState)) {
+          if (desiredState.equals(NO_DESIRED_STATE) || desiredState
+              .equalsIgnoreCase(currentState)) {
             if (desiredState.equals(NO_DESIRED_STATE) || pendingMessage != 
null && !currentState
                 .equalsIgnoreCase(pendingMessage.getToState())) {
               message = createStateTransitionCancellationMessage(manager, 
resource,
@@ -202,7 +202,8 @@ public abstract class MessageGenerationPhase extends 
AbstractBaseStage {
               if (logger.isDebugEnabled()) {
                 LogUtil.logDebug(logger, _eventId, String.format(
                     "Resource %s partition %s for instance %s with 
currentState %s and nextState %s",
-                    resource, partition.getPartitionName(), instanceName, 
currentState, nextState));
+                    resource.getResourceName(), partition.getPartitionName(), 
instanceName,
+                    currentState, nextState));
               }
             }
           }
@@ -256,19 +257,18 @@ public abstract class MessageGenerationPhase extends 
AbstractBaseStage {
 
   /**
    * Start a job in worker pool that asynchronously clean up pending message. 
Since it is possible
-   * that participant failed to clean up message after processing, it is 
important for controller
-   * to try to clean them up as well to unblock further rebalance
+   * that participant failed to clean up message after processing, it is 
important for controller to
+   * try to clean them up as well to unblock further rebalance
    *
    * @param pendingMessagesToPurge key: instance name, value: list of pending 
message to cleanup
-   * @param workerPool ExecutorService that job can be submitted to
-   * @param accessor Data accessor used to clean up message
+   * @param workerPool             ExecutorService that job can be submitted to
+   * @param accessor               Data accessor used to clean up message
    */
   private void schedulePendingMessageCleanUp(
       final Map<String, Map<String, Message>> pendingMessagesToPurge, 
ExecutorService workerPool,
       final HelixDataAccessor accessor) {
     workerPool.submit(new Callable<Object>() {
-      @Override
-      public Object call() {
+      @Override public Object call() {
         for (Map.Entry<String, Map<String, Message>> entry : 
pendingMessagesToPurge.entrySet()) {
           String instanceName = entry.getKey();
           for (Message msg : entry.getValue().values()) {
@@ -302,9 +302,9 @@ public abstract class MessageGenerationPhase extends 
AbstractBaseStage {
     }
   }
 
-  private Message createStateTransitionMessage(HelixManager manager, Resource 
resource, String partitionName,
-      String instanceName, String currentState, String nextState, String 
sessionId,
-      String stateModelDefName) {
+  private Message createStateTransitionMessage(HelixManager manager, Resource 
resource,
+      String partitionName, String instanceName, String currentState, String 
nextState,
+      String sessionId, String stateModelDefName) {
     String uuid = UUID.randomUUID().toString();
     Message message = new Message(MessageType.STATE_TRANSITION, uuid);
     message.setSrcName(manager.getInstanceName());
@@ -332,8 +332,8 @@ public abstract class MessageGenerationPhase extends 
AbstractBaseStage {
 
   private Message createStateTransitionCancellationMessage(HelixManager 
manager, Resource resource,
       String partitionName, String instanceName, String sessionId, String 
stateModelDefName,
-      String fromState, String toState, String nextState, Message 
cancellationMessage, boolean isCancellationEnabled,
-      String currentState) {
+      String fromState, String toState, String nextState, Message 
cancellationMessage,
+      boolean isCancellationEnabled, String currentState) {
 
     if (isCancellationEnabled && cancellationMessage == null) {
       LogUtil.logInfo(logger, _eventId,

Reply via email to