Repository: apex-core Updated Branches: refs/heads/master 527c70bf8 -> f04e07c03
APEXCORE-294 controlled shutdown of an application. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/51076c6c Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/51076c6c Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/51076c6c Branch: refs/heads/master Commit: 51076c6cb4e5abe333b207b5c5a821e683a8bfb0 Parents: 1e9896b Author: Tushar R. Gosavi <[email protected]> Authored: Mon Jan 23 16:59:54 2017 +0530 Committer: Tushar R. Gosavi <[email protected]> Committed: Thu Feb 2 11:47:20 2017 +0530 ---------------------------------------------------------------------- docs/apex_cli.md | 10 +++++++ .../datatorrent/stram/StramLocalCluster.java | 4 +-- .../stram/StreamingContainerAgent.java | 13 ++++++++- .../stram/StreamingContainerManager.java | 25 ++++++++--------- .../StreamingContainerUmbilicalProtocol.java | 20 +++++++++++++- .../java/com/datatorrent/stram/engine/Node.java | 20 +++++++++++--- .../stram/engine/StreamingContainer.java | 28 ++++++++++++++++---- .../stram/webapp/StramWebServices.java | 3 ++- 8 files changed, 98 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/51076c6c/docs/apex_cli.md ---------------------------------------------------------------------- diff --git a/docs/apex_cli.md b/docs/apex_cli.md index da62a82..97d082f 100644 --- a/docs/apex_cli.md +++ b/docs/apex_cli.md @@ -266,3 +266,13 @@ submit *Note*: To perform runtime logical plan changes, like ability to add new operators, they must be part of the jar files that were deployed at application launch time. + +*kill-app* terminates operators in an undefined order, so some operators may not process +data emitted by input operators. Such an application can be restarted from the last checkpointed +state using the `launch -originalAppId` command.
Because the application is relaunched from checkpointed +state, no data is lost. + +*shutdown-app* terminates the application while making sure that all data +emitted by input operators is processed throughout the DAG. An application terminated with +`shutdown-app` cannot be restarted, because it is considered to have completed +successfully. http://git-wip-us.apache.org/repos/asf/apex-core/blob/51076c6c/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java index e188b60..2ffbabd 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java +++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java @@ -135,7 +135,7 @@ public class StramLocalCluster implements Runnable, Controller { if (injectShutdown.containsKey(msg.getContainerId())) { ContainerHeartbeatResponse r = new ContainerHeartbeatResponse(); - r.shutdown = true; + r.shutdown = ShutdownType.ABORT; return r; } try { @@ -466,7 +466,7 @@ public class StramLocalCluster implements Runnable, Controller StreamingContainer c = childContainers.get(containerIdStr); if (c != null) { ContainerHeartbeatResponse r = new ContainerHeartbeatResponse(); - r.shutdown = true; + r.shutdown = StreamingContainerUmbilicalProtocol.ShutdownType.ABORT; c.processHeartbeatResponse(r); } dnmgr.containerStopRequests.remove(containerIdStr); http://git-wip-us.apache.org/repos/asf/apex-core/blob/51076c6c/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java index 2ea37f4..1d0897d 100644 --- 
a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java @@ -51,6 +51,7 @@ import com.datatorrent.stram.api.OperatorDeployInfo.InputDeployInfo; import com.datatorrent.stram.api.OperatorDeployInfo.OperatorType; import com.datatorrent.stram.api.OperatorDeployInfo.OutputDeployInfo; import com.datatorrent.stram.api.OperatorDeployInfo.UnifierDeployInfo; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ShutdownType; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StramToNodeRequest; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StreamingContainerContext; import com.datatorrent.stram.engine.OperatorContext; @@ -96,7 +97,7 @@ public class StreamingContainerAgent this.dnmgr = dnmgr; } - boolean shutdownRequested = false; + ShutdownType shutdownRequest = null; boolean stackTraceRequested = false; Set<PTOperator> deployOpers = Sets.newHashSet(); @@ -484,4 +485,14 @@ public class StreamingContainerAgent } public volatile String containerStackTrace = null; + + public void requestShutDown(ShutdownType type) + { + shutdownRequest = type; + } + + public boolean isShutdownRequested() + { + return (shutdownRequest != null); + } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/51076c6c/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 45bfcdb..00a406c 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -134,6 +134,7 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHe import 
com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ShutdownType; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StramToNodeRequest; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StreamingContainerContext; import com.datatorrent.stram.engine.OperatorResponse; @@ -761,7 +762,7 @@ public class StreamingContainerManager implements PlanContext for (PTContainer c : pendingAllocation) { LOG.warn("Waiting for resource: {}m priority: {} {}", c.getRequiredMemoryMB(), c.getResourceRequestPriority(), c); } - shutdownAllContainers(msg); + shutdownAllContainers(ShutdownType.ABORT, msg); this.forcedShutdown = true; } else { for (PTContainer c : pendingAllocation) { @@ -1121,7 +1122,7 @@ public class StreamingContainerManager implements PlanContext public void scheduleContainerRestart(String containerId) { StreamingContainerAgent cs = this.getContainerAgent(containerId); - if (cs == null || cs.shutdownRequested) { + if (cs == null || cs.isShutdownRequested()) { // the container is no longer used / was released by us return; } @@ -1428,7 +1429,7 @@ public class StreamingContainerManager implements PlanContext } else { String msg = String.format("Shutdown after reaching failure threshold for %s", oper); LOG.warn(msg); - shutdownAllContainers(msg); + shutdownAllContainers(ShutdownType.ABORT, msg); forcedShutdown = true; } } else { @@ -1454,7 +1455,7 @@ public class StreamingContainerManager implements PlanContext // could be orphaned container that was replaced and needs to terminate LOG.error("Unknown container {}", heartbeat.getContainerId()); ContainerHeartbeatResponse response = new ContainerHeartbeatResponse(); - response.shutdown = true; + response.shutdown 
= ShutdownType.ABORT; return response; } @@ -1771,11 +1772,11 @@ public class StreamingContainerManager implements PlanContext if (heartbeat.getContainerStats().operators.isEmpty() && isApplicationIdle()) { LOG.info("requesting idle shutdown for container {}", heartbeat.getContainerId()); - rsp.shutdown = true; + rsp.shutdown = ShutdownType.ABORT; } else { - if (sca.shutdownRequested) { + if (sca.isShutdownRequested()) { LOG.info("requesting shutdown for container {}", heartbeat.getContainerId()); - rsp.shutdown = true; + rsp.shutdown = sca.shutdownRequest; } } @@ -2191,7 +2192,6 @@ public class StreamingContainerManager implements PlanContext int operatorCount = 0; UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, recovery, getCheckpointGroups()); for (OperatorMeta logicalOperator : plan.getLogicalPlan().getRootOperators()) { - //LOG.debug("Updating checkpoints for operator {}", logicalOperator.getName()); List<PTOperator> operators = plan.getOperators(logicalOperator); if (operators != null) { for (PTOperator operator : operators) { @@ -2257,14 +2257,15 @@ public class StreamingContainerManager implements PlanContext * If containers don't respond, the application can be forcefully terminated * via yarn using forceKillApplication. 
* + * @param type * @param message */ - public void shutdownAllContainers(String message) + public void shutdownAllContainers(ShutdownType type, String message) { this.shutdownDiagnosticsMessage = message; - LOG.info("Initiating application shutdown: {}", message); + LOG.info("Initiating application shutdown: type {} {}", type, message); for (StreamingContainerAgent cs : this.containers.values()) { - cs.shutdownRequested = true; + cs.requestShutDown(type); } } @@ -2372,7 +2373,7 @@ public class StreamingContainerManager implements PlanContext LOG.debug("Container marked for shutdown: {}", c); // container already removed from plan // TODO: monitor soft shutdown - sca.shutdownRequested = true; + sca.requestShutDown(ShutdownType.ABORT); } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/51076c6c/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java index 150e3b3..77a33e6 100644 --- a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java +++ b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java @@ -341,6 +341,24 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol } } + enum ShutdownType + { + /** + * WAIT_TERMINATE, when this value is set all input operators in the + * container terminates and send END_STREAM tuple to downstream + * operators. This type of shutdown make sure that data emitted by upstream + * operator before shutdown are processed by downstreams operator before + * they terminate. 
+ */ + WAIT_TERMINATE, + /** + * ABORT, In few cases we need to shutdown container forcefully such + * as resource allocation timeout, stale container or container without any operator. + * In such cases this flag will be used to send shutdown request to the container. + */ + ABORT, + } + /** * * Response from the stram to the container heartbeat @@ -354,7 +372,7 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol /** * Indicate container to exit heartbeat loop and shutdown. */ - public boolean shutdown; + public ShutdownType shutdown; /** * Optional list of responses for operators in the container. http://git-wip-us.apache.org/repos/asf/apex-core/blob/51076c6c/engine/src/main/java/com/datatorrent/stram/engine/Node.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java index 4a5cbde..d779afe 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java @@ -317,9 +317,18 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera protected ProcessingMode PROCESSING_MODE; protected volatile boolean shutdown; - public void shutdown() + /** + * Shutdown the current node. + * If processEndWindow is set to true, then after shutting down, endWindow is called and + * END_STREAM tuple is sent to downstream operators. 
+ * + * @param processEndWindow + */ + public void shutdown(boolean processEndWindow) { - shutdown = true; + if (!processEndWindow) { + shutdown = true; + } synchronized (this) { alive = false; @@ -345,6 +354,11 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera } } + public void shutdown() + { + shutdown(false); + } + @Override public String toString() { @@ -353,7 +367,7 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera protected void emitEndStream() { - // logger.debug("{} sending EndOfStream", this); + logger.info("{} sending EndOfStream", this); /* * since we are going away, we should let all the downstream operators know that. */ http://git-wip-us.apache.org/repos/asf/apex-core/blob/51076c6c/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java index 86c0402..c3886b4 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java @@ -807,11 +807,15 @@ public class StreamingContainer extends YarnContainerMain undeploy(rsp.undeployRequest); } - if (rsp.shutdown) { - logger.info("Received shutdown request"); - processNodeRequests(false); - this.exitHeartbeatLoop = true; - return; + if (rsp.shutdown != null) { + logger.info("Received shutdown request type {}", rsp.shutdown); + if (rsp.shutdown == StreamingContainerUmbilicalProtocol.ShutdownType.ABORT) { + processNodeRequests(false); + this.exitHeartbeatLoop = true; + return; + } else if (rsp.shutdown == StreamingContainerUmbilicalProtocol.ShutdownType.WAIT_TERMINATE) { + stopInputNodes(); + } } if (rsp.deployRequest != null) { @@ -833,6 +837,20 @@ public class StreamingContainer extends 
YarnContainerMain processNodeRequests(true); } + private void stopInputNodes() + { + for (Entry<Integer, Node<?>> e : nodes.entrySet()) { + Node<?> node = e.getValue(); + if (node instanceof InputNode) { + final Thread thread = e.getValue().context.getThread(); + if (thread == null || !thread.isAlive()) { + continue; + } + } + node.shutdown(true); + } + } + private int getOutputQueueCapacity(List<OperatorDeployInfo> operatorList, int sourceOperatorId, String sourcePortName) { for (OperatorDeployInfo odi : operatorList) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/51076c6c/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java index 16b7ed7..995127c 100644 --- a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java +++ b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java @@ -84,6 +84,7 @@ import com.datatorrent.stram.StramUtils; import com.datatorrent.stram.StreamingContainerAgent; import com.datatorrent.stram.StreamingContainerManager; import com.datatorrent.stram.StringCodecs; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol; import com.datatorrent.stram.codec.LogicalPlanSerializer; import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.logical.LogicalPlan.ModuleMeta; @@ -361,7 +362,7 @@ public class StramWebServices { init(); LOG.debug("Shutdown requested"); - dagManager.shutdownAllContainers("Shutdown requested externally."); + dagManager.shutdownAllContainers(StreamingContainerUmbilicalProtocol.ShutdownType.WAIT_TERMINATE, "Shutdown requested externally."); return new JSONObject(); }
