Repository: incubator-apex-core Updated Branches: refs/heads/master 752f4397d -> c2903da2f
APEXCORE-330 : Getting the stack trace from the containers, exposed through the REST API; the return value will be in JSON. The stack trace can be accessed through the following call: "/ws/v2/stram/physicalPlan/containers/<containerId>/stackTrace" Addressed review comments. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/ac7d673a Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/ac7d673a Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/ac7d673a Branch: refs/heads/master Commit: ac7d673ad89e1fda27fcf418022dbd74430326c2 Parents: cae4dc5 Author: sandeshh <sandesh.he...@gmail.com> Authored: Sun Apr 24 07:45:12 2016 -0700 Committer: sandeshh <sandesh.he...@gmail.com> Committed: Tue May 3 15:01:04 2016 -0700 ---------------------------------------------------------------------- .../java/com/datatorrent/stram/StramUtils.java | 52 ++++++++++++++++++++ .../stram/StreamingContainerAgent.java | 9 ++++ .../stram/StreamingContainerManager.java | 5 ++ .../StreamingContainerUmbilicalProtocol.java | 3 ++ .../java/com/datatorrent/stram/cli/ApexCli.java | 26 ++++++++++ .../stram/engine/StreamingContainer.java | 14 ++++++ .../stram/webapp/StramWebServices.java | 40 +++++++++++++++ 7 files changed, 149 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/StramUtils.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramUtils.java b/engine/src/main/java/com/datatorrent/stram/StramUtils.java index 8b413bc..a931253 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramUtils.java +++ b/engine/src/main/java/com/datatorrent/stram/StramUtils.java @@ -21,6 +21,12 @@ package com.datatorrent.stram; import java.util.Map; +import 
org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.log4j.DTLoggerFactory; @@ -35,6 +41,8 @@ import com.datatorrent.api.StreamingApplication; */ public abstract class StramUtils { + private static final Logger LOG = LoggerFactory.getLogger(StramUtils.class); + public static <T> Class<? extends T> classForName(String className, Class<T> superClass) { try { @@ -81,4 +89,48 @@ public abstract class StramUtils } } + public static JSONObject getStackTrace() + { + Map<Thread, StackTraceElement[]> stackTraces = Thread.getAllStackTraces(); + + JSONObject jsonObject = new JSONObject(); + JSONArray jsonArray = new JSONArray(); + + for (Map.Entry<Thread, StackTraceElement[]> elements : stackTraces.entrySet()) { + + JSONObject jsonThread = new JSONObject(); + + Thread thread = elements.getKey(); + + try { + + jsonThread.put("name", thread.getName()); + jsonThread.put("state", thread.getState()); + jsonThread.put("id", thread.getId()); + + JSONArray stackTraceElements = new JSONArray(); + + for (StackTraceElement stackTraceElement : elements.getValue()) { + + stackTraceElements.put(stackTraceElement.toString()); + } + + jsonThread.put("stackTraceElements", stackTraceElements); + + jsonArray.put(jsonThread); + } catch (Exception ex) { + LOG.warn("Getting stack trace for the thread " + thread.getName() + " failed."); + continue; + } + } + + try { + jsonObject.put("threads", jsonArray); + } catch (JSONException e) { + throw new RuntimeException(e); + } + + return jsonObject; + } + } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java ---------------------------------------------------------------------- diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java index fc2fb17..598fea5 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java @@ -96,6 +96,7 @@ public class StreamingContainerAgent } boolean shutdownRequested = false; + boolean stackTraceRequested = false; Set<PTOperator> deployOpers = Sets.newHashSet(); Set<Integer> undeployOpers = Sets.newHashSet(); @@ -468,4 +469,12 @@ public class StreamingContainerAgent return ci; } + public String getStackTrace() + { + + stackTraceRequested = true; + return containerStackTrace; + } + + public volatile String containerStackTrace = null; } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 37f63b2..b12709e 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -1489,6 +1489,8 @@ public class StreamingContainerManager implements PlanContext }); } + sca.containerStackTrace = heartbeat.stackTrace; + if (heartbeat.restartRequested) { LOG.error("Container {} restart request", sca.container.getExternalId()); containerStopRequests.put(sca.container.getExternalId(), sca.container.getExternalId()); @@ -1820,6 +1822,9 @@ public class StreamingContainerManager implements PlanContext } rsp.nodeRequests = requests; rsp.committedWindowId = committedWindowId; + rsp.stackTraceRequired = sca.stackTraceRequested; + sca.stackTraceRequested = false; + return rsp; } 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java index d01d2b6..150e3b3 100644 --- a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java +++ b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java @@ -253,6 +253,7 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol return stats.id; } + public String stackTrace; } /** @@ -380,6 +381,8 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol * Set when dag purges a particular windowId as it's processed by all the operators. */ public long committedWindowId = -1; + + public boolean stackTraceRequired = false; } /** http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java index 87e4bdc..67406a3 100644 --- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java +++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java @@ -770,6 +770,10 @@ public class ApexCli null, new Arg[]{new Arg("operator-id"), new Arg("start-time")}, "Get tuple recording info")); + connectedCommands.put("get-container-stacktrace", new CommandSpec(new GetContainerStackTrace(), + null, + new Arg[]{new Arg("container-id")}, + "Get the stack trace for the container")); // // Logical plan change command specification starts here @@ -3366,6 +3370,28 @@ public class ApexCli } 
+ private class GetContainerStackTrace implements Command + { + @Override + public void execute(String[] args, ConsoleReader reader) throws Exception + { + String containerLongId = getContainerLongId(args[1]); + if (containerLongId == null) { + throw new CliException("Container " + args[1] + " not found"); + } + + JSONObject response; + try { + response = getResource(StramWebServices.PATH_PHYSICAL_PLAN_CONTAINERS + "/" + args[1] + "/" + StramWebServices.PATH_STACKTRACE, currentApp); + } catch (Exception ex) { + throw new CliException("Webservice call to AppMaster failed.", ex); + } + + printJson(response); + } + + } + private class GetAppPackageInfoCommand implements Command { @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java index 2436776..1953d7a 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java @@ -79,6 +79,7 @@ import com.datatorrent.netlet.DefaultEventLoop; import com.datatorrent.netlet.util.Slice; import com.datatorrent.stram.ComponentContextPair; import com.datatorrent.stram.RecoverableRpcProxy; +import com.datatorrent.stram.StramUtils; import com.datatorrent.stram.StramUtils.YarnContainerMain; import com.datatorrent.stram.StringCodecs; import com.datatorrent.stram.api.Checkpoint; @@ -609,6 +610,7 @@ public class StreamingContainer extends YarnContainerMain long tokenLifeTime = (long)(containerContext.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * containerContext.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME)); long expiryTime = System.currentTimeMillis(); final Credentials credentials = 
UserGroupInformation.getCurrentUser().getCredentials(); + String stackTrace = null; Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); while (iter.hasNext()) { Token<?> token = iter.next(); @@ -649,6 +651,7 @@ public class StreamingContainer extends YarnContainerMain ContainerHeartbeatResponse rsp; do { + ContainerStats stats = new ContainerStats(containerId); // gather heartbeat info for all operators for (Map.Entry<Integer, Node<?>> e : nodes.entrySet()) { @@ -690,8 +693,19 @@ public class StreamingContainer extends YarnContainerMain // heartbeat call and follow-up processing //logger.debug("Sending heartbeat for {} operators.", msg.getContainerStats().size()); msg.sentTms = System.currentTimeMillis(); + + msg.stackTrace = stackTrace; + rsp = umbilical.processHeartbeat(msg); + + if (rsp.stackTraceRequired) { + stackTrace = StramUtils.getStackTrace().toString(); + } else { + stackTrace = null; + } + processHeartbeatResponse(rsp); + if (rsp.hasPendingRequests) { logger.info("Waiting for pending request."); synchronized (this.heartbeatTrigger) { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java index 1047f12..52be922 100644 --- a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java +++ b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java @@ -35,6 +35,7 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; @@ -80,6 +81,7 @@ import com.datatorrent.api.Context.DAGContext; import 
com.datatorrent.api.Operator; import com.datatorrent.api.StringCodec; import com.datatorrent.stram.StramAppContext; +import com.datatorrent.stram.StramUtils; import com.datatorrent.stram.StreamingContainerAgent; import com.datatorrent.stram.StreamingContainerManager; import com.datatorrent.stram.StringCodecs; @@ -122,7 +124,11 @@ public class StramWebServices public static final String PATH_OPERATOR_CLASSES = "operatorClasses"; public static final String PATH_ALERTS = "alerts"; public static final String PATH_LOGGERS = "loggers"; + public static final String PATH_STACKTRACE = "stackTrace"; public static final long WAIT_TIME = 5000; + public static final long STACK_TRACE_WAIT_TIME = 1000; + public static final long STACK_TRACE_ATTEMPTS = 10; + //public static final String PATH_ACTION_OPERATOR_CLASSES = "actionOperatorClasses"; private StramAppContext appCtx; @@ -492,6 +498,40 @@ public class StramWebServices return new JSONObject(objectMapper.writeValueAsString(ci)); } + @GET + @Path(PATH_PHYSICAL_PLAN_CONTAINERS + "/{containerId}/" + PATH_STACKTRACE) + @Produces(MediaType.APPLICATION_JSON) + public JSONObject getContainerStackTrace(@PathParam("containerId") String containerId) throws Exception + { + init(); + + if (containerId.equals(System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString()))) { + return StramUtils.getStackTrace(); + } + + StreamingContainerAgent sca = dagManager.getContainerAgent(containerId); + + if (sca == null) { + throw new NotFoundException("Container not found."); + } + + if (!sca.getContainerInfo().state.equals("ACTIVE")) { + throw new NotFoundException("Container is not active."); + } + + for (int i = 0; i < STACK_TRACE_ATTEMPTS; ++i) { + String result = sca.getStackTrace(); + + if (result != null) { + return new JSONObject(result); + } + + Thread.sleep(STACK_TRACE_WAIT_TIME); + } + + throw new TimeoutException("Not able to get the stack trace"); + } + @POST // not supported by WebAppProxyServlet, can only be called directly 
@Path(PATH_PHYSICAL_PLAN_CONTAINERS + "/{containerId}/kill") @Produces(MediaType.APPLICATION_JSON)