Repository: incubator-reef Updated Branches: refs/heads/master 9b971979f -> ff336f33f
[REEF-617] Enable creation of EvaluatorManager on restarted evaluators This addresses the issue by * Adding helper functions for ``EvaluatorManagerFactory`` to create EvaluatorManagers for recovered evaluators. * Recovering ``EvaluatorManager`` on the first heartbeat of an expected Evaluator on restart. * Fixing a bug in ``DriverRestartManager`` where a boolean check was reversed. JIRA: [REEF-617](https://issues.apache.org/jira/browse/REEF-617) Pull Request: This closes #422 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/ff336f33 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/ff336f33 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/ff336f33 Branch: refs/heads/master Commit: ff336f33f75d783deb34fad1512928c67131b7ed Parents: 9b97197 Author: Andrew Chung <afchun...@gmail.com> Authored: Wed Aug 26 15:32:52 2015 -0700 Committer: Markus Weimer <wei...@apache.org> Committed: Thu Aug 27 11:21:00 2015 -0700 ---------------------------------------------------------------------- .../driver/restart/DriverRestartManager.java | 20 ++++- .../driver/context/ContextRepresenters.java | 16 +--- .../evaluator/EvaluatorHeartbeatHandler.java | 25 +++++- .../evaluator/EvaluatorManagerFactory.java | 81 +++++++++++++------- .../common/driver/evaluator/Evaluators.java | 2 +- .../resourcemanager/ResourceStatusHandler.java | 5 +- .../common/driver/task/TaskRepresenter.java | 3 +- .../driver/YarnDriverRuntimeRestartManager.java | 1 + 8 files changed, 104 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java index 19f2b64..52764e4 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java @@ -27,6 +27,7 @@ import org.apache.reef.exception.DriverFatalRuntimeException; import org.apache.reef.runtime.common.DriverRestartCompleted; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.wake.EventHandler; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceRecoverEvent; import javax.inject.Inject; import java.util.*; @@ -122,6 +123,19 @@ public final class DriverRestartManager { } /** + * @return The ResourceRecoverEvent of the specified evaluator. Throws a {@link DriverFatalRuntimeException} if + * the evaluator does not exist in the set of known evaluators. + */ + public synchronized ResourceRecoverEvent getResourceRecoverEvent(final String evaluatorId) { + if (!this.restartEvaluators.contains(evaluatorId)) { + throw new DriverFatalRuntimeException("Unexpected evaluator [" + evaluatorId + "], should " + + "not have been recorded."); + } + + return this.restartEvaluators.get(evaluatorId).getResourceRecoverEvent(); + } + + /** * Indicate that this Driver has re-established the connection with one more Evaluator of a previous run. * @return true if the evaluator has been newly recovered. */ @@ -179,9 +193,9 @@ public final class DriverRestartManager { } /** - * Signals to the {@link DriverRestartManager} that an evaluator has had its running task processed. + * Signals to the {@link DriverRestartManager} that an evaluator has had its running task or active context processed. 
*/ - public synchronized void setEvaluatorRunningTask(final String evaluatorId) { + public synchronized void setEvaluatorProcessed(final String evaluatorId) { setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.PROCESSED); } @@ -193,7 +207,7 @@ public final class DriverRestartManager { } private synchronized EvaluatorRestartState getStateOfPreviousEvaluator(final String evaluatorId) { - if (this.restartEvaluators.contains(evaluatorId)) { + if (!this.restartEvaluators.contains(evaluatorId)) { return EvaluatorRestartState.NOT_EXPECTED; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java index 8abc19d..4936b9a 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java @@ -23,8 +23,6 @@ import net.jcip.annotations.ThreadSafe; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.context.FailedContext; -import org.apache.reef.driver.restart.DriverRestartManager; -import org.apache.reef.driver.restart.EvaluatorRestartState; import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher; import org.apache.reef.util.Optional; @@ -45,7 +43,6 @@ public final class ContextRepresenters { private final EvaluatorMessageDispatcher messageDispatcher; private final ContextFactory contextFactory; - private final DriverRestartManager 
driverRestartManager; // Mutable fields @GuardedBy("this") @@ -55,11 +52,9 @@ public final class ContextRepresenters { @Inject private ContextRepresenters(final EvaluatorMessageDispatcher messageDispatcher, - final ContextFactory contextFactory, - final DriverRestartManager driverRestartManager) { + final ContextFactory contextFactory) { this.messageDispatcher = messageDispatcher; this.contextFactory = contextFactory; - this.driverRestartManager = driverRestartManager; } /** @@ -215,13 +210,8 @@ public final class ContextRepresenters { Optional.of(contextStatusProto.getParentId()) : Optional.<String>empty(); final EvaluatorContext context = contextFactory.newContext(contextID, parentID); this.addContext(context); - if (driverRestartManager.getEvaluatorRestartState(context.getEvaluatorId()) == EvaluatorRestartState.REREGISTERED) { - // when we get a recovered active context, always notify application - this.messageDispatcher.onDriverRestartContextActive(context); - } else { - if (notifyClientOnNewActiveContext) { - this.messageDispatcher.onContextActive(context); - } + if (notifyClientOnNewActiveContext) { + this.messageDispatcher.onContextActive(context); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java index ee1af9f..9011d7f 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java @@ -76,8 +76,8 @@ public final class 
EvaluatorHeartbeatHandler if (this.driverRestartManager.onRecoverEvaluator(evaluatorId)) { LOG.log(Level.FINE, "Evaluator [" + evaluatorId + "] has reported back to the driver after restart."); - // TODO[REEF-617]: Create EvaluatorManager, add to this.evaluators, and call onEvaluatorHeartbeatMessage(). + evaluators.put(recoverEvaluatorManager(evaluatorId, evaluatorHeartbeatMessage)); } else { LOG.log(Level.FINE, "Evaluator [" + evaluatorId + "] has already been recovered."); } @@ -86,7 +86,10 @@ public final class EvaluatorHeartbeatHandler if (driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.EXPIRED) { LOG.log(Level.FINE, "Expired evaluator " + evaluatorId + " has reported back to the driver after restart."); - // TODO[REEF-617]: Create EvaluatorManager, call onEvaluatorHeartbeatMessage, and close it. + + // Create the evaluator manager, analyze its heartbeat, but don't add it to the set of Evaluators. + // Immediately close it. + recoverEvaluatorManager(evaluatorId, evaluatorHeartbeatMessage).close(); return; } @@ -102,4 +105,22 @@ public final class EvaluatorHeartbeatHandler LOG.log(Level.FINEST, "TIME: End Heartbeat {0}", evaluatorId); } } + + /** + * Creates an EvaluatorManager for recovered evaluator. + * {@link EvaluatorManager#onEvaluatorHeartbeatMessage(RemoteMessage)} should not + * do anything if driver restart period has expired. Expired evaluators should be immediately closed + * upon return of this function, while evaluators that have not yet expired should be recorded and added + * to the {@link Evaluators} object. 
+ */ + private EvaluatorManager recoverEvaluatorManager( + final String evaluatorId, + final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatMessage) { + final EvaluatorManager recoveredEvaluatorManager = evaluatorManagerFactory + .getNewEvaluatorManagerForRecoveredEvaluator( + driverRestartManager.getResourceRecoverEvent(evaluatorId)); + + recoveredEvaluatorManager.onEvaluatorHeartbeatMessage(evaluatorHeartbeatMessage); + return recoveredEvaluatorManager; + } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java index fb496f5..4f49fe9 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java @@ -25,10 +25,7 @@ import org.apache.reef.driver.catalog.NodeDescriptor; import org.apache.reef.driver.catalog.ResourceCatalog; import org.apache.reef.driver.evaluator.EvaluatorProcessFactory; import org.apache.reef.runtime.common.driver.catalog.ResourceCatalogImpl; -import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent; -import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl; -import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; -import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.*; import org.apache.reef.tang.Injector; import 
org.apache.reef.tang.exceptions.BindException; import org.apache.reef.tang.exceptions.InjectionException; @@ -58,6 +55,34 @@ public final class EvaluatorManagerFactory { this.processFactory = processFactory; } + private EvaluatorManager getNewEvaluatorManagerInstanceForResource( + final ResourceEvent resourceEvent) { + NodeDescriptor nodeDescriptor = this.resourceCatalog.getNode(resourceEvent.getNodeId()); + + if (nodeDescriptor == null) { + final String nodeId = resourceEvent.getNodeId(); + LOG.log(Level.WARNING, "Node {} is not in our catalog, adding it", nodeId); + final String[] hostNameAndPort = nodeId.split(":"); + Validate.isTrue(hostNameAndPort.length == 2); + final NodeDescriptorEvent nodeDescriptorEvent = NodeDescriptorEventImpl.newBuilder().setIdentifier(nodeId) + .setHostName(hostNameAndPort[0]).setPort(Integer.parseInt(hostNameAndPort[1])) + .setMemorySize(resourceEvent.getResourceMemory()) + .setRackName(resourceEvent.getRackName().get()).build(); + // downcasting not to change the API + ((ResourceCatalogImpl) resourceCatalog).handle(nodeDescriptorEvent); + nodeDescriptor = this.resourceCatalog.getNode(nodeId); + } + final EvaluatorDescriptorImpl evaluatorDescriptor = new EvaluatorDescriptorImpl(nodeDescriptor, + resourceEvent.getResourceMemory(), resourceEvent.getVirtualCores().get(), + processFactory.newEvaluatorProcess()); + + LOG.log(Level.FINEST, "Resource allocation: new evaluator id[{0}]", resourceEvent.getIdentifier()); + final EvaluatorManager evaluatorManager = + getNewEvaluatorManagerInstance(resourceEvent.getIdentifier(), evaluatorDescriptor); + + return evaluatorManager; + } + /** * Helper method to create a new EvaluatorManager instance. * @@ -92,30 +117,9 @@ public final class EvaluatorManagerFactory { * @param resourceAllocationEvent * @return an EvaluatorManager for the newly allocated Evaluator. 
*/ - public EvaluatorManager getNewEvaluatorManagerForNewlyAllocatedEvaluator( + public EvaluatorManager getNewEvaluatorManagerForNewEvaluator( final ResourceAllocationEvent resourceAllocationEvent) { - NodeDescriptor nodeDescriptor = this.resourceCatalog.getNode(resourceAllocationEvent.getNodeId()); - - if (nodeDescriptor == null) { - final String nodeId = resourceAllocationEvent.getNodeId(); - LOG.log(Level.WARNING, "Node {} is not in our catalog, adding it", nodeId); - final String[] hostNameAndPort = nodeId.split(":"); - Validate.isTrue(hostNameAndPort.length == 2); - final NodeDescriptorEvent nodeDescriptorEvent = NodeDescriptorEventImpl.newBuilder().setIdentifier(nodeId) - .setHostName(hostNameAndPort[0]).setPort(Integer.parseInt(hostNameAndPort[1])) - .setMemorySize(resourceAllocationEvent.getResourceMemory()) - .setRackName(resourceAllocationEvent.getRackName().get()).build(); - // downcasting not to change the API - ((ResourceCatalogImpl) resourceCatalog).handle(nodeDescriptorEvent); - nodeDescriptor = this.resourceCatalog.getNode(nodeId); - } - final EvaluatorDescriptorImpl evaluatorDescriptor = new EvaluatorDescriptorImpl(nodeDescriptor, - resourceAllocationEvent.getResourceMemory(), resourceAllocationEvent.getVirtualCores().get(), - processFactory.newEvaluatorProcess()); - - LOG.log(Level.FINEST, "Resource allocation: new evaluator id[{0}]", resourceAllocationEvent.getIdentifier()); - final EvaluatorManager evaluatorManager = - getNewEvaluatorManagerInstance(resourceAllocationEvent.getIdentifier(), evaluatorDescriptor); + final EvaluatorManager evaluatorManager = getNewEvaluatorManagerInstanceForResource(resourceAllocationEvent); evaluatorManager.fireEvaluatorAllocatedEvent(); return evaluatorManager; @@ -134,4 +138,27 @@ public final class EvaluatorManagerFactory { return getNewEvaluatorManagerInstance(resourceStatusEvent.getIdentifier(), new EvaluatorDescriptorImpl(null, 128, 1, processFactory.newEvaluatorProcess())); } + + /** + * Instantiates a new 
EvaluatorManager for a failed evaluator during driver restart. + * Does not fire an EvaluatorAllocatedEvent. + * @param resourceStatusEvent + * @return an EvaluatorManager for the user to call fail on. + */ + public EvaluatorManager getNewEvaluatorManagerForEvaluatorFailedDuringDriverRestart( + final ResourceStatusEvent resourceStatusEvent) { + return getNewEvaluatorManagerInstance(resourceStatusEvent.getIdentifier(), + new EvaluatorDescriptorImpl(null, 128, 1, processFactory.newEvaluatorProcess())); + } + + /** + * Instantiates a new EvaluatorManager based on a resource allocation from a recovered evaluator. + * + * @param resourceRecoverEvent + * @return an EvaluatorManager for the newly allocated Evaluator. + */ + public EvaluatorManager getNewEvaluatorManagerForRecoveredEvaluator( + final ResourceRecoverEvent resourceRecoverEvent) { + return getNewEvaluatorManagerInstanceForResource(resourceRecoverEvent); + } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java index fe464d9..dd6e3ff 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java @@ -107,7 +107,7 @@ public final class Evaluators implements AutoCloseable { public synchronized void put( final EvaluatorManagerFactory evaluatorManagerFactory, final ResourceAllocationEvent evaluatorMsg) { - this.put(evaluatorManagerFactory.getNewEvaluatorManagerForNewlyAllocatedEvaluator(evaluatorMsg)); + 
this.put(evaluatorManagerFactory.getNewEvaluatorManagerForNewEvaluator(evaluatorMsg)); } /** http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java index 70a3cce..3b0a75a 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java @@ -57,8 +57,9 @@ public final class ResourceStatusHandler implements EventHandler<ResourceStatusE evaluatorManager.get().onResourceStatusMessage(resourceStatusEvent); } else { if (resourceStatusEvent.getIsFromPreviousDriver().get()) { - final EvaluatorManager previousEvaluatorManager = - this.evaluatorManagerFactory.createForEvaluatorFailedDuringDriverRestart(resourceStatusEvent); + final EvaluatorManager previousEvaluatorManager = this.evaluatorManagerFactory + .getNewEvaluatorManagerForEvaluatorFailedDuringDriverRestart(resourceStatusEvent); + previousEvaluatorManager.onResourceStatusMessage(resourceStatusEvent); } else { throw new RuntimeException( http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java index 
a09532b..0107878 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java @@ -88,6 +88,7 @@ public final class TaskRepresenter { throw new RuntimeException("Received a message for task " + taskStatusProto.getTaskId() + " in the TaskRepresenter for Task " + this.taskId); } + if (driverRestartManager.getEvaluatorRestartState(evaluatorManager.getId()) == EvaluatorRestartState.REREGISTERED) { // when a recovered heartbeat is received, we will take its word for it LOG.log(Level.INFO, "Received task status {0} for RECOVERED task {1}.", @@ -142,7 +143,7 @@ public final class TaskRepresenter { if (driverRestartManager.getEvaluatorRestartState(evaluatorManager.getId()) == EvaluatorRestartState.REREGISTERED) { final RunningTask runningTask = new RunningTaskImpl( this.evaluatorManager, this.taskId, this.context, this); - this.driverRestartManager.setEvaluatorRunningTask(evaluatorManager.getId()); + this.driverRestartManager.setEvaluatorProcessed(evaluatorManager.getId()); this.messageDispatcher.onDriverRestartTaskRunning(runningTask); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java index 495a777..c9a1c34 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java @@ -243,6 +243,7 @@ 
public final class YarnDriverRuntimeRestartManager implements DriverRuntimeResta .setState(ReefServiceProtos.State.FAILED) .setExitCode(1) .setDiagnostics("Container [" + evaluatorId + "] failed during driver restart process.") + .setIsFromPreviousDriver(true) .build()); } }