Repository: reef Updated Branches: refs/heads/master a9d6d6a66 -> 719e64459
[REEF-1557] Refactor State, EvaluatorState, and related classes. * Move all state-checking logic into corresponding enums. * Make state atomic in EvaluatorStatusManager. * Add javadocs and clean up the code. * Refactor EvaluatorManager for readability. JIRA: [REEF-1557](https://issues.apache.org/jira/browse/REEF-1557) Pull request: This closes #1118 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/719e6445 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/719e6445 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/719e6445 Branch: refs/heads/master Commit: 719e6445951b0214c1b687ac85def666e5bc2c4a Parents: a9d6d6a Author: Sergiy Matusevych <[email protected]> Authored: Fri Sep 9 11:43:02 2016 -0700 Committer: Mariia Mykhailova <[email protected]> Committed: Fri Sep 16 15:31:13 2016 -0700 ---------------------------------------------------------------------- .../driver/restart/EvaluatorRestartInfo.java | 23 +- .../driver/restart/EvaluatorRestartState.java | 37 ++- .../runtime/common/driver/DriverStatus.java | 54 ++++- .../common/driver/DriverStatusManager.java | 77 +++--- .../driver/evaluator/EvaluatorManager.java | 240 +++++++++++-------- .../common/driver/evaluator/EvaluatorState.java | 156 +++++++++++- .../evaluator/EvaluatorStatusManager.java | 218 ++++++++--------- .../evaluator/pojos/EvaluatorStatusPOJO.java | 48 +--- .../common/driver/evaluator/pojos/State.java | 121 +++++++++- .../driver/evaluator/pojos/TaskStatusPOJO.java | 70 ++---- .../common/driver/idle/DriverIdleManager.java | 2 +- .../resourcemanager/ResourceManagerStatus.java | 72 +----- 12 files changed, 676 insertions(+), 442 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java ---------------------------------------------------------------------- 
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java index 8f35af5..27baa61 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java @@ -33,7 +33,9 @@ import org.apache.reef.runtime.common.driver.resourcemanager.ResourceRecoverEven @DriverSide @Unstable public final class EvaluatorRestartInfo { + private final ResourceRecoverEvent resourceRecoverEvent; + private EvaluatorRestartState evaluatorRestartState; /** @@ -44,11 +46,19 @@ public final class EvaluatorRestartInfo { return new EvaluatorRestartInfo(resourceRecoverEvent, EvaluatorRestartState.EXPECTED); } + private EvaluatorRestartInfo( + final ResourceRecoverEvent resourceRecoverEvent, final EvaluatorRestartState evaluatorRestartState) { + + this.resourceRecoverEvent = resourceRecoverEvent; + this.evaluatorRestartState = evaluatorRestartState; + } + /** * Creates an {@link EvaluatorRestartInfo} object that represents the information of an evaluator that * has failed on driver restart. */ public static EvaluatorRestartInfo createFailedEvaluatorInfo(final String evaluatorId) { + final ResourceRecoverEvent resourceRecoverEvent = ResourceEventImpl.newRecoveryBuilder().setIdentifier(evaluatorId).build(); @@ -61,31 +71,26 @@ public final class EvaluatorRestartInfo { * recovered evaluator on restart. */ public ResourceRecoverEvent getResourceRecoverEvent() { - return resourceRecoverEvent; + return this.resourceRecoverEvent; } /** * @return the current process of the restart. */ public EvaluatorRestartState getEvaluatorRestartState() { - return evaluatorRestartState; + return this.evaluatorRestartState; } /** * sets the current process of the restart. 
*/ public boolean setEvaluatorRestartState(final EvaluatorRestartState to) { - if (EvaluatorRestartState.isLegalTransition(evaluatorRestartState, to)) { + + if (this.evaluatorRestartState.isLegalTransition(to)) { this.evaluatorRestartState = to; return true; } return false; } - - private EvaluatorRestartInfo(final ResourceRecoverEvent resourceRecoverEvent, - final EvaluatorRestartState evaluatorRestartState) { - this.resourceRecoverEvent = resourceRecoverEvent; - this.evaluatorRestartState = evaluatorRestartState; - } } http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java index a9b2d94..c48c494 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java @@ -29,6 +29,7 @@ import org.apache.reef.annotations.audience.Private; @DriverSide @Unstable public enum EvaluatorRestartState { + /** * The evaluator is not a restarted instance. Not expecting. */ @@ -65,32 +66,51 @@ public enum EvaluatorRestartState { FAILED; /** + * Check if the transition of {@link EvaluatorRestartState} from one state to another is legal. + * @param fromState start state. + * @param toState destination state. * @return true if the transition of {@link EvaluatorRestartState} is legal. + * @deprecated TODO[JIRA REEF-1560] Use non-static method instead. 
Remove after version 0.16 */ - public static boolean isLegalTransition(final EvaluatorRestartState from, final EvaluatorRestartState to) { - switch(from) { + @Deprecated + public static boolean isLegalTransition( + final EvaluatorRestartState fromState, final EvaluatorRestartState toState) { + return fromState.isLegalTransition(toState); + } + + /** + * Check if the transition of {@link EvaluatorRestartState} from current state to the given one is legal. + * @param toState destination state. + * @return true if the transition is legal, false otherwise. + */ + public final boolean isLegalTransition(final EvaluatorRestartState toState) { + + switch(this) { case EXPECTED: - switch(to) { + switch(toState) { case EXPIRED: case REPORTED: return true; default: return false; } + case REPORTED: - switch(to) { + switch(toState) { case REREGISTERED: return true; default: return false; } + case REREGISTERED: - switch(to) { + switch(toState) { case PROCESSED: return true; default: return false; } + default: return false; } @@ -135,4 +155,11 @@ public enum EvaluatorRestartState { return false; } } + + /** + * @return true if the evaluator has had its recovery heartbeat processed. 
+ */ + public boolean isReregistered() { + return this == REREGISTERED; + } } http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java index 1999329..922137f 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java @@ -22,9 +22,61 @@ package org.apache.reef.runtime.common.driver; * The status of the Driver. */ public enum DriverStatus { + PRE_INIT, INIT, RUNNING, SHUTTING_DOWN, - FAILING + FAILING; + + /** + * Check if the driver is in process of shutting down (either gracefully or due to an error). + * @return true if the driver is shutting down (gracefully or otherwise). + */ + public boolean isClosing() { + return this == SHUTTING_DOWN || this == FAILING; + } + + /** + * Check whether a driver state transition from current state to a given one is legal. + * @param toStatus Destination state. + * @return true if transition is valid, false otherwise. 
+ */ + public boolean isLegalTransition(final DriverStatus toStatus) { + + switch (this) { + + case PRE_INIT: + switch (toStatus) { + case INIT: + return true; + default: + return false; + } + + case INIT: + switch (toStatus) { + case RUNNING: + return true; + default: + return false; + } + + case RUNNING: + switch (toStatus) { + case SHUTTING_DOWN: + case FAILING: + return true; + default: + return false; + } + + case FAILING: + case SHUTTING_DOWN: + return false; + + default: + throw new IllegalStateException("Unknown input state: " + this); + } + } } http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java index 129e702..6084431 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java @@ -76,44 +76,6 @@ public final class DriverStatusManager { } /** - * Check whether a state transition 'from->to' is legal. - * @param from Source state. - * @param to Destination state. - * @return true if transition is valid, false otherwise. 
- */ - private static boolean isLegalTransition(final DriverStatus from, final DriverStatus to) { - switch (from) { - case PRE_INIT: - switch (to) { - case INIT: - return true; - default: - return false; - } - case INIT: - switch (to) { - case RUNNING: - return true; - default: - return false; - } - case RUNNING: - switch (to) { - case SHUTTING_DOWN: - case FAILING: - return true; - default: - return false; - } - case FAILING: - case SHUTTING_DOWN: - return false; - default: - throw new IllegalStateException("Unknown input state: " + from); - } - } - - /** * Changes the driver status to INIT and sends message to the client about the transition. */ public synchronized void onInit() { @@ -134,7 +96,7 @@ public final class DriverStatusManager { LOG.entering(CLASS_NAME, "onRunning"); - if (this.driverStatus.equals(DriverStatus.PRE_INIT)) { + if (this.driverStatus == DriverStatus.PRE_INIT) { this.onInit(); } @@ -152,7 +114,7 @@ public final class DriverStatusManager { LOG.entering(CLASS_NAME, "onError", exception); - if (this.isShuttingDownOrFailing()) { + if (this.isClosing()) { LOG.log(Level.WARNING, "Received an exception while already in shutdown.", exception); } else { LOG.log(Level.WARNING, "Shutting down the Driver with an exception: ", exception); @@ -171,15 +133,18 @@ public final class DriverStatusManager { LOG.entering(CLASS_NAME, "onComplete"); - if (this.isShuttingDownOrFailing()) { + if (this.isClosing()) { LOG.log(Level.WARNING, "Ignoring second call to onComplete()", new Exception("Dummy exception to get the call stack")); } else { + LOG.log(Level.INFO, "Clean shutdown of the Driver."); + if (LOG.isLoggable(Level.FINEST)) { LOG.log(Level.FINEST, "Call stack: ", new Exception("Dummy exception to get the call stack")); } + this.clock.close(); this.setStatus(DriverStatus.SHUTTING_DOWN); } @@ -201,9 +166,10 @@ public final class DriverStatusManager { * @deprecated TODO[JIRA REEF-1548] Do not use DriverStatusManager as a proxy to the job client. 
* After release 0.16, make this method private and use it inside onRuntimeStop() method instead. */ + @Deprecated public synchronized void sendJobEndingMessageToClient(final Optional<Throwable> exception) { - if (!this.isShuttingDownOrFailing()) { + if (!this.isClosing()) { LOG.log(Level.SEVERE, "Sending message in a state different that SHUTTING_DOWN or FAILING. " + "This is likely a illegal call to clock.close() at play. Current state: {0}", this.driverStatus); } @@ -234,21 +200,34 @@ public final class DriverStatusManager { this.driverTerminationHasBeenCommunicatedToClient = true; } + /** + * Check if the driver is in process of shutting down (either gracefully or due to an error). + * @return true if the driver is shutting down (gracefully or otherwise). + * @deprecated TODO[JIRA REEF-1560] Use isClosing() method instead. Remove after version 0.16 + */ + @Deprecated public synchronized boolean isShuttingDownOrFailing() { - return DriverStatus.SHUTTING_DOWN.equals(this.driverStatus) - || DriverStatus.FAILING.equals(this.driverStatus); + return this.isClosing(); + } + + /** + * Check if the driver is in process of shutting down (either gracefully or due to an error). + * @return true if the driver is shutting down (gracefully or otherwise). + */ + public synchronized boolean isClosing() { + return this.driverStatus.isClosing(); } /** * Helper method to set the status. * This also checks whether the transition from the current status to the new one is legal. - * @param newStatus Driver status to transition to. + * @param toStatus Driver status to transition to. 
*/ - private synchronized void setStatus(final DriverStatus newStatus) { - if (isLegalTransition(this.driverStatus, newStatus)) { - this.driverStatus = newStatus; + private synchronized void setStatus(final DriverStatus toStatus) { + if (this.driverStatus.isLegalTransition(toStatus)) { + this.driverStatus = toStatus; } else { - LOG.log(Level.WARNING, "Illegal state transition: {0} -> {1}", new Object[] {this.driverStatus, newStatus}); + LOG.log(Level.WARNING, "Illegal state transition: {0} -> {1}", new Object[] {this.driverStatus, toStatus}); } } http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java index d4b8997..fc77380 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java @@ -25,6 +25,7 @@ import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders; import org.apache.reef.driver.restart.DriverRestartManager; import org.apache.reef.driver.restart.EvaluatorRestartState; import org.apache.reef.exception.NonSerializableException; +import org.apache.reef.runtime.common.driver.api.*; import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO; import org.apache.reef.runtime.common.driver.evaluator.pojos.EvaluatorStatusPOJO; import org.apache.reef.runtime.common.driver.evaluator.pojos.State; @@ -41,10 +42,6 @@ import org.apache.reef.io.naming.Identifiable; import org.apache.reef.proto.EvaluatorRuntimeProtocol; import org.apache.reef.proto.ReefServiceProtos; import 
org.apache.reef.driver.evaluator.EvaluatorProcess; -import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent; -import org.apache.reef.runtime.common.driver.api.ResourceReleaseEventImpl; -import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler; -import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler; import org.apache.reef.runtime.common.driver.context.ContextControlHandler; import org.apache.reef.runtime.common.driver.context.ContextRepresenters; import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource; @@ -111,17 +108,19 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { // Mutable fields private Optional<TaskRepresenter> task = Optional.empty(); - private boolean isResourceReleased = false; - private boolean allocationFired = false; + private boolean resourceNotReleased = true; + private boolean allocationNotFired = true; @Inject private EvaluatorManager( + @Parameter(EvaluatorIdentifier.class) final String evaluatorId, + @Parameter(EvaluatorDescriptorName.class) final EvaluatorDescriptorImpl evaluatorDescriptor, + @Parameter(EvaluatorConfigurationProviders.class) + final Set<ConfigurationProvider> evaluatorConfigurationProviders, final Clock clock, final RemoteManager remoteManager, final ResourceReleaseHandler resourceReleaseHandler, final ResourceLaunchHandler resourceLaunchHandler, - @Parameter(EvaluatorIdentifier.class) final String evaluatorId, - @Parameter(EvaluatorDescriptorName.class) final EvaluatorDescriptorImpl evaluatorDescriptor, final ContextRepresenters contextRepresenters, final ConfigurationSerializer configurationSerializer, final EvaluatorMessageDispatcher messageDispatcher, @@ -131,18 +130,20 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { final ExceptionCodec exceptionCodec, final EventHandlerIdlenessSource idlenessSource, final LoggingScopeFactory loggingScopeFactory, - 
@Parameter(EvaluatorConfigurationProviders.class) - final Set<ConfigurationProvider> evaluatorConfigurationProviders, final DriverRestartManager driverRestartManager, final EvaluatorIdlenessThreadPool idlenessThreadPool) { - this.contextRepresenters = contextRepresenters; - this.idlenessSource = idlenessSource; + LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId); + + this.evaluatorId = evaluatorId; + this.evaluatorDescriptor = evaluatorDescriptor; + this.evaluatorConfigurationProviders = evaluatorConfigurationProviders; + this.clock = clock; + this.contextRepresenters = contextRepresenters; + this.idlenessSource = idlenessSource; this.resourceReleaseHandler = resourceReleaseHandler; this.resourceLaunchHandler = resourceLaunchHandler; - this.evaluatorId = evaluatorId; - this.evaluatorDescriptor = evaluatorDescriptor; this.messageDispatcher = messageDispatcher; this.evaluatorControlHandler = evaluatorControlHandler; @@ -153,7 +154,6 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { this.remoteManager = remoteManager; this.configurationSerializer = configurationSerializer; this.loggingScopeFactory = loggingScopeFactory; - this.evaluatorConfigurationProviders = evaluatorConfigurationProviders; this.driverRestartManager = driverRestartManager; this.idlenessThreadPool = idlenessThreadPool; @@ -182,28 +182,27 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { * Fires the EvaluatorAllocatedEvent to the handlers. Can only be done once. 
*/ public synchronized void fireEvaluatorAllocatedEvent() { - if (!allocationFired && stateManager.isAllocated()) { + + if (this.stateManager.isAllocated() && this.allocationNotFired) { + final AllocatedEvaluator allocatedEvaluator = new AllocatedEvaluatorImpl(this, - remoteManager.getMyIdentifier(), - configurationSerializer, + this.remoteManager.getMyIdentifier(), + this.configurationSerializer, getJobIdentifier(), - loggingScopeFactory, - evaluatorConfigurationProviders); - LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", evaluatorId); - messageDispatcher.onEvaluatorAllocated(allocatedEvaluator); - allocationFired = true; + this.loggingScopeFactory, + this.evaluatorConfigurationProviders); + + LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", this.evaluatorId); + + this.messageDispatcher.onEvaluatorAllocated(allocatedEvaluator); + this.allocationNotFired = false; + } else { - LOG.log(Level.WARNING, "Evaluator allocated event fired more than once."); + LOG.log(Level.WARNING, "AllocatedEvaluator event fired more than once."); } } - private static boolean isDoneOrFailedOrKilled(final ResourceStatusEvent resourceStatusEvent) { - return resourceStatusEvent.getState() == State.DONE || - resourceStatusEvent.getState() == State.FAILED || - resourceStatusEvent.getState() == State.KILLED; - } - @Override public String getId() { return this.evaluatorId; @@ -219,58 +218,63 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { @Override public void close() { + synchronized (this.evaluatorDescriptor) { - if (this.stateManager.isAllocatedOrSubmittedOrRunning()) { + + if (this.stateManager.isAvailable()) { + LOG.log(Level.WARNING, "Dirty shutdown of running evaluator id[{0}]", getId()); + try { - if (this.stateManager.isRunning()){ + + if (this.stateManager.isRunning()) { + // Killing the evaluator means that it doesn't need to send a confirmation; it just dies. 
- final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto = + this.sendEvaluatorControlMessage( EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder() .setTimestamp(System.currentTimeMillis()) .setIdentifier(getId()) .setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build()) - .build(); - sendEvaluatorControlMessage(evaluatorControlProto); + .build()); + this.stateManager.setClosing(); + } else { this.stateManager.setKilled(); } + } catch (Exception e) { LOG.log(Level.WARNING, "Exception occurred when manager sends killing message to task.", e); this.stateManager.setKilled(); } } - if (!this.isResourceReleased) { - this.isResourceReleased = true; + if (this.resourceNotReleased) { + + this.resourceNotReleased = false; + + final ResourceReleaseEvent releaseEvent = ResourceReleaseEventImpl.newBuilder() + .setIdentifier(this.evaluatorId) + .setRuntimeName(this.getEvaluatorDescriptor().getRuntimeName()) + .build(); + try { - /* We need to wait awhile before returning the container to the RM in order to - * give the EvaluatorRuntime (and Launcher) time to cleanly exit. */ + // We need to wait awhile before returning the container to the RM + // in order to give the EvaluatorRuntime (and Launcher) time to cleanly exit. 
this.clock.scheduleAlarm(100, new EventHandler<Alarm>() { @Override public void onNext(final Alarm alarm) { - EvaluatorManager.this.resourceReleaseHandler.onNext( - ResourceReleaseEventImpl.newBuilder() - .setIdentifier(EvaluatorManager.this.evaluatorId) - .setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName()) - .build() - ); + resourceReleaseHandler.onNext(releaseEvent); } }); } catch (final IllegalStateException e) { LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e); - EvaluatorManager.this.resourceReleaseHandler.onNext( - ResourceReleaseEventImpl.newBuilder() - .setIdentifier(EvaluatorManager.this.evaluatorId) - .setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName()) - .build() - ); + this.resourceReleaseHandler.onNext(releaseEvent); } } } - idlenessThreadPool.runCheckAsync(this); + this.idlenessThreadPool.runCheckAsync(this); } /** @@ -278,8 +282,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { * <em>and</em> there are no messages queued or in processing. 
*/ public boolean isClosed() { - return this.messageDispatcher.isEmpty() && - this.stateManager.isDoneOrFailedOrKilled(); + return this.messageDispatcher.isEmpty() && this.stateManager.isCompleted(); } /** @@ -310,9 +313,11 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { */ public void onEvaluatorException(final EvaluatorException exception) { synchronized (this.evaluatorDescriptor) { - if (this.stateManager.isDoneOrFailedOrKilled()) { - LOG.log(Level.FINE, "Ignoring an exception received for Evaluator {0} which is already in state {1}.", - new Object[]{this.getId(), this.stateManager}); + + if (this.stateManager.isCompleted()) { + LOG.log(Level.FINE, + "Ignoring an exception received for Evaluator {0} which is already in state {1}.", + new Object[] {this.getId(), this.stateManager}); return; } @@ -324,6 +329,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { final Optional<FailedTask> failedTaskOptional; if (this.task.isPresent()) { + final String taskId = this.task.get().getId(); final Optional<ActiveContext> evaluatorContext = Optional.empty(); final Optional<byte[]> bytes = Optional.empty(); @@ -332,13 +338,15 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { final Optional<String> description = Optional.empty(); final FailedTask failedTask = new FailedTask(taskId, message, description, taskException, bytes, evaluatorContext); + failedTaskOptional = Optional.of(failedTask); + } else { failedTaskOptional = Optional.empty(); } - final FailedEvaluator failedEvaluator = new FailedEvaluatorImpl(exception, failedContextList, - failedTaskOptional, this.evaluatorId); + final FailedEvaluator failedEvaluator = new FailedEvaluatorImpl( + exception, failedContextList, failedTaskOptional, this.evaluatorId); if (driverRestartManager.getEvaluatorRestartState(evaluatorId).isFailedOrExpired()) { this.messageDispatcher.onDriverRestartEvaluatorFailed(failedEvaluator); @@ -362,18 +370,26 
@@ public final class EvaluatorManager implements Identifiable, AutoCloseable { final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto evaluatorHeartbeatProto = evaluatorHeartbeatProtoRemoteMessage.getMessage(); + LOG.log(Level.FINEST, "Evaluator heartbeat: {0}", evaluatorHeartbeatProto); synchronized (this.evaluatorDescriptor) { - if (this.stateManager.isDoneOrFailedOrKilled()) { - LOG.log(Level.FINE, "Ignoring a heartbeat received for Evaluator {0} which is already in state {1}.", - new Object[]{this.getId(), this.stateManager}); + + if (this.stateManager.isCompleted()) { + + LOG.log(Level.FINE, + "Ignoring a heartbeat received for Evaluator {0} which is already in state {1}.", + new Object[] {this.getId(), this.stateManager}); + return; - } else if (this.stateManager.isAllocatedOrSubmittedOrRunning()) { - this.sanityChecker.check(evaluatorId, evaluatorHeartbeatProto.getTimestamp()); + + } else if (this.stateManager.isAvailable()) { + + this.sanityChecker.check(this.evaluatorId, evaluatorHeartbeatProto.getTimestamp()); final String evaluatorRID = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString(); - final EvaluatorRestartState evaluatorRestartState = driverRestartManager.getEvaluatorRestartState(evaluatorId); + final EvaluatorRestartState evaluatorRestartState = + this.driverRestartManager.getEvaluatorRestartState(this.evaluatorId); /* * First message from a running evaluator. 
The evaluator can be a new evaluator or be a previous evaluator @@ -396,7 +412,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId); if (evaluatorRestartState == EvaluatorRestartState.REPORTED) { - driverRestartManager.setEvaluatorReregistered(evaluatorId); + this.driverRestartManager.setEvaluatorReregistered(this.evaluatorId); } } } @@ -406,8 +422,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { // Process the Evaluator status message if (evaluatorHeartbeatProto.hasEvaluatorStatus()) { - EvaluatorStatusPOJO evaluatorStatus = new EvaluatorStatusPOJO(evaluatorHeartbeatProto.getEvaluatorStatus()); - this.onEvaluatorStatusMessage(evaluatorStatus); + this.onEvaluatorStatusMessage(new EvaluatorStatusPOJO(evaluatorHeartbeatProto.getEvaluatorStatus())); } // Process the Context status message(s) @@ -417,14 +432,13 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { contextStatusList.add(new ContextStatusPOJO(proto, messageSequenceNumber)); } - this.contextRepresenters.onContextStatusMessages(contextStatusList, - informClientOfNewContexts); + this.contextRepresenters.onContextStatusMessages(contextStatusList, informClientOfNewContexts); // Process the Task status message if (evaluatorHeartbeatProto.hasTaskStatus()) { - TaskStatusPOJO taskStatus = new TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus(), messageSequenceNumber); - this.onTaskStatusMessage(taskStatus); + this.onTaskStatusMessage(new TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus(), messageSequenceNumber)); } + LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId()); } } @@ -461,7 +475,9 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { * @param message */ private synchronized void onEvaluatorDone(final EvaluatorStatusPOJO message) { + assert message.getState() == State.DONE; + 
LOG.log(Level.FINEST, "Evaluator {0} done.", getId()); // Send an ACK to the Evaluator. @@ -474,7 +490,8 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { this.stateManager.setDone(); this.messageDispatcher.onEvaluatorCompleted(new CompletedEvaluatorImpl(this.evaluatorId)); - close(); + + this.close(); } /** @@ -483,22 +500,24 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { * @param evaluatorStatus */ private synchronized void onEvaluatorFailed(final EvaluatorStatusPOJO evaluatorStatus) { - assert evaluatorStatus.getState() - == State.FAILED; + + assert evaluatorStatus.getState() == State.FAILED; + final EvaluatorException evaluatorException; + if (evaluatorStatus.hasError()) { + final Optional<Throwable> exception = this.exceptionCodec.fromBytes(evaluatorStatus.getError()); - if (exception.isPresent()) { - evaluatorException = new EvaluatorException(getId(), exception.get()); - } else { - evaluatorException = new EvaluatorException(getId(), - new NonSerializableException("Exception sent, but can't be deserialized", evaluatorStatus.getError())); - } + + evaluatorException = new EvaluatorException(getId(), exception.isPresent() ? 
exception.get() : + new NonSerializableException("Exception sent, but can't be deserialized", evaluatorStatus.getError())); + } else { evaluatorException = new EvaluatorException(getId(), new Exception("No exception sent")); } - onEvaluatorException(evaluatorException); + + this.onEvaluatorException(evaluatorException); } /** @@ -507,8 +526,10 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { * @param message */ private synchronized void onEvaluatorKilled(final EvaluatorStatusPOJO message) { + assert message.getState() == State.KILLED; - assert stateManager.isClosing(); + assert this.stateManager.isClosing(); + LOG.log(Level.WARNING, "Evaluator {0} killed completely.", getId()); this.stateManager.setKilled(); @@ -519,9 +540,9 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { if (this.stateManager.isAllocated()) { this.stateManager.setSubmitted(); this.resourceLaunchHandler.onNext(resourceLaunchEvent); - } else if (this.stateManager.isFailedOrKilled()) { - LOG.log(Level.WARNING, "Evaluator manager expected" + EvaluatorState.ALLOCATED + - " state but instead is in state " + this.stateManager); + } else if (this.stateManager.isCompletedAbnormally()) { + LOG.log(Level.WARNING, "Evaluator manager expected {0} state but instead is in state {1}", + new Object[] {EvaluatorState.ALLOCATED, this.stateManager}); } else { throw new RuntimeException("Evaluator manager expected " + EvaluatorState.ALLOCATED + " state but instead is in state " + this.stateManager); @@ -560,18 +581,17 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { private void onTaskStatusMessage(final TaskStatusPOJO taskStatus) { if (!(this.task.isPresent() && this.task.get().getId().equals(taskStatus.getTaskId()))) { - if (taskStatus.getState() == State.INIT || - taskStatus.getState() == State.FAILED || - taskStatus.getState() == State.RUNNING || - driverRestartManager.getEvaluatorRestartState(evaluatorId) == 
EvaluatorRestartState.REREGISTERED) { + + final State state = taskStatus.getState(); + if (state.isRestartable() || + this.driverRestartManager.getEvaluatorRestartState(this.evaluatorId).isReregistered()) { // [REEF-308] exposes a bug where the .NET evaluator does not send its states in the right order // [REEF-289] is a related item which may fix the issue - if (taskStatus.getState() == State.RUNNING) { + if (state.isRunning()) { LOG.log(Level.WARNING, - "Received a message of state " + ReefServiceProtos.State.RUNNING + - " for Task " + taskStatus.getTaskId() + - " before receiving its " + ReefServiceProtos.State.INIT + " state"); + "Received a message of state {0} for Task {1} before receiving its {2} state", + new Object[] {State.RUNNING, taskStatus.getTaskId(), State.INIT}); } // FAILED is a legal first state of a Task as it could have failed during construction. @@ -583,11 +603,12 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { this.exceptionCodec, this.driverRestartManager)); } else { - throw new RuntimeException("Received a message of state " + taskStatus.getState() + + throw new RuntimeException("Received a message of state " + state + ", not INIT, RUNNING, or FAILED for Task " + taskStatus.getTaskId() + " which we haven't heard from before."); } } + this.task.get().onTaskStatusMessage(taskStatus); if (this.task.get().isNotRunning()) { @@ -600,20 +621,28 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { * Resource status information from the (actual) resource manager. 
*/ public void onResourceStatusMessage(final ResourceStatusEvent resourceStatusEvent) { + synchronized (this.evaluatorDescriptor) { - LOG.log(Level.FINEST, "Resource manager state update: {0}", resourceStatusEvent.getState()); - if (!this.stateManager.isAllocatedOrSubmittedOrRunning()) { - LOG.log(Level.FINE, "Ignoring resource status update for Evaluator {0} which is already in state {1}.", - new Object[]{this.getId(), this.stateManager}); - } else if (isDoneOrFailedOrKilled(resourceStatusEvent) && this.stateManager.isAllocatedOrSubmittedOrRunning()) { - // something is wrong. The resource manager reports that the Evaluator is done or failed, but the Driver assumes - // it to be alive. + + final State state = resourceStatusEvent.getState(); + LOG.log(Level.FINEST, "Resource manager state update: {0}", state); + + if (!this.stateManager.isAvailable()) { + + LOG.log(Level.FINE, + "Ignoring resource status update for Evaluator {0} which is already in state {1}.", + new Object[] {this.getId(), this.stateManager}); + + } else if (state.isCompleted() && this.stateManager.isAvailable()) { + + // Something is wrong. The resource manager reports that the Evaluator is done or failed, + // but the Driver assumes it to be alive. final StringBuilder messageBuilder = new StringBuilder("Evaluator [") .append(this.evaluatorId) .append("] is assumed to be in state [") .append(this.stateManager.toString()) .append("]. 
But the resource manager reports it to be in state [") - .append(resourceStatusEvent.getState()) + .append(state) .append("]."); if (this.stateManager.isSubmitted()) { @@ -626,6 +655,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { messageBuilder.append(" This means that the Evaluator failed but wasn't able to send an error message " + "back to the driver."); } + if (this.task.isPresent()) { messageBuilder.append(" Task [") .append(this.task.get().getId()) @@ -633,8 +663,8 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { } if (resourceStatusEvent.getState() == State.KILLED) { - this.onEvaluatorException(new EvaluatorKilledByResourceManagerException(this.evaluatorId, - messageBuilder.toString())); + this.onEvaluatorException( + new EvaluatorKilledByResourceManagerException(this.evaluatorId, messageBuilder.toString())); } else { this.onEvaluatorException(new EvaluatorException(this.evaluatorId, messageBuilder.toString())); } http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java index 653486e..6ee80a8 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java @@ -22,17 +22,155 @@ import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; /** - * Various states that the EvaluatorManager could be in. The EvaluatorManager is - * created when a resource has been allocated by the ResourceManager. 
+ * Various states that the EvaluatorManager could be in. + * The EvaluatorManager is created when a resource has been allocated by the ResourceManager. */ @DriverSide @Private enum EvaluatorState { - ALLOCATED, // initial state - SUBMITTED, // client called AllocatedEvaluator.submitTask() and we're waiting for first contact - RUNNING, // first contact received, all communication channels established, Evaluator sent to client. - CLOSING, // evaluator is asked shutdown, but not closed yet. - DONE, // clean shutdown - FAILED, // some failure occurred. - KILLED // unclean shutdown + + /** Initial state. */ + ALLOCATED, + + /** Client called AllocatedEvaluator.submitTask() and we're waiting for first contact. */ + SUBMITTED, + + /** First contact received, all communication channels established, Evaluator sent to client. */ + RUNNING, + + /** Evaluator is asked to shut down, but has not closed yet. */ + CLOSING, + + /** Clean shutdown. */ + DONE, + + /** Some failure occurred. */ + FAILED, + + /** Unclean shutdown. */ + KILLED; + + /** + * Check if evaluator is in the initial state (ALLOCATED). + * @return true if ALLOCATED, false otherwise. + */ + public final boolean isAllocated() { + return this == ALLOCATED; + } + + /** + * Check if evaluator is in SUBMITTED state. + * @return true if SUBMITTED, false otherwise. + */ + public final boolean isSubmitted() { + return this == SUBMITTED; + } + + /** + * Check if the evaluator is in running state. + * @return true if RUNNING, false otherwise. + */ + public final boolean isRunning() { + return this == RUNNING; + } + + /** + * Check if the evaluator is in the process of being shut down. + * @return true if evaluator is being closed, false otherwise. + */ + public final boolean isClosing() { + return this == CLOSING; + } + + /** + * Check if evaluator is in one of the active states (ALLOCATED, SUBMITTED, or RUNNING). + * @return true if evaluator is available, false if it is closed or in the process of being shut down. 
+ */ + public final boolean isAvailable() { + return this == ALLOCATED || this == SUBMITTED || this == RUNNING; + } + + /** + * Check if the evaluator is stopped. That is, in one of the DONE, FAILED, or KILLED states. + * @return true if evaluator completed, false if it is still available or in the process of being shut down. + */ + public final boolean isCompleted() { + return this == DONE || this == FAILED || this == KILLED; + } + + /** + * Check if the evaluator is closed due to an error. That is, in FAILED or KILLED state. + * @return true if evaluator is stopped due to an error, false otherwise. + */ + public final boolean isCompletedAbnormally() { + return this == FAILED || this == KILLED; + } + + /** + * Check if transition from current state to the given one is legal. + * @param toState new state to transition to. + * @return true if transition is legal, false otherwise. + */ + public final boolean isLegalTransition(final EvaluatorState toState) { + + if (this == toState) { + return true; + } + + switch(this) { + + case ALLOCATED: + switch(toState) { + case SUBMITTED: + case CLOSING: + case DONE: + case FAILED: + case KILLED: + return true; + default: + return false; + } + + case SUBMITTED: + switch(toState) { + case RUNNING: + case CLOSING: + case DONE: + case FAILED: + case KILLED: + return true; + default: + return false; + } + + case RUNNING: + switch(toState) { + case CLOSING: + case DONE: + case FAILED: + case KILLED: + return true; + default: + return false; + } + + case CLOSING: + switch(toState) { + case DONE: + case FAILED: + case KILLED: + return true; + default: + return false; + } + + case DONE: + case FAILED: + case KILLED: + return false; + + default: + throw new RuntimeException("Unknown state: " + this); + } + } } http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java index a2b249a..bb78dbe 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java @@ -22,6 +22,7 @@ import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; import javax.inject.Inject; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -31,160 +32,159 @@ import java.util.logging.Logger; @DriverSide @Private final class EvaluatorStatusManager { + private static final Logger LOG = Logger.getLogger(EvaluatorStatusManager.class.getName()); + /** * The state managed. 
*/ - private EvaluatorState state = EvaluatorState.ALLOCATED; + private final AtomicReference<EvaluatorState> state = new AtomicReference<>(EvaluatorState.ALLOCATED); @Inject private EvaluatorStatusManager() { LOG.log(Level.FINE, "Instantiated 'EvaluatorStatusManager'"); } - private static boolean isLegal(final EvaluatorState from, final EvaluatorState to) { - if (from == to) { - return true; - } - - switch(from) { - case ALLOCATED: { - switch(to) { - case SUBMITTED: - case DONE: - case CLOSING: - case FAILED: - return true; - case KILLED: - case RUNNING: - break; - default: - throw new RuntimeException("Unknown state: " + to); - } - } - case SUBMITTED: { - switch(to) { - case RUNNING: - case DONE: - case CLOSING: - case FAILED: - return true; - case ALLOCATED: - case KILLED: - break; - default: - throw new RuntimeException("Unknown state: " + to); - } - } - case RUNNING: { - switch(to) { - case DONE: - case CLOSING: - case FAILED: - return true; - case ALLOCATED: - case SUBMITTED: - case KILLED: - break; - default: - throw new RuntimeException("Unknown state: " + to); - } - } - case CLOSING: { - switch(to) { - case KILLED: - case DONE: - case FAILED: - return true; - case ALLOCATED: - case SUBMITTED: - case RUNNING: - break; - default: - throw new RuntimeException("Unknown state: " + to); - } - } - case DONE: - case FAILED: - case KILLED: - break; - default: - throw new RuntimeException("Unknown state: " + from); - } - - LOG.warning("Illegal evaluator state transition from " + from + " to " + to + "."); - return false; - } - - private static boolean isDoneOrFailedOrKilled(final EvaluatorState state) { - return state == EvaluatorState.DONE || - state == EvaluatorState.FAILED || - state == EvaluatorState.KILLED; - } - - synchronized void setRunning() { + void setRunning() { this.setState(EvaluatorState.RUNNING); } - synchronized void setSubmitted() { + void setSubmitted() { this.setState(EvaluatorState.SUBMITTED); } - synchronized void setClosing() { + void 
setClosing() { this.setState(EvaluatorState.CLOSING); } - synchronized void setDone() { + void setDone() { this.setState(EvaluatorState.DONE); } - synchronized void setFailed() { + void setFailed() { this.setState(EvaluatorState.FAILED); } - synchronized void setKilled() { + void setKilled() { this.setState(EvaluatorState.KILLED); } - synchronized boolean isRunning() { - return this.state.equals(EvaluatorState.RUNNING); + /** + * Check if evaluator is in the initial state (ALLOCATED). + * @return true if allocated, false otherwise. + */ + boolean isAllocated() { + return this.state.get().isAllocated(); + } + + /** + * Check if evaluator is in SUBMITTED state. + * @return true if submitted, false otherwise. + */ + boolean isSubmitted() { + return this.state.get().isSubmitted(); + } + + /** + * Check if the evaluator is in running state. + * @return true if RUNNING, false otherwise. + */ + boolean isRunning() { + return this.state.get().isRunning(); } - synchronized boolean isDoneOrFailedOrKilled() { - return isDoneOrFailedOrKilled(this.state); + /** + * Check if the evaluator is in the process of being shut down. + * @return true if evaluator is being closed, false otherwise. + */ + boolean isClosing() { + return this.state.get().isClosing(); } - synchronized boolean isAllocatedOrSubmittedOrRunning() { - return this.state == EvaluatorState.ALLOCATED || - this.state == EvaluatorState.SUBMITTED || - this.state == EvaluatorState.RUNNING; + /** + * Check if evaluator is in one of the active states (ALLOCATED, SUBMITTED, or RUNNING). + * @return true if evaluator is available, false if it is closed or in the process of being shut down. + * @deprecated TODO[JIRA REEF-1560] Use isAvailable() method instead. 
Remove after version 0.16 + */ + @Deprecated + boolean isAllocatedOrSubmittedOrRunning() { + return this.state.get().isAvailable(); } - synchronized boolean isSubmitted() { - return EvaluatorState.SUBMITTED == this.state; + /** + * Check if evaluator is in one of the active states (ALLOCATED, SUBMITTED, or RUNNING). + * @return true if evaluator is available, false if it is closed or in the process of being shut down. + */ + boolean isAvailable() { + return this.state.get().isAvailable(); } - synchronized boolean isAllocated() { - return EvaluatorState.ALLOCATED == this.state; + /** + * Check if the evaluator is stopped. That is, in one of the DONE, FAILED, or KILLED states. + * @return true if evaluator completed, false if it is still available or in the process of being shut down. + * @deprecated TODO[JIRA REEF-1560] Use isCompleted() method instead. Remove after version 0.16 + */ + @Deprecated + boolean isDoneOrFailedOrKilled() { + return this.state.get().isCompleted(); } - synchronized boolean isFailedOrKilled() { - return EvaluatorState.FAILED == this.state || EvaluatorState.KILLED == this.state; + /** + * Check if the evaluator is stopped. That is, in one of the DONE, FAILED, or KILLED states. + * @return true if evaluator completed, false if it is still available or in the process of being shut down. + */ + boolean isCompleted() { + return this.state.get().isCompleted(); + } + + /** + * Check if the evaluator is closed due to an error. That is, in FAILED or KILLED state. + * @return true if evaluator is stopped due to an error, false otherwise. + * @deprecated TODO[JIRA REEF-1560] Use isCompletedAbnormally() method instead. Remove after version 0.16 + */ + @Deprecated + boolean isFailedOrKilled() { + return this.state.get().isCompletedAbnormally(); } - synchronized boolean isClosing() { - return EvaluatorState.CLOSING == this.state; + /** + * Check if the evaluator is closed due to an error. That is, in FAILED or KILLED state.
+ * @return true if evaluator is stopped due to an error, false otherwise. + */ + boolean isCompletedAbnormally() { + return this.state.get().isCompletedAbnormally(); } + /** + * Return string representation of the current state of the Evaluator, like RUNNING or DONE. + * @return string representation of the current state of the Evaluator. + */ @Override - public synchronized String toString() { - return this.state.toString(); + public String toString() { + return this.state.get().toString(); } - private synchronized void setState(final EvaluatorState state) { - if (!isLegal(this.state, state)) { - throw new IllegalStateException("Illegal state transition from '" + this.state + "' to '" + state + "'"); + /** + * Transition to the new state of the evaluator, if possible. + * @param toState New state of the evaluator. + * @throws IllegalStateException if state transition is not valid. + */ + private void setState(final EvaluatorState toState) { + while (true) { + + final EvaluatorState fromState = this.state.get(); + if (fromState == toState) { + break; + } + + if (!fromState.isLegalTransition(toState)) { + LOG.log(Level.WARNING, "Illegal state transition: {0} -> {1}", new Object[] {fromState, toState}); + throw new IllegalStateException("Illegal state transition: " + fromState + " -> " + toState); + } + + if (this.state.compareAndSet(fromState, toState)) { + break; + } } - this.state = state; } } http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java index 661369c..0eff9bd 100644 ---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java @@ -35,55 +35,31 @@ public final class EvaluatorStatusPOJO { private final State evaluatorState; private final byte[] errorBytes; - public EvaluatorStatusPOJO(final ReefServiceProtos.EvaluatorStatusProto proto) { - - evaluatorID = proto.getEvaluatorId(); - evaluatorIdBytes = proto.getEvaluatorIdBytes().toByteArray(); - evaluatorState = proto.hasState()? getStateFromProto(proto.getState()) : null; - errorBytes = proto.hasError() ? proto.getError().toByteArray() : null; - + this.evaluatorID = proto.getEvaluatorId(); + this.evaluatorIdBytes = proto.getEvaluatorIdBytes().toByteArray(); + this.evaluatorState = proto.hasState() ? State.fromProto(proto.getState()) : null; + this.errorBytes = proto.hasError() ? proto.getError().toByteArray() : null; } /** - * @return true, if an evaluator has thrown an exception and sent it to a driver + * @return true, if an evaluator has thrown an exception and sent it to a driver. */ public boolean hasError() { - return null != errorBytes; + return null != this.errorBytes; } /** - * @return serialized exception thrown by an evaluator + * @return serialized exception thrown by an evaluator. */ - public byte[] getError(){ - return errorBytes; + public byte[] getError() { + return this.errorBytes; } /** - * @return current {@link org.apache.reef.runtime.common.driver.evaluator.pojos.State} of a task + * @return current state of a task. 
*/ - public State getState(){ - return evaluatorState; + public State getState() { + return this.evaluatorState; } - - private State getStateFromProto(final org.apache.reef.proto.ReefServiceProtos.State protoState) { - - switch (protoState) { - case INIT: - return State.INIT; - case RUNNING: - return State.RUNNING; - case DONE: - return State.DONE; - case SUSPEND: - return State.SUSPEND; - case FAILED: - return State.FAILED; - case KILLED: - return State.KILLED; - default: - throw new IllegalStateException("Unknown state " + protoState + " in EvaluatorStatusProto"); - } - } - } http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java index 477564c..6ea7cb1 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java @@ -21,20 +21,127 @@ package org.apache.reef.runtime.common.driver.evaluator.pojos; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; +import org.apache.reef.proto.ReefServiceProtos; /** * DriverSide representation of ReefServiceProtos.State. */ - @DriverSide @Private public enum State { - INIT, - RUNNING, - DONE, - SUSPEND, - FAILED, - KILLED; + INIT, + RUNNING, + SUSPEND, + DONE, + FAILED, + KILLED; + + /** + * Get a driver-side state given the proto. It is a 1:1 mapping. + * @param protoState remote state from the proto. + * @return a corresponding (identical) driver-side state (always a 1:1 mapping). 
+ */ + public static State fromProto(final ReefServiceProtos.State protoState) { + switch (protoState) { + case INIT: + return INIT; + case RUNNING: + return RUNNING; + case SUSPEND: + return SUSPEND; + case DONE: + return DONE; + case FAILED: + return FAILED; + case KILLED: + return KILLED; + default: + throw new IllegalStateException("Unknown state " + protoState + " in EvaluatorStatusProto"); + } + } + + /** + * Checks if the ResourceManager can switch from the current state to the target state. + * See REEF-826 for the state transition matrix. + * @param toState state to switch to. + * @return true if the transition is legal; false otherwise. + */ + public final boolean isLegalTransition(final State toState) { + + if (this == toState) { + return true; + } + + switch (this) { + + case INIT: + switch (toState) { + case RUNNING: + case SUSPEND: + case DONE: + case FAILED: + case KILLED: + return true; + default: + return false; + } + + case RUNNING: + switch (toState) { + case SUSPEND: + case DONE: + case FAILED: + case KILLED: + return true; + default: + return false; + } + + case SUSPEND: + switch (toState) { + case RUNNING: + case FAILED: + case KILLED: + return true; + default: + return false; + } + + default: + return false; + } + } + + /** + * Check if container is in RUNNING state. + * @return true if container is running. + */ + public final boolean isRunning() { + return this == RUNNING; + } + + /** + * Check if container is available - that is, in one of the states INIT, RUNNING, or SUSPEND. + * @return true if container is available, false if it is closed or in the process of being shut down. + */ + public final boolean isAvailable() { + return this == INIT || this == RUNNING || this == SUSPEND; + } + + /** + * Check if the container is stopped. That is, in one of the DONE, FAILED, or KILLED states. + * @return true if the container is completed, false if it is still available or suspended. 
+ */ + public final boolean isCompleted() { + return this == DONE || this == FAILED || this == KILLED; + } + /** + * Check if the container can be restarted. That is, in one of the INIT, RUNNING, or FAILED states. + * @return true if the container can be restarted. + */ + public final boolean isRestartable() { + return this == INIT || this == RUNNING || this == FAILED; + } } http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java index f2e3f2d..3eb4e23 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java @@ -40,79 +40,57 @@ public final class TaskStatusPOJO { private final byte[] result; private final List<TaskMessagePOJO> taskMessages = new ArrayList<>(); - public TaskStatusPOJO(final ReefServiceProtos.TaskStatusProto proto, final long sequenceNumber){ + public TaskStatusPOJO(final ReefServiceProtos.TaskStatusProto proto, final long sequenceNumber) { - taskId = proto.getTaskId(); - contextId = proto.getContextId(); - state = proto.hasState()? getStateFromProto(proto.getState()) : null; - result = proto.hasResult() ? proto.getResult().toByteArray() : null; + this.taskId = proto.getTaskId(); + this.contextId = proto.getContextId(); + this.state = proto.hasState() ? State.fromProto(proto.getState()) : null; + this.result = proto.hasResult() ?
proto.getResult().toByteArray() : null; for (final TaskMessageProto taskMessageProto : proto.getTaskMessageList()) { - taskMessages.add(new TaskMessagePOJO(taskMessageProto, sequenceNumber)); + this.taskMessages.add(new TaskMessagePOJO(taskMessageProto, sequenceNumber)); } - } /** - * @return a list of messages sent by a task + * @return a list of messages sent by a task. */ - public List<TaskMessagePOJO> getTaskMessageList(){ - return taskMessages; + public List<TaskMessagePOJO> getTaskMessageList() { + return this.taskMessages; } /** - * @return true, if a completed task returned a non-null value in the 'return' statement + * @return true, if a completed task returned a non-null value in the 'return' statement. */ - public boolean hasResult(){ - return null != result; + public boolean hasResult() { + return null != this.result; } /** - * @return serialized result that a completed task returned to the Driver + * @return serialized result that a completed task returned to the Driver. */ - public byte[] getResult(){ - return result; + public byte[] getResult() { + return this.result; } /** - * @return the id of a task + * @return the id of a task. */ - public String getTaskId(){ - return taskId; + public String getTaskId() { + return this.taskId; } /** - * @return the id of a context that this task runs within + * @return the id of a context that this task runs within. */ - public String getContextId(){ - return contextId; + public String getContextId() { + return this.contextId; } /** - * @return current {@link org.apache.reef.runtime.common.driver.evaluator.pojos.State} of a task + * @return current state of a task. 
*/ - public State getState(){ - return state; - } - - private State getStateFromProto(final org.apache.reef.proto.ReefServiceProtos.State protoState) { - - switch (protoState) { - case INIT: - return State.INIT; - case RUNNING: - return State.RUNNING; - case DONE: - return State.DONE; - case SUSPEND: - return State.SUSPEND; - case FAILED: - return State.FAILED; - case KILLED: - return State.KILLED; - default: - throw new IllegalStateException("Unknown state " + protoState + " in EvaluatorStatusProto"); - } - + public State getState() { + return this.state; } } http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java index 285a1b1..2792790 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java @@ -56,7 +56,7 @@ public final class DriverIdleManager { final DriverStatusManager driverStatusManagerImpl = this.driverStatusManager.get(); - if (driverStatusManagerImpl.isShuttingDownOrFailing()) { + if (driverStatusManagerImpl.isClosing()) { LOG.log(IDLE_REASONS_LEVEL, "Ignoring idle call from [{0}] for reason [{1}]", new Object[] {reason.getComponentName(), reason.getReason()}); return; http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java index f9a526d..4b19330 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java @@ -154,73 +154,15 @@ public final class ResourceManagerStatus implements EventHandler<RuntimeStatusEv } } - /** - * Checks if the ResourceManager can switch from the current state to the target state. - * See REEF-826 for the state transition matrix. - * @param from current state. - * @param to state to switch to. - * @return true if the transition is legal; false otherwise. - */ - private static boolean isLegalStateTransition(final State from, final State to) { - - // handle diagonal elements of the transition matrix - if (from.equals(to)) { - LOG.log(Level.FINEST, "Transition from {0} state to the same state.", from); - return true; - } - - // handle non-diagonal elements - switch (from) { - - case INIT: - switch (to) { - case RUNNING: - case SUSPEND: - case DONE: - case FAILED: - case KILLED: - return true; - default: - return false; - } - - case RUNNING: - switch (to) { - case SUSPEND: - case DONE: - case FAILED: - case KILLED: - return true; - default: - return false; - } - - case SUSPEND: - switch (to) { - case RUNNING: - case FAILED: - case KILLED: - return true; - default: - return false; - } - - case DONE: - case FAILED: - case KILLED: - return false; - - default: - return false; - } - } - - private synchronized void setState(final State newState) { - if (isLegalStateTransition(this.state, newState)) { - this.state = newState; + private synchronized void setState(final State toState) { + if (this.state == toState) { + LOG.log(Level.FINE, "Transition 
from {0} state to the same state.", this.state); + } else if (this.state.isLegalTransition(toState)) { + LOG.log(Level.FINEST, "State transition: {0} -> {1}", new State[] {this.state, toState}); + this.state = toState; } else { throw new IllegalStateException( - "Resource manager attempts illegal state transition from " + this.state + " to " + newState); + "Resource manager attempts illegal state transition from " + this.state + " to " + toState); } } }
