[HELIX-766] TASK: Add logging functionality in AssignableInstanceManager
In order to debug task-related inquiries and issues, we realized that it would
be very helpful if there was a log recording the current quota capacity of all
AssignableInstances. This is for cases where we see jobs whose tasks are not
getting assigned, so that we could quickly rule out the possibility of bugs in
quota-based scheduling.
Changelist:
1. Add a method that logs the current quota profile in JSON format, with an
optional flag to only display instances that have quota types whose capacities
are full
2. Add info logs in AssignableInstanceManager
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5033785c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5033785c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5033785c
Branch: refs/heads/master
Commit: 5033785c231af363953367f65f77513911b753f5
Parents: 930a4b7
Author: Hunter Lee <[email protected]>
Authored: Fri Oct 26 19:08:02 2018 -0700
Committer: Hunter Lee <[email protected]>
Committed: Fri Oct 26 19:08:02 2018 -0700
----------------------------------------------------------------------
.../controller/stages/ClusterDataCache.java | 4 +-
.../helix/task/AssignableInstanceManager.java | 55 +++++++++++++++++++-
2 files changed, 56 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/5033785c/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 93aea4f..da22c5e 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -233,7 +233,8 @@ public class ClusterDataCache extends AbstractDataCache {
assignableInstanceManager.buildAssignableInstances(_clusterConfig,
_taskDataCache,
_liveInstanceMap, _instanceConfigMap);
/**
- * TODO: Consider this for optimization after sufficient testing
+ * TODO: (Hunter) Consider this for optimization after fixing the
problem of quotas not being
+ * properly released for targeted tasks
* if (_existsClusterConfigChange) {
* // Update both flags since buildAssignableInstances includes
updateAssignableInstances
* _existsClusterConfigChange = false;
@@ -246,6 +247,7 @@ public class ClusterDataCache extends AbstractDataCache {
* _instanceConfigMap);
* }
**/
+ assignableInstanceManager.logQuotaProfileJSON(false);
}
_instanceMessagesCache.refresh(accessor, _liveInstanceMap);
http://git-wip-us.apache.org/repos/asf/helix/blob/5033785c/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
----------------------------------------------------------------------
diff --git
a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index 1c1ed69..7ab3f0d 100644
---
a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++
b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -31,6 +31,9 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.task.assigner.AssignableInstance;
import org.apache.helix.task.assigner.TaskAssignResult;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -145,8 +148,8 @@ public class AssignableInstanceManager {
assignableInstance.restoreTaskAssignResult(taskId, taskConfig,
quotaType);
if (taskAssignResult.isSuccessful()) {
_taskAssignResultMap.put(taskId, taskAssignResult);
- LOG.debug("TaskAssignResult restored for taskId: {}, assigned on
instance: {}", taskId,
- assignedInstance);
+ LOG.debug("TaskAssignResult restored for taskId: {}, assigned on
instance: {}",
+ taskId, assignedInstance);
}
} else {
LOG.debug(
@@ -158,6 +161,8 @@ public class AssignableInstanceManager {
}
}
}
+ LOG.info(
+ "AssignableInstanceManager built AssignableInstances from scratch
based on contexts in TaskDataCache due to Controller switch or ClusterConfig
change.");
}
/**
@@ -224,6 +229,8 @@ public class AssignableInstanceManager {
"Non-live AssignableInstance removed for instance: {} during
updateAssignableInstances",
instanceToBeRemoved.getInstanceName());
}
+ LOG.info(
+ "AssignableInstanceManager updated AssignableInstances due to
LiveInstance/InstanceConfig change.");
}
/**
@@ -277,4 +284,48 @@ public class AssignableInstanceManager {
}
return false;
}
+
+ /*
+ * Creates a JSON-style String that shows the quota profile and logs it.
+ * TODO: Make this with an associated event ID if this becomes a performance
bottleneck
+ * @param onlyDisplayIfFull if true, this String will only contain the
profile for instances whose
+ * quota capacity is at its full to avoid cluttering up the log
+ */
+ public void logQuotaProfileJSON(boolean onlyDisplayIfFull) {
+ // Create a String to use as the log for quota status
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode instanceNode = mapper.createObjectNode();
+
+ // Loop through all instances
+ for (Map.Entry<String, AssignableInstance> instanceEntry :
_assignableInstanceMap.entrySet()) {
+ AssignableInstance assignableInstance = instanceEntry.getValue();
+ boolean capacityFull = false;
+ JsonNode resourceTypeNode = mapper.createObjectNode();
+ for (Map.Entry<String, Map<String, Integer>> capacityEntry :
assignableInstance
+ .getTotalCapacity().entrySet()) {
+ String resourceType = capacityEntry.getKey();
+ Map<String, Integer> quotaTypeMap = capacityEntry.getValue();
+ JsonNode quotaTypeNode = mapper.createObjectNode();
+ for (Map.Entry<String, Integer> typeEntry : quotaTypeMap.entrySet()) {
+ String quotaType = typeEntry.getKey();
+ int totalCapacity = typeEntry.getValue();
+ int usedCapacity =
assignableInstance.getUsedCapacity().get(resourceType).get(quotaType);
+ if (!capacityFull) {
+ capacityFull = totalCapacity <= usedCapacity;
+ }
+ String capacityString = String.format("%d/%d", usedCapacity,
totalCapacity);
+ ((ObjectNode) quotaTypeNode).put(quotaType, capacityString);
+ }
+ ((ObjectNode) resourceTypeNode).put(resourceType, quotaTypeNode);
+ }
+ // If onlyDisplayIfFull, do not add the JsonNode to the parent node
+ if (onlyDisplayIfFull && !capacityFull) {
+ continue;
+ }
+ ((ObjectNode) instanceNode).put(instanceEntry.getKey(),
resourceTypeNode);
+ }
+ if (instanceNode.size() > 0) {
+ LOG.info("Current quota capacity: {}", instanceNode.toString());
+ }
+ }
}