Repository: incubator-reef Updated Branches: refs/heads/master 9b16d54cd -> 8efffd910
[REEF-634] Capture Restarted Evaluators in a POJO This addressed the issue by * Creating ResourceRecoverEvent and changing EvaluatorRestartInfo to save the state of (potentially) restarted Evaluator. * Creating the interface ResourceEvent and have ResourceAllocationEvent and ResourceRecoverEvent extend it. * Have DriverRestartManager remember the status and other information of the evaluators to recover/fail on restart. JIRA: [REEF-634](https://issues.apache.org/jira/browse/REEF-634) Pull Request: This closes #408 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/8efffd91 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/8efffd91 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/8efffd91 Branch: refs/heads/master Commit: 8efffd910e21f390a2ed08d4d9a84ce8ce9c2e51 Parents: 9b16d54 Author: Andrew Chung <afchun...@gmail.com> Authored: Fri Aug 21 18:25:24 2015 -0700 Committer: Markus Weimer <wei...@apache.org> Committed: Tue Aug 25 17:51:55 2015 -0700 ---------------------------------------------------------------------- .../DefaultDriverRuntimeRestartMangerImpl.java | 6 +- .../driver/restart/DriverRestartManager.java | 108 +++++++------- .../restart/DriverRuntimeRestartManager.java | 4 +- .../driver/restart/EvaluatorRestartInfo.java | 67 ++++++--- .../driver/restart/EvaluatorRestartState.java | 20 ++- .../reef/driver/restart/RestartEvaluators.java | 101 ++++++++++++++ .../ResourceAllocationEvent.java | 33 +---- .../ResourceAllocationEventImpl.java | 129 ----------------- .../driver/resourcemanager/ResourceEvent.java | 57 ++++++++ .../resourcemanager/ResourceEventImpl.java | 139 +++++++++++++++++++ .../resourcemanager/ResourceRecoverEvent.java | 30 ++++ .../runtime/local/driver/ResourceManager.java | 4 +- .../runtime/mesos/driver/REEFScheduler.java | 4 +- .../yarn/driver/YarnContainerManager.java | 4 +- .../driver/YarnDriverRuntimeRestartManager.java | 30 
++-- 15 files changed, 480 insertions(+), 256 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java index 12934f7..277b872 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java @@ -57,9 +57,9 @@ final class DefaultDriverRuntimeRestartMangerImpl implements DriverRuntimeRestar } @Override - public EvaluatorRestartInfo getAliveAndFailedEvaluators() { + public RestartEvaluators getPreviousEvaluators() { throw new DriverFatalRuntimeException( - "Restart is not enabled. getAliveAndFailedEvaluators should not have been called."); + "Restart is not enabled. getPreviousEvaluators should not have been called."); } @Override @@ -67,4 +67,4 @@ final class DefaultDriverRuntimeRestartMangerImpl implements DriverRuntimeRestar throw new DriverFatalRuntimeException( "Restart is not enabled. 
informAboutEvaluatorFailures should not have been called."); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java index 3e1be1f..d6b823c 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java @@ -38,7 +38,8 @@ import java.util.logging.Logger; public final class DriverRestartManager { private static final Logger LOG = Logger.getLogger(DriverRestartManager.class.getName()); private final DriverRuntimeRestartManager driverRuntimeRestartManager; - private final Map<String, EvaluatorRestartState> previousEvaluators = new HashMap<>(); + + private RestartEvaluators restartEvaluators; private DriverRestartState state = DriverRestartState.NotRestarted; @Inject @@ -81,9 +82,17 @@ public final class DriverRestartManager { * as alive to the job driver. 
*/ public synchronized void onRestart() { - final EvaluatorRestartInfo evaluatorRestartInfo = driverRuntimeRestartManager.getAliveAndFailedEvaluators(); - setPreviousEvaluatorIds(evaluatorRestartInfo.getAliveEvaluators()); - driverRuntimeRestartManager.informAboutEvaluatorFailures(evaluatorRestartInfo.getFailedEvaluators()); + if (this.state == DriverRestartState.RestartBegan) { + restartEvaluators = driverRuntimeRestartManager.getPreviousEvaluators(); + this.state = DriverRestartState.RestartInProgress; + } else { + final String errMsg = "Should not be setting the set of expected alive evaluators more than once."; + LOG.log(Level.SEVERE, errMsg); + throw new DriverFatalRuntimeException(errMsg); + } + + driverRuntimeRestartManager.informAboutEvaluatorFailures(getFailedEvaluators()); + // TODO[REEF-560]: Call onDriverRestartCompleted() (to do in REEF-617) on a Timer. } @@ -92,32 +101,11 @@ public final class DriverRestartManager { * if the {@link DriverRestartManager} does not believe that it's an evaluator to be recovered. */ public synchronized EvaluatorRestartState getEvaluatorRestartState(final String evaluatorId) { - if (this.state.hasNotRestarted() || - !this.previousEvaluators.containsKey(evaluatorId)) { + if (this.state.hasNotRestarted()) { return EvaluatorRestartState.NOT_EXPECTED; } - return this.previousEvaluators.get(evaluatorId); - } - - /** - * Set the Evaluators to expect still active from a previous execution of the Driver in a restart situation. - * To be called exactly once during a driver restart. - * - * @param previousEvaluatorIds the evaluator IDs of the evaluators that are expected to have survived driver restart. 
- */ - private synchronized void setPreviousEvaluatorIds(final Set<String> previousEvaluatorIds) { - if (this.state == DriverRestartState.RestartBegan) { - for (final String previousEvaluatorId : previousEvaluatorIds) { - setEvaluatorExpected(previousEvaluatorId); - } - - this.state = DriverRestartState.RestartInProgress; - } else { - final String errMsg = "Should not be setting the set of expected alive evaluators more than once."; - LOG.log(Level.SEVERE, errMsg); - throw new DriverFatalRuntimeException(errMsg); - } + return getStateOfPreviousEvaluator(evaluatorId); } /** @@ -126,14 +114,13 @@ public final class DriverRestartManager { * @return true if the driver restart is completed. */ public synchronized boolean onRecoverEvaluatorIsRestartComplete(final String evaluatorId) { - if (!this.previousEvaluators.containsKey(evaluatorId) || - this.previousEvaluators.get(evaluatorId) == EvaluatorRestartState.NOT_EXPECTED) { + if (getStateOfPreviousEvaluator(evaluatorId) == EvaluatorRestartState.NOT_EXPECTED) { final String errMsg = "Evaluator with evaluator ID " + evaluatorId + " not expected to be alive."; LOG.log(Level.SEVERE, errMsg); throw new DriverFatalRuntimeException(errMsg); } - if (this.previousEvaluators.get(evaluatorId) != EvaluatorRestartState.EXPECTED) { + if (getStateOfPreviousEvaluator(evaluatorId) != EvaluatorRestartState.EXPECTED) { LOG.log(Level.WARNING, "Evaluator with evaluator ID " + evaluatorId + " added to the set" + " of recovered evaluators more than once. Ignoring second add..."); } else { @@ -160,74 +147,79 @@ public final class DriverRestartManager { } /** - * Signals to the {@link DriverRestartManager} that an evaluator is to be expected to report back after restart. 
- */ - public synchronized void setEvaluatorExpected(final String evaluatorId) { - if (previousEvaluators.containsKey(evaluatorId)) { - LOG.log(Level.WARNING, "Evaluator " + evaluatorId + " is already added to the set of previous evaluators with " + - "state [" + previousEvaluators.get(evaluatorId) + "]. Ignoring..."); - return; - } - - previousEvaluators.put(evaluatorId, EvaluatorRestartState.EXPECTED); - } - - /** * Signals to the {@link DriverRestartManager} that an evaluator has reported back after restart. */ public synchronized void setEvaluatorReported(final String evaluatorId) { - setPreviousEvaluatorState(evaluatorId, EvaluatorRestartState.REPORTED); + setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.REPORTED); } /** * Signals to the {@link DriverRestartManager} that an evaluator has had its recovery heartbeat processed. */ public synchronized void setEvaluatorReregistered(final String evaluatorId) { - setPreviousEvaluatorState(evaluatorId, EvaluatorRestartState.REREGISTERED); + setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.REREGISTERED); } /** * Signals to the {@link DriverRestartManager} that an evaluator has had its running task processed. */ public synchronized void setEvaluatorRunningTask(final String evaluatorId) { - setPreviousEvaluatorState( - evaluatorId, EvaluatorRestartState.PROCESSED); + setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.PROCESSED); } /** * Signals to the {@link DriverRestartManager} that an expected evaluator has been expired. 
*/ public synchronized void setEvaluatorExpired(final String evaluatorId) { - setPreviousEvaluatorState(evaluatorId, EvaluatorRestartState.EXPIRED); + setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.EXPIRED); } - private synchronized void setPreviousEvaluatorState(final String evaluatorId, - final EvaluatorRestartState to) { - if (!previousEvaluators.containsKey(evaluatorId) || - !EvaluatorRestartState.isLegalTransition(previousEvaluators.get(evaluatorId), to)) { - throw evaluatorTransitionFailed(evaluatorId, to); + private synchronized EvaluatorRestartState getStateOfPreviousEvaluator(final String evaluatorId) { + if (!this.restartEvaluators.contains(evaluatorId)) { + return EvaluatorRestartState.NOT_EXPECTED; } - previousEvaluators.put(evaluatorId, to); + return this.restartEvaluators.get(evaluatorId).getEvaluatorRestartState(); + } + + private synchronized void setStateOfPreviousEvaluator(final String evaluatorId, + final EvaluatorRestartState to) { + if (!restartEvaluators.contains(evaluatorId) || + !restartEvaluators.get(evaluatorId).setEvaluatorRestartState(to)) { + throw evaluatorTransitionFailed(evaluatorId, to); + } } private synchronized DriverFatalRuntimeException evaluatorTransitionFailed(final String evaluatorId, final EvaluatorRestartState to) { - if (!previousEvaluators.containsKey(evaluatorId)) { + if (!restartEvaluators.contains(evaluatorId)) { return new DriverFatalRuntimeException("Evaluator " + evaluatorId + " is not expected."); } return new DriverFatalRuntimeException("Evaluator " + evaluatorId + " wants to transition to state " + - "[" + to + "], but is in the illegal state [" + previousEvaluators.get(evaluatorId) + "]."); + "[" + to + "], but is in the illegal state [" + + restartEvaluators.get(evaluatorId).getEvaluatorRestartState() + "]."); } private synchronized boolean haveAllExpectedEvaluatorsReported() { - for (final EvaluatorRestartState evaluatorRestartState : this.previousEvaluators.values()) { - if
(!evaluatorRestartState.hasReported()) { + for (final String previousEvaluatorId : this.restartEvaluators.getEvaluatorIds()) { + final EvaluatorRestartState restartState = getStateOfPreviousEvaluator(previousEvaluatorId); + if (restartState == EvaluatorRestartState.EXPECTED) { return false; } } return true; } + + private Set<String> getFailedEvaluators() { + final Set<String> failed = new HashSet<>(); + for (final String previousEvaluatorId : this.restartEvaluators.getEvaluatorIds()) { + if (getStateOfPreviousEvaluator(previousEvaluatorId) == EvaluatorRestartState.FAILED) { + failed.add(previousEvaluatorId); + } + } + + return failed; + } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java index 4e38b4b..5e1acec 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java @@ -58,9 +58,9 @@ public interface DriverRuntimeRestartManager { /** * Gets the sets of alive and failed evaluators based on the runtime implementation. - * @return EvaluatorRestartInfo, which encapsulates the alive and failed set of evaluator IDs. + * @return A map which encapsulates the states of previous evaluators. */ - EvaluatorRestartInfo getAliveAndFailedEvaluators(); + RestartEvaluators getPreviousEvaluators(); /** * Informs the necessary components about failed evaluators. The implementation is runtime dependent. 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java index 10beb78..e058c9a 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java @@ -21,36 +21,71 @@ package org.apache.reef.driver.restart; import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; - -import java.util.Collections; -import java.util.Set; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceRecoverEvent; /** - * The encapsulating class for alive and failed evaluators on driver restart. + * An object that encapsulates the information needed to construct an + * {@link org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager} for a recovered evaluator + * on restart. */ @Private @DriverSide @Unstable public final class EvaluatorRestartInfo { - private final Set<String> aliveEvaluators; - private final Set<String> failedEvaluators; + private final ResourceRecoverEvent resourceRecoverEvent; + private EvaluatorRestartState evaluatorRestartState; + + /** + * Creates an {@link EvaluatorRestartInfo} object that represents the information of an evaluator that is expected + * to recover. 
+ */ + public static EvaluatorRestartInfo createExpectedEvaluatorInfo(final ResourceRecoverEvent resourceRecoverEvent) { + return new EvaluatorRestartInfo(resourceRecoverEvent, EvaluatorRestartState.EXPECTED); + } + + /** + * Creates an {@link EvaluatorRestartInfo} object that represents the information of an evaluator that + * has failed on driver restart. + */ + public static EvaluatorRestartInfo createFailedEvaluatorInfo(final String evaluatorId) { + final ResourceRecoverEvent resourceRecoverEvent = + ResourceEventImpl.newRecoveryBuilder().setIdentifier(evaluatorId).build(); - public EvaluatorRestartInfo(final Set<String> aliveEvaluators, final Set<String> failedEvaluators) { - this.aliveEvaluators = Collections.unmodifiableSet(aliveEvaluators); - this.failedEvaluators = Collections.unmodifiableSet(failedEvaluators); + return new EvaluatorRestartInfo(resourceRecoverEvent, EvaluatorRestartState.FAILED); } /** - * @return the set of evaluator IDs for alive evaluators on driver restart. The returned set is unmodifiable. + * @return the {@link ResourceRecoverEvent} that contains the information (e.g. resource MB, node ID, Evaluator ID...) + * needed to reconstruct the {@link org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager} of the + * recovered evaluator on restart. */ - public Set<String> getAliveEvaluators() { - return this.aliveEvaluators; + public ResourceRecoverEvent getResourceRecoverEvent() { + return resourceRecoverEvent; } /** - * @return the set of evaluator IDs for faiuled evaluators on driver restart. The returned set is unmodifiable. + * @return the current process of the restart. */ - public Set<String> getFailedEvaluators() { - return this.failedEvaluators; + public EvaluatorRestartState getEvaluatorRestartState() { + return evaluatorRestartState; + } + + /** + * sets the current process of the restart. 
+ */ + public boolean setEvaluatorRestartState(final EvaluatorRestartState to) { + if (EvaluatorRestartState.isLegalTransition(evaluatorRestartState, to)) { + this.evaluatorRestartState = to; + return true; + } + + return false; + } + + private EvaluatorRestartInfo(final ResourceRecoverEvent resourceRecoverEvent, + final EvaluatorRestartState evaluatorRestartState) { + this.resourceRecoverEvent = resourceRecoverEvent; + this.evaluatorRestartState = evaluatorRestartState; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java index 4a0c540..a87052b 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java @@ -57,7 +57,12 @@ public enum EvaluatorRestartState { /** * The evaluator has only contacted the driver after the expiration period. */ - EXPIRED; + EXPIRED, + + /** + * The evaluator has failed on driver restart. + */ + FAILED; /** * @return true if the transition of {@link EvaluatorRestartState} is legal. @@ -103,4 +108,17 @@ public enum EvaluatorRestartState { return false; } } + + /** + * @return true if the evaluator has failed on driver restart or is not expected to report back to the driver. 
+ */ + public boolean isFailedOrNotExpected() { + switch(this) { + case FAILED: + case NOT_EXPECTED: + return true; + default: + return false; + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/RestartEvaluators.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/RestartEvaluators.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/RestartEvaluators.java new file mode 100644 index 0000000..c3f8857 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/RestartEvaluators.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.driver.restart; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.util.BuilderUtils; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * Represents holds the set of Evaluator information needed to recover EvaluatorManagers + * on the restarted Driver. + */ +@Private +@DriverSide +@Unstable +public final class RestartEvaluators { + private final Map<String, EvaluatorRestartInfo> restartEvaluatorsMap; + + private RestartEvaluators(final Map<String, EvaluatorRestartInfo> restartEvaluatorsMap){ + this.restartEvaluatorsMap = BuilderUtils.notNull(restartEvaluatorsMap); + } + + /** + * @return true if Evaluator with evaluatorId can be an Evaluator from + * previous application attempts. + */ + boolean contains(final String evaluatorId) { + return restartEvaluatorsMap.containsKey(evaluatorId); + } + + /** + * @return The {@link EvaluatorRestartInfo} of an Evaluator from + * previous application attempts. + */ + EvaluatorRestartInfo get(final String evaluatorId) { + return restartEvaluatorsMap.get(evaluatorId); + } + + /** + * @return The set of Evaluator IDs of Evaluators from previous + * application attempts. + */ + Set<String> getEvaluatorIds() { + return restartEvaluatorsMap.keySet(); + } + + /** + * @return a new Builder to build an instance of {@link RestartEvaluators}. 
+ */ + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder implements org.apache.reef.util.Builder<RestartEvaluators>{ + private final Map<String, EvaluatorRestartInfo> restartInfoMap = new HashMap<>(); + + private Builder(){ + } + + public boolean addRestartEvaluator(final EvaluatorRestartInfo evaluatorRestartInfo) { + if (evaluatorRestartInfo == null) { + return false; + } + + final String evaluatorId = evaluatorRestartInfo.getResourceRecoverEvent().getIdentifier(); + if (evaluatorId == null || restartInfoMap.containsKey(evaluatorId)) { + return false; + } + + restartInfoMap.put(evaluatorId, evaluatorRestartInfo); + return true; + } + + @Override + public RestartEvaluators build() { + return new RestartEvaluators(Collections.unmodifiableMap(restartInfoMap)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java index e507d90..5b0156e 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java @@ -20,8 +20,6 @@ package org.apache.reef.runtime.common.driver.resourcemanager; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.RuntimeAuthor; -import org.apache.reef.tang.annotations.DefaultImplementation; -import org.apache.reef.util.Optional; /** * Event from Driver Runtime -> Driver Process @@ -29,32 +27,5 @@ import 
org.apache.reef.util.Optional; */ @RuntimeAuthor @DriverSide -@DefaultImplementation(ResourceAllocationEventImpl.class) -public interface ResourceAllocationEvent { - - /** - * @return Id of the allocated resource - */ - String getIdentifier(); - - /** - * @return Memory size of the resource, in MB - */ - int getResourceMemory(); - - /** - * @return Id of the node where resource was allocated - */ - String getNodeId(); - - /** - * @return Number of virtual CPU cores on the resource - */ - Optional<Integer> getVirtualCores(); - - /** - * @return Rack name of the resource - */ - Optional<String> getRackName(); - -} +public interface ResourceAllocationEvent extends ResourceEvent { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEventImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEventImpl.java deleted file mode 100644 index f8c5ade..0000000 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEventImpl.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.runtime.common.driver.resourcemanager; - -import org.apache.reef.util.BuilderUtils; -import org.apache.reef.util.Optional; - -/** - * Default POJO implementation of ResourceAllocationEvent. - * Use newBuilder to construct an instance. - */ -public final class ResourceAllocationEventImpl implements ResourceAllocationEvent { - private final String identifier; - private final int resourceMemory; - private final String nodeId; - private final Optional<Integer> virtualCores; - private final Optional<String> rackName; - - - private ResourceAllocationEventImpl(final Builder builder) { - this.identifier = BuilderUtils.notNull(builder.identifier); - this.resourceMemory = BuilderUtils.notNull(builder.resourceMemory); - this.nodeId = BuilderUtils.notNull(builder.nodeId); - this.virtualCores = Optional.ofNullable(builder.virtualCores); - this.rackName = Optional.ofNullable(builder.rackName); - } - - @Override - public String getIdentifier() { - return identifier; - } - - @Override - public int getResourceMemory() { - return resourceMemory; - } - - @Override - public String getNodeId() { - return nodeId; - } - - @Override - public Optional<Integer> getVirtualCores() { - return virtualCores; - } - - @Override - public Optional<String> getRackName() { - return rackName; - } - - public static Builder newBuilder() { - return new Builder(); - } - - /** - * Builder used to create ResourceAllocationEvent instances. 
- */ - public static final class Builder implements org.apache.reef.util.Builder<ResourceAllocationEvent> { - private String identifier; - private Integer resourceMemory; - private String nodeId; - private Integer virtualCores; - private String rackName; - - - /** - * @see ResourceAllocationEvent#getIdentifier() - */ - public Builder setIdentifier(final String identifier) { - this.identifier = identifier; - return this; - } - - /** - * @see ResourceAllocationEvent#getResourceMemory() - */ - public Builder setResourceMemory(final int resourceMemory) { - this.resourceMemory = resourceMemory; - return this; - } - - /** - * @see ResourceAllocationEvent#getNodeId() - */ - public Builder setNodeId(final String nodeId) { - this.nodeId = nodeId; - return this; - } - - /** - * @see ResourceAllocationEvent#getVirtualCores() - */ - public Builder setVirtualCores(final int virtualCores) { - this.virtualCores = virtualCores; - return this; - } - - /** - * @see ResourceAllocationEvent#getRackName() - */ - public Builder setRackName(final String rackName) { - this.rackName = rackName; - return this; - } - - @Override - public ResourceAllocationEvent build() { - return new ResourceAllocationEventImpl(this); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEvent.java new file mode 100644 index 0000000..baae87e --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEvent.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.resourcemanager; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.util.Optional; + +/** + * An interface capturing the characteristics of a resource event. 
+ */ +@DriverSide +@Private +public interface ResourceEvent { + + /** + * @return Id of the resource + */ + String getIdentifier(); + + /** + * @return Memory size of the resource, in MB + */ + int getResourceMemory(); + + /** + * @return Id of the node where resource is + */ + String getNodeId(); + + /** + * @return Number of virtual CPU cores on the resource + */ + Optional<Integer> getVirtualCores(); + + /** + * @return Rack name of the resource + */ + Optional<String> getRackName(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEventImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEventImpl.java new file mode 100644 index 0000000..8d43be6 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEventImpl.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.resourcemanager; + +import org.apache.reef.util.BuilderUtils; +import org.apache.reef.util.Optional; + +/** + * Default POJO implementation of {@link ResourceAllocationEvent} and {@link ResourceRecoverEvent}. + * Use {@link #newAllocationBuilder()} to construct an instance for ResourceAllocationEvent and + * use {@link #newRecoveryBuilder()} to construct an instance for ResourceRecoverEvent. + * <p> + * Allocation events require the identifier, resource memory and node ID to be set. + * Recovery events only require the identifier: the node ID may be left null, and an unset + * resource memory is reported as 0 by {@link #getResourceMemory()}. + */ +public final class ResourceEventImpl implements ResourceAllocationEvent, ResourceRecoverEvent { + private final String identifier; + private final int resourceMemory; + private final String nodeId; + private final Optional<Integer> virtualCores; + private final Optional<String> rackName; + + private ResourceEventImpl(final Builder builder) { + this.identifier = BuilderUtils.notNull(builder.identifier); + // builder.resourceMemory is a boxed Integer that is not null-checked in recovery mode. + // Map an unset value to 0 explicitly; otherwise auto-unboxing into the int field + // would throw a bare NullPointerException. + this.resourceMemory = builder.recovery + ? (builder.resourceMemory == null ? 0 : builder.resourceMemory) + : BuilderUtils.notNull(builder.resourceMemory); + this.nodeId = builder.recovery ? builder.nodeId : BuilderUtils.notNull(builder.nodeId); + this.virtualCores = Optional.ofNullable(builder.virtualCores); + this.rackName = Optional.ofNullable(builder.rackName); + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + public int getResourceMemory() { + return resourceMemory; + } + + @Override + public String getNodeId() { + return nodeId; + } + + @Override + public Optional<Integer> getVirtualCores() { + return virtualCores; + } + + @Override + public Optional<String> getRackName() { + return rackName; + } + + /** + * @return a builder for a {@link ResourceAllocationEvent}; identifier, resource memory and node ID are required. + */ + public static Builder newAllocationBuilder() { + return new Builder(false); + } + + /** + * @return a builder for a {@link ResourceRecoverEvent}; only the identifier is required. + */ + public static Builder newRecoveryBuilder() { + return new Builder(true); + } + + /** + * Builder used to create ResourceAllocationEvent and ResourceRecoverEvent instances.
+ */ + public static final class Builder implements org.apache.reef.util.Builder<ResourceEventImpl> { + // true when building a ResourceRecoverEvent, which relaxes the required-field checks. + private final boolean recovery; + + private String identifier; + private Integer resourceMemory; + private String nodeId; + private Integer virtualCores; + private String rackName; + + private Builder(final boolean recovery) { + this.recovery = recovery; + } + + /** + * @see ResourceEvent#getIdentifier() + */ + public Builder setIdentifier(final String identifier) { + this.identifier = identifier; + return this; + } + + /** + * @see ResourceEvent#getResourceMemory() + */ + public Builder setResourceMemory(final int resourceMemory) { + this.resourceMemory = resourceMemory; + return this; + } + + /** + * @see ResourceEvent#getNodeId() + */ + public Builder setNodeId(final String nodeId) { + this.nodeId = nodeId; + return this; + } + + /** + * @see ResourceEvent#getVirtualCores() + */ + public Builder setVirtualCores(final int virtualCores) { + this.virtualCores = virtualCores; + return this; + } + + /** + * @see ResourceEvent#getRackName() + */ + public Builder setRackName(final String rackName) { + this.rackName = rackName; + return this; + } + + @Override + public ResourceEventImpl build() { + return new ResourceEventImpl(this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceRecoverEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceRecoverEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceRecoverEvent.java new file mode 100644 index 0000000..0f90c9a --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceRecoverEvent.java @@
-0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.resourcemanager; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.RuntimeAuthor; + +/** + * A Resource recovered by the {@link org.apache.reef.driver.restart.DriverRestartManager}: + * a resource (e.g. a container) that was allocated to a previous instance of the Driver and + * is still alive after the Driver restarted. Carries the same properties as {@link ResourceEvent}; + * instances can be built with {@link ResourceEventImpl#newRecoveryBuilder()}. + * NOTE(review): ResourceEventImpl does not require a node ID for recovery events, so + * {@link #getNodeId()} may return null here — confirm consumers handle this.
+ */ +@RuntimeAuthor +@DriverSide +public interface ResourceRecoverEvent extends ResourceEvent { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java index ba498e2..b1f0097 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java @@ -27,7 +27,7 @@ import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent; import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; import org.apache.reef.runtime.common.driver.api.RuntimeParameters; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; -import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl; import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent; import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl; import org.apache.reef.runtime.common.files.FileResource; @@ -217,7 +217,7 @@ public final class ResourceManager { requestQueue.satisfyOne(); final Container container = cont.get(); // Tell the receivers about it - final ResourceAllocationEvent alloc = ResourceAllocationEventImpl.newBuilder() + final ResourceAllocationEvent alloc = ResourceEventImpl.newAllocationBuilder() .setIdentifier(container.getContainerID()).setNodeId(container.getNodeID())
.setResourceMemory(container.getMemory()).setVirtualCores(container.getNumberOfCores()) .setRackName(container.getRackName()).build(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java index a363e6a..533454b 100644 --- a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java +++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java @@ -28,7 +28,7 @@ import org.apache.reef.runtime.common.driver.api.ResourceRequestEventImpl; import org.apache.reef.runtime.common.driver.parameters.JobIdentifier; import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; -import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl; import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl; @@ -384,7 +384,7 @@ final class REEFScheduler implements Scheduler { this.executors.add(taskStatus.getTaskId().getValue(), resourceRequestProto.getMemorySize().get(), evaluatorControlHandler); - final ResourceAllocationEvent alloc = ResourceAllocationEventImpl.newBuilder() + final ResourceAllocationEvent alloc = ResourceEventImpl.newAllocationBuilder() .setIdentifier(taskStatus.getTaskId().getValue()) 
.setNodeId(taskStatus.getSlaveId().getValue()) .setResourceMemory(resourceRequestProto.getMemorySize().get()) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java index 27e5662..0e594ac 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java @@ -33,7 +33,7 @@ import org.apache.reef.exception.DriverFatalRuntimeException; import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.DriverStatusManager; import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl; -import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl; import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl; import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod; @@ -408,7 +408,7 @@ final class YarnContainerManager LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core number = {1}", new Object[]{container.getResource().getMemory(), container.getResource().getVirtualCores()}); - this.reefEventHandlers.onResourceAllocation(ResourceAllocationEventImpl.newBuilder() + this.reefEventHandlers.onResourceAllocation(ResourceEventImpl.newAllocationBuilder() .setIdentifier(container.getId().toString()) 
.setNodeId(container.getNodeId().toString()) .setResourceMemory(container.getResource().getMemory()) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java index fbf64f8..495a777 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java @@ -29,8 +29,10 @@ import org.apache.reef.annotations.audience.Private; import org.apache.reef.annotations.audience.RuntimeAuthor; import org.apache.reef.driver.restart.DriverRuntimeRestartManager; import org.apache.reef.driver.restart.EvaluatorRestartInfo; +import org.apache.reef.driver.restart.RestartEvaluators; import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.EvaluatorPreserver; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl; import org.apache.reef.runtime.yarn.driver.parameters.YarnEvaluatorPreserver; import org.apache.reef.tang.annotations.Parameter; @@ -56,6 +58,7 @@ public final class YarnDriverRuntimeRestartManager implements DriverRuntimeResta private final ApplicationMasterRegistration registration; private final REEFEventHandlers reefEventHandlers; private final YarnContainerManager yarnContainerManager; + private final RackNameFormatter rackNameFormatter; private Set<Container> previousContainers; @@ -64,11 +67,13 @@ public final class 
YarnDriverRuntimeRestartManager implements DriverRuntimeResta final EvaluatorPreserver evaluatorPreserver, final REEFEventHandlers reefEventHandlers, final ApplicationMasterRegistration registration, - final YarnContainerManager yarnContainerManager) { + final YarnContainerManager yarnContainerManager, + final RackNameFormatter rackNameFormatter) { this.registration = registration; this.evaluatorPreserver = evaluatorPreserver; this.reefEventHandlers = reefEventHandlers; this.yarnContainerManager = yarnContainerManager; + this.rackNameFormatter = rackNameFormatter; this.previousContainers = null; } @@ -161,15 +166,16 @@ public final class YarnDriverRuntimeRestartManager implements DriverRuntimeResta } /** - * Used by tDriverRestartManager. Gets the list of previous containers from the resource manager, + * Used by {@link org.apache.reef.driver.restart.DriverRestartManager}. + * Gets the list of previous containers from the resource manager, * compares that list to the YarnDriverRuntimeRestartManager's own list based on the evalutor preserver, * and determine which evaluators are alive and which have failed during restart. - * @return EvaluatorRestartInfo, the object encapsulating alive and failed evaluator IDs. + * @return a map of Evaluator ID to {@link EvaluatorRestartInfo} for evaluators that have either failed or survived + * driver restart. 
*/ @Override - public EvaluatorRestartInfo getAliveAndFailedEvaluators() { - final Set<String> recoveredEvaluators = new HashSet<>(); - final Set<String> failedEvaluators = new HashSet<>(); + public RestartEvaluators getPreviousEvaluators() { + final RestartEvaluators.Builder restartEvaluatorsBuilder = RestartEvaluators.newBuilder(); this.initializeListOfPreviousContainers(); @@ -191,7 +197,8 @@ public final class YarnDriverRuntimeRestartManager implements DriverRuntimeResta if (!previousContainersIds.contains(expectedContainerId)) { LOG.log(Level.WARNING, "Expected container [{0}] not alive, must have failed during driver restart.", expectedContainerId); - failedEvaluators.add(expectedContainerId); + restartEvaluatorsBuilder.addRestartEvaluator( + EvaluatorRestartInfo.createFailedEvaluatorInfo(expectedContainerId)); } } } @@ -208,11 +215,15 @@ public final class YarnDriverRuntimeRestartManager implements DriverRuntimeResta throw new RuntimeException("Not expecting container " + container.getId().toString()); } - recoveredEvaluators.add(container.getId().toString()); + restartEvaluatorsBuilder.addRestartEvaluator(EvaluatorRestartInfo.createExpectedEvaluatorInfo( + ResourceEventImpl.newRecoveryBuilder().setIdentifier(container.getId().toString()) + .setNodeId(container.getNodeId().toString()).setRackName(rackNameFormatter.getRackName(container)) + .setResourceMemory(container.getResource().getMemory()) + .setVirtualCores(container.getResource().getVirtualCores()).build())); } } - return new EvaluatorRestartInfo(recoveredEvaluators, failedEvaluators); + return restartEvaluatorsBuilder.build(); } /** @@ -232,7 +243,6 @@ public final class YarnDriverRuntimeRestartManager implements DriverRuntimeResta .setState(ReefServiceProtos.State.FAILED) .setExitCode(1) .setDiagnostics("Container [" + evaluatorId + "] failed during driver restart process.") - .setIsFromPreviousDriver(true) .build()); } }