Repository: incubator-reef
Updated Branches:
  refs/heads/master 9b16d54cd -> 8efffd910


[REEF-634] Capture Restarted Evaluators in a POJO

This addressed the issue by
  * Creating ResourceRecoverEvent and changing EvaluatorRestartInfo to
    save the state of a (potentially) restarted Evaluator.
  * Creating the interface ResourceEvent and having ResourceAllocationEvent
    and ResourceRecoverEvent extend it.
  * Having DriverRestartManager remember the status and other information
    of the evaluators to recover/fail on restart.

JIRA:
  [REEF-634](https://issues.apache.org/jira/browse/REEF-634)

Pull Request:
  This closes #408


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/8efffd91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/8efffd91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/8efffd91

Branch: refs/heads/master
Commit: 8efffd910e21f390a2ed08d4d9a84ce8ce9c2e51
Parents: 9b16d54
Author: Andrew Chung <afchun...@gmail.com>
Authored: Fri Aug 21 18:25:24 2015 -0700
Committer: Markus Weimer <wei...@apache.org>
Committed: Tue Aug 25 17:51:55 2015 -0700

----------------------------------------------------------------------
 .../DefaultDriverRuntimeRestartMangerImpl.java  |   6 +-
 .../driver/restart/DriverRestartManager.java    | 108 +++++++-------
 .../restart/DriverRuntimeRestartManager.java    |   4 +-
 .../driver/restart/EvaluatorRestartInfo.java    |  67 ++++++---
 .../driver/restart/EvaluatorRestartState.java   |  20 ++-
 .../reef/driver/restart/RestartEvaluators.java  | 101 ++++++++++++++
 .../ResourceAllocationEvent.java                |  33 +----
 .../ResourceAllocationEventImpl.java            | 129 -----------------
 .../driver/resourcemanager/ResourceEvent.java   |  57 ++++++++
 .../resourcemanager/ResourceEventImpl.java      | 139 +++++++++++++++++++
 .../resourcemanager/ResourceRecoverEvent.java   |  30 ++++
 .../runtime/local/driver/ResourceManager.java   |   4 +-
 .../runtime/mesos/driver/REEFScheduler.java     |   4 +-
 .../yarn/driver/YarnContainerManager.java       |   4 +-
 .../driver/YarnDriverRuntimeRestartManager.java |  30 ++--
 15 files changed, 480 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
index 12934f7..277b872 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
@@ -57,9 +57,9 @@ final class DefaultDriverRuntimeRestartMangerImpl implements 
DriverRuntimeRestar
   }
 
   @Override
-  public EvaluatorRestartInfo getAliveAndFailedEvaluators() {
+  public RestartEvaluators getPreviousEvaluators() {
     throw new DriverFatalRuntimeException(
-        "Restart is not enabled. getAliveAndFailedEvaluators should not have 
been called.");
+        "Restart is not enabled. getPreviousEvaluators should not have been 
called.");
   }
 
   @Override
@@ -67,4 +67,4 @@ final class DefaultDriverRuntimeRestartMangerImpl implements 
DriverRuntimeRestar
     throw new DriverFatalRuntimeException(
         "Restart is not enabled. informAboutEvaluatorFailures should not have 
been called.");
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
index 3e1be1f..d6b823c 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -38,7 +38,8 @@ import java.util.logging.Logger;
 public final class DriverRestartManager {
   private static final Logger LOG = 
Logger.getLogger(DriverRestartManager.class.getName());
   private final DriverRuntimeRestartManager driverRuntimeRestartManager;
-  private final Map<String, EvaluatorRestartState> previousEvaluators = new 
HashMap<>();
+
+  private RestartEvaluators restartEvaluators;
   private DriverRestartState state = DriverRestartState.NotRestarted;
 
   @Inject
@@ -81,9 +82,17 @@ public final class DriverRestartManager {
    * as alive to the job driver.
    */
   public synchronized void onRestart() {
-    final EvaluatorRestartInfo evaluatorRestartInfo = 
driverRuntimeRestartManager.getAliveAndFailedEvaluators();
-    setPreviousEvaluatorIds(evaluatorRestartInfo.getAliveEvaluators());
-    
driverRuntimeRestartManager.informAboutEvaluatorFailures(evaluatorRestartInfo.getFailedEvaluators());
+    if (this.state == DriverRestartState.RestartBegan) {
+      restartEvaluators = driverRuntimeRestartManager.getPreviousEvaluators();
+      this.state = DriverRestartState.RestartInProgress;
+    } else {
+      final String errMsg = "Should not be setting the set of expected alive 
evaluators more than once.";
+      LOG.log(Level.SEVERE, errMsg);
+      throw new DriverFatalRuntimeException(errMsg);
+    }
+
+    
driverRuntimeRestartManager.informAboutEvaluatorFailures(getFailedEvaluators());
+
     // TODO[REEF-560]: Call onDriverRestartCompleted() (to do in REEF-617) on 
a Timer.
   }
 
@@ -92,32 +101,11 @@ public final class DriverRestartManager {
    * if the {@link DriverRestartManager} does not believe that it's an 
evaluator to be recovered.
    */
   public synchronized EvaluatorRestartState getEvaluatorRestartState(final 
String evaluatorId) {
-    if (this.state.hasNotRestarted() ||
-        !this.previousEvaluators.containsKey(evaluatorId)) {
+    if (this.state.hasNotRestarted()) {
       return EvaluatorRestartState.NOT_EXPECTED;
     }
 
-    return this.previousEvaluators.get(evaluatorId);
-  }
-
-  /**
-   * Set the Evaluators to expect still active from a previous execution of 
the Driver in a restart situation.
-   * To be called exactly once during a driver restart.
-   *
-   * @param previousEvaluatorIds the evaluator IDs of the evaluators that are 
expected to have survived driver restart.
-   */
-  private synchronized void setPreviousEvaluatorIds(final Set<String> 
previousEvaluatorIds) {
-    if (this.state == DriverRestartState.RestartBegan) {
-      for (final String previousEvaluatorId : previousEvaluatorIds) {
-        setEvaluatorExpected(previousEvaluatorId);
-      }
-
-      this.state = DriverRestartState.RestartInProgress;
-    } else {
-      final String errMsg = "Should not be setting the set of expected alive 
evaluators more than once.";
-      LOG.log(Level.SEVERE, errMsg);
-      throw new DriverFatalRuntimeException(errMsg);
-    }
+    return getStateOfPreviousEvaluator(evaluatorId);
   }
 
   /**
@@ -126,14 +114,13 @@ public final class DriverRestartManager {
    * @return true if the driver restart is completed.
    */
   public synchronized boolean onRecoverEvaluatorIsRestartComplete(final String 
evaluatorId) {
-    if (!this.previousEvaluators.containsKey(evaluatorId) ||
-        this.previousEvaluators.get(evaluatorId) == 
EvaluatorRestartState.NOT_EXPECTED) {
+    if (getStateOfPreviousEvaluator(evaluatorId) == 
EvaluatorRestartState.NOT_EXPECTED) {
       final String errMsg = "Evaluator with evaluator ID " + evaluatorId + " 
not expected to be alive.";
       LOG.log(Level.SEVERE, errMsg);
       throw new DriverFatalRuntimeException(errMsg);
     }
 
-    if (this.previousEvaluators.get(evaluatorId) != 
EvaluatorRestartState.EXPECTED) {
+    if (getStateOfPreviousEvaluator(evaluatorId) != 
EvaluatorRestartState.EXPECTED) {
       LOG.log(Level.WARNING, "Evaluator with evaluator ID " + evaluatorId + " 
added to the set" +
           " of recovered evaluators more than once. Ignoring second add...");
     } else {
@@ -160,74 +147,79 @@ public final class DriverRestartManager {
   }
 
   /**
-   * Signals to the {@link DriverRestartManager} that an evaluator is to be 
expected to report back after restart.
-   */
-  public synchronized void setEvaluatorExpected(final String evaluatorId) {
-    if (previousEvaluators.containsKey(evaluatorId)) {
-      LOG.log(Level.WARNING, "Evaluator " + evaluatorId + " is already added 
to the set of previous evaluators with " +
-          "state [" + previousEvaluators.get(evaluatorId) + "]. Ignoring...");
-      return;
-    }
-
-    previousEvaluators.put(evaluatorId, EvaluatorRestartState.EXPECTED);
-  }
-
-  /**
    * Signals to the {@link DriverRestartManager} that an evaluator has 
reported back after restart.
    */
   public synchronized void setEvaluatorReported(final String evaluatorId) {
-    setPreviousEvaluatorState(evaluatorId, EvaluatorRestartState.REPORTED);
+    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.REPORTED);
   }
 
   /**
    * Signals to the {@link DriverRestartManager} that an evaluator has had its 
recovery heartbeat processed.
    */
   public synchronized void setEvaluatorReregistered(final String evaluatorId) {
-    setPreviousEvaluatorState(evaluatorId, EvaluatorRestartState.REREGISTERED);
+    setStateOfPreviousEvaluator(evaluatorId, 
EvaluatorRestartState.REREGISTERED);
   }
 
   /**
    * Signals to the {@link DriverRestartManager} that an evaluator has had its 
running task processed.
    */
   public synchronized void setEvaluatorRunningTask(final String evaluatorId) {
-    setPreviousEvaluatorState(
-        evaluatorId, EvaluatorRestartState.PROCESSED);
+    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.PROCESSED);
   }
 
   /**
    * Signals to the {@link DriverRestartManager} that an expected evaluator 
has been expired.
    */
   public synchronized void setEvaluatorExpired(final String evaluatorId) {
-    setPreviousEvaluatorState(evaluatorId, EvaluatorRestartState.EXPIRED);
+    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.EXPIRED);
   }
 
-  private synchronized void setPreviousEvaluatorState(final String evaluatorId,
-                                                      final 
EvaluatorRestartState to) {
-    if (!previousEvaluators.containsKey(evaluatorId) ||
-        
!EvaluatorRestartState.isLegalTransition(previousEvaluators.get(evaluatorId), 
to)) {
-      throw evaluatorTransitionFailed(evaluatorId, to);
+  private synchronized EvaluatorRestartState getStateOfPreviousEvaluator(final 
String evaluatorId) {
+    if (this.restartEvaluators.contains(evaluatorId)) {
+      return EvaluatorRestartState.NOT_EXPECTED;
     }
 
-    previousEvaluators.put(evaluatorId, to);
+    return this.restartEvaluators.get(evaluatorId).getEvaluatorRestartState();
+  }
+
+  private synchronized void setStateOfPreviousEvaluator(final String 
evaluatorId,
+                                                        final 
EvaluatorRestartState to) {
+    if (!restartEvaluators.contains(evaluatorId) ||
+        !restartEvaluators.get(evaluatorId).setEvaluatorRestartState(to)) {
+      throw evaluatorTransitionFailed(evaluatorId, to);
+    }
   }
 
   private synchronized DriverFatalRuntimeException 
evaluatorTransitionFailed(final String evaluatorId,
                                                                              
final EvaluatorRestartState to) {
-    if (!previousEvaluators.containsKey(evaluatorId)) {
+    if (!restartEvaluators.contains(evaluatorId)) {
       return new DriverFatalRuntimeException("Evaluator " + evaluatorId + " is 
not expected.");
     }
 
     return new DriverFatalRuntimeException("Evaluator " + evaluatorId + " 
wants to transition to state " +
-        "[" + to + "], but is in the illegal state [" + 
previousEvaluators.get(evaluatorId) + "].");
+        "[" + to + "], but is in the illegal state [" +
+        restartEvaluators.get(evaluatorId).getEvaluatorRestartState() + "].");
   }
 
   private synchronized boolean haveAllExpectedEvaluatorsReported() {
-    for (final EvaluatorRestartState evaluatorRestartState : 
this.previousEvaluators.values()) {
-      if (!evaluatorRestartState.hasReported()) {
+    for (final String previousEvaluatorId : 
this.restartEvaluators.getEvaluatorIds()) {
+      final EvaluatorRestartState restartState = 
getStateOfPreviousEvaluator(previousEvaluatorId);
+      if (restartState == EvaluatorRestartState.EXPECTED) {
         return false;
       }
     }
 
     return true;
   }
+
+  private Set<String> getFailedEvaluators() {
+    final Set<String> failed = new HashSet<>();
+    for (final String previousEvaluatorId : 
this.restartEvaluators.getEvaluatorIds()) {
+      if (getStateOfPreviousEvaluator(previousEvaluatorId) == 
EvaluatorRestartState.FAILED) {
+        failed.add(previousEvaluatorId);
+      }
+    }
+
+    return failed;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
index 4e38b4b..5e1acec 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
@@ -58,9 +58,9 @@ public interface DriverRuntimeRestartManager {
 
   /**
    * Gets the sets of alive and failed evaluators based on the runtime 
implementation.
-   * @return EvaluatorRestartInfo, which encapsulates the alive and failed set 
of evaluator IDs.
+   * @return A map which encapsulates the states of previous evaluators.
    */
-  EvaluatorRestartInfo getAliveAndFailedEvaluators();
+  RestartEvaluators getPreviousEvaluators();
 
   /**
    * Informs the necessary components about failed evaluators. The 
implementation is runtime dependent.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
index 10beb78..e058c9a 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
@@ -21,36 +21,71 @@ package org.apache.reef.driver.restart;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
-
-import java.util.Collections;
-import java.util.Set;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceRecoverEvent;
 
 /**
- * The encapsulating class for alive and failed evaluators on driver restart.
+ * An object that encapsulates the information needed to construct an
+ * {@link org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager} 
for a recovered evaluator
+ * on restart.
  */
 @Private
 @DriverSide
 @Unstable
 public final class EvaluatorRestartInfo {
-  private final Set<String> aliveEvaluators;
-  private final Set<String> failedEvaluators;
+  private final ResourceRecoverEvent resourceRecoverEvent;
+  private EvaluatorRestartState evaluatorRestartState;
+
+  /**
+   * Creates an {@link EvaluatorRestartInfo} object that represents the 
information of an evaluator that is expected
+   * to recover.
+   */
+  public static EvaluatorRestartInfo createExpectedEvaluatorInfo(final 
ResourceRecoverEvent resourceRecoverEvent) {
+    return new EvaluatorRestartInfo(resourceRecoverEvent, 
EvaluatorRestartState.EXPECTED);
+  }
+
+  /**
+   * Creates an {@link EvaluatorRestartInfo} object that represents the 
information of an evaluator that
+   * has failed on driver restart.
+   */
+  public static EvaluatorRestartInfo createFailedEvaluatorInfo(final String 
evaluatorId) {
+    final ResourceRecoverEvent resourceRecoverEvent =
+        
ResourceEventImpl.newRecoveryBuilder().setIdentifier(evaluatorId).build();
 
-  public EvaluatorRestartInfo(final Set<String> aliveEvaluators, final 
Set<String> failedEvaluators) {
-    this.aliveEvaluators = Collections.unmodifiableSet(aliveEvaluators);
-    this.failedEvaluators = Collections.unmodifiableSet(failedEvaluators);
+    return new EvaluatorRestartInfo(resourceRecoverEvent, 
EvaluatorRestartState.FAILED);
   }
 
   /**
-   * @return the set of evaluator IDs for alive evaluators on driver restart. 
The returned set is unmodifiable.
+   * @return the {@link ResourceRecoverEvent} that contains the information 
(e.g. resource MB, node ID, Evaluator ID...)
+   * needed to reconstruct the {@link 
org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager} of the
+   * recovered evaluator on restart.
    */
-  public Set<String> getAliveEvaluators() {
-    return this.aliveEvaluators;
+  public ResourceRecoverEvent getResourceRecoverEvent() {
+    return resourceRecoverEvent;
   }
 
   /**
-   * @return the set of evaluator IDs for faiuled evaluators on driver 
restart. The returned set is unmodifiable.
+   * @return the current process of the restart.
    */
-  public Set<String> getFailedEvaluators() {
-    return this.failedEvaluators;
+  public EvaluatorRestartState getEvaluatorRestartState() {
+    return evaluatorRestartState;
+  }
+
+  /**
+   * sets the current process of the restart.
+   */
+  public boolean setEvaluatorRestartState(final EvaluatorRestartState to) {
+    if (EvaluatorRestartState.isLegalTransition(evaluatorRestartState, to)) {
+      this.evaluatorRestartState = to;
+      return true;
+    }
+
+    return false;
+  }
+
+  private EvaluatorRestartInfo(final ResourceRecoverEvent resourceRecoverEvent,
+                               final EvaluatorRestartState 
evaluatorRestartState) {
+    this.resourceRecoverEvent = resourceRecoverEvent;
+    this.evaluatorRestartState = evaluatorRestartState;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
index 4a0c540..a87052b 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
@@ -57,7 +57,12 @@ public enum EvaluatorRestartState {
   /**
    * The evaluator has only contacted the driver after the expiration period.
    */
-  EXPIRED;
+  EXPIRED,
+
+  /**
+   * The evaluator has failed on driver restart.
+   */
+  FAILED;
 
   /**
    * @return true if the transition of {@link EvaluatorRestartState} is legal.
@@ -103,4 +108,17 @@ public enum EvaluatorRestartState {
       return false;
     }
   }
+
+  /**
+   * @return true if the evaluator has failed on driver restart or is not 
expected to report back to the driver.
+   */
+  public boolean isFailedOrNotExpected() {
+    switch(this) {
+    case FAILED:
+    case NOT_EXPECTED:
+      return true;
+    default:
+      return false;
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/RestartEvaluators.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/RestartEvaluators.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/RestartEvaluators.java
new file mode 100644
index 0000000..c3f8857
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/RestartEvaluators.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.util.BuilderUtils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Represents holds the set of Evaluator information needed to recover 
EvaluatorManagers
+ * on the restarted Driver.
+ */
+@Private
+@DriverSide
+@Unstable
+public final class RestartEvaluators {
+  private final Map<String, EvaluatorRestartInfo> restartEvaluatorsMap;
+
+  private RestartEvaluators(final Map<String, EvaluatorRestartInfo> 
restartEvaluatorsMap){
+    this.restartEvaluatorsMap = BuilderUtils.notNull(restartEvaluatorsMap);
+  }
+
+  /**
+   * @return true if Evaluator with evaluatorId can be an Evaluator from
+   * previous application attempts.
+   */
+  boolean contains(final String evaluatorId) {
+    return restartEvaluatorsMap.containsKey(evaluatorId);
+  }
+
+  /**
+   * @return The {@link EvaluatorRestartInfo} of an Evaluator from
+   * previous application attempts.
+   */
+  EvaluatorRestartInfo get(final String evaluatorId) {
+    return restartEvaluatorsMap.get(evaluatorId);
+  }
+
+  /**
+   * @return The set of Evaluator IDs of Evaluators from previous
+   * application attempts.
+   */
+  Set<String> getEvaluatorIds() {
+    return restartEvaluatorsMap.keySet();
+  }
+
+  /**
+   * @return a new Builder to build an instance of {@link RestartEvaluators}.
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static final class Builder implements 
org.apache.reef.util.Builder<RestartEvaluators>{
+    private final Map<String, EvaluatorRestartInfo> restartInfoMap = new 
HashMap<>();
+
+    private Builder(){
+    }
+
+    public boolean addRestartEvaluator(final EvaluatorRestartInfo 
evaluatorRestartInfo) {
+      if (evaluatorRestartInfo == null) {
+        return false;
+      }
+
+      final String evaluatorId = 
evaluatorRestartInfo.getResourceRecoverEvent().getIdentifier();
+      if (evaluatorId == null || restartInfoMap.containsKey(evaluatorId)) {
+        return false;
+      }
+
+      restartInfoMap.put(evaluatorId, evaluatorRestartInfo);
+      return true;
+    }
+
+    @Override
+    public RestartEvaluators build() {
+      return new 
RestartEvaluators(Collections.unmodifiableMap(restartInfoMap));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java
index e507d90..5b0156e 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java
@@ -20,8 +20,6 @@ package org.apache.reef.runtime.common.driver.resourcemanager;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.RuntimeAuthor;
-import org.apache.reef.tang.annotations.DefaultImplementation;
-import org.apache.reef.util.Optional;
 
 /**
  * Event from Driver Runtime -> Driver Process
@@ -29,32 +27,5 @@ import org.apache.reef.util.Optional;
  */
 @RuntimeAuthor
 @DriverSide
-@DefaultImplementation(ResourceAllocationEventImpl.class)
-public interface ResourceAllocationEvent {
-
-  /**
-   * @return Id of the allocated resource
-   */
-  String getIdentifier();
-
-  /**
-   * @return Memory size of the resource, in MB
-   */
-  int getResourceMemory();
-
-  /**
-   * @return Id of the node where resource was allocated
-   */
-  String getNodeId();
-
-  /**
-   * @return Number of virtual CPU cores on the resource
-   */
-  Optional<Integer> getVirtualCores();
-
-  /**
-   * @return Rack name of the resource
-   */
-  Optional<String> getRackName();
-
-}
+public interface ResourceAllocationEvent extends ResourceEvent {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEventImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEventImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEventImpl.java
deleted file mode 100644
index f8c5ade..0000000
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEventImpl.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.runtime.common.driver.resourcemanager;
-
-import org.apache.reef.util.BuilderUtils;
-import org.apache.reef.util.Optional;
-
-/**
- * Default POJO implementation of ResourceAllocationEvent.
- * Use newBuilder to construct an instance.
- */
-public final class ResourceAllocationEventImpl implements 
ResourceAllocationEvent {
-  private final String identifier;
-  private final int resourceMemory;
-  private final String nodeId;
-  private final Optional<Integer> virtualCores;
-  private final Optional<String> rackName;
-
-
-  private ResourceAllocationEventImpl(final Builder builder) {
-    this.identifier = BuilderUtils.notNull(builder.identifier);
-    this.resourceMemory = BuilderUtils.notNull(builder.resourceMemory);
-    this.nodeId = BuilderUtils.notNull(builder.nodeId);
-    this.virtualCores = Optional.ofNullable(builder.virtualCores);
-    this.rackName = Optional.ofNullable(builder.rackName);
-  }
-
-  @Override
-  public String getIdentifier() {
-    return identifier;
-  }
-
-  @Override
-  public int getResourceMemory() {
-    return resourceMemory;
-  }
-
-  @Override
-  public String getNodeId() {
-    return nodeId;
-  }
-
-  @Override
-  public Optional<Integer> getVirtualCores() {
-    return virtualCores;
-  }
-
-  @Override
-  public Optional<String> getRackName() {
-    return rackName;
-  }
-
-  public static Builder newBuilder() {
-    return new Builder();
-  }
-
-  /**
-   * Builder used to create ResourceAllocationEvent instances.
-   */
-  public static final class Builder implements 
org.apache.reef.util.Builder<ResourceAllocationEvent> {
-    private String identifier;
-    private Integer resourceMemory;
-    private String nodeId;
-    private Integer virtualCores;
-    private String rackName;
-
-
-    /**
-     * @see ResourceAllocationEvent#getIdentifier()
-     */
-    public Builder setIdentifier(final String identifier) {
-      this.identifier = identifier;
-      return this;
-    }
-
-    /**
-     * @see ResourceAllocationEvent#getResourceMemory()
-     */
-    public Builder setResourceMemory(final int resourceMemory) {
-      this.resourceMemory = resourceMemory;
-      return this;
-    }
-
-    /**
-     * @see ResourceAllocationEvent#getNodeId()
-     */
-    public Builder setNodeId(final String nodeId) {
-      this.nodeId = nodeId;
-      return this;
-    }
-
-    /**
-     * @see ResourceAllocationEvent#getVirtualCores()
-     */
-    public Builder setVirtualCores(final int virtualCores) {
-      this.virtualCores = virtualCores;
-      return this;
-    }
-
-    /**
-     * @see ResourceAllocationEvent#getRackName()
-     */
-    public Builder setRackName(final String rackName) {
-      this.rackName = rackName;
-      return this;
-    }
-
-    @Override
-    public ResourceAllocationEvent build() {
-      return new ResourceAllocationEventImpl(this);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEvent.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEvent.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEvent.java
new file mode 100644
index 0000000..baae87e
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEvent.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.util.Optional;
+
+/**
+ * Accessors common to resource events: identifier, memory, node, and the optional core count and rack name.
+ */
+@DriverSide
+@Private
+public interface ResourceEvent {
+
+  /**
+   * @return Id of the resource
+   */
+  String getIdentifier();
+
+  /**
+   * @return Memory size of the resource, in MB
+   */
+  int getResourceMemory();
+
+  /**
+   * @return Id of the node where the resource is located
+   */
+  String getNodeId();
+
+  /**
+   * @return Number of virtual CPU cores on the resource, wrapped in an Optional
+   */
+  Optional<Integer> getVirtualCores();
+
+  /**
+   * @return Rack name of the resource, wrapped in an Optional
+   */
+  Optional<String> getRackName();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEventImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEventImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEventImpl.java
new file mode 100644
index 0000000..8d43be6
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceEventImpl.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.util.BuilderUtils;
+import org.apache.reef.util.Optional;
+
+/**
+ * Default POJO implementation of both ResourceAllocationEvent and ResourceRecoverEvent.
+ * Use newAllocationBuilder (identifier, memory and nodeId are null-checked) for a ResourceAllocationEvent and
+ * newRecoveryBuilder (identifier and memory are null-checked, nodeId may stay null) for a ResourceRecoverEvent.
+ */
+public final class ResourceEventImpl implements ResourceAllocationEvent, ResourceRecoverEvent {
+  private final String identifier;
+  private final int resourceMemory;
+  private final String nodeId;
+  private final Optional<Integer> virtualCores;
+  private final Optional<String> rackName;
+
+  // resourceMemory is an int: always null-check it so a recovery build cannot fail on a bare unboxing NPE.
+  private ResourceEventImpl(final Builder builder) {
+    this.identifier = BuilderUtils.notNull(builder.identifier);
+    this.resourceMemory = BuilderUtils.notNull(builder.resourceMemory);
+    this.nodeId = builder.recovery ? builder.nodeId : BuilderUtils.notNull(builder.nodeId);
+    this.virtualCores = Optional.ofNullable(builder.virtualCores);
+    this.rackName = Optional.ofNullable(builder.rackName);
+  }
+
+  @Override
+  public String getIdentifier() {
+    return identifier;
+  }
+
+  @Override
+  public int getResourceMemory() {
+    return resourceMemory;
+  }
+
+  @Override
+  public String getNodeId() {
+    return nodeId;
+  }
+
+  @Override
+  public Optional<Integer> getVirtualCores() {
+    return virtualCores;
+  }
+
+  @Override
+  public Optional<String> getRackName() {
+    return rackName;
+  }
+
+  public static Builder newAllocationBuilder() {
+    return new Builder(false);
+  }
+
+  public static Builder newRecoveryBuilder() {
+    return new Builder(true);
+  }
+
+  /**
+   * Builder for ResourceEventImpl instances, obtained from newAllocationBuilder or newRecoveryBuilder.
+   */
+  public static final class Builder implements org.apache.reef.util.Builder<ResourceEventImpl> {
+    private final boolean recovery;
+
+    private String identifier;
+    private Integer resourceMemory;
+    private String nodeId;
+    private Integer virtualCores;
+    private String rackName;
+
+    private Builder(final boolean recovery) {
+      this.recovery = recovery;
+    }
+
+    /**
+     * @see ResourceEvent#getIdentifier()
+     */
+    public Builder setIdentifier(final String identifier) {
+      this.identifier = identifier;
+      return this;
+    }
+
+    /**
+     * @see ResourceEvent#getResourceMemory()
+     */
+    public Builder setResourceMemory(final int resourceMemory) {
+      this.resourceMemory = resourceMemory;
+      return this;
+    }
+
+    /**
+     * @see ResourceEvent#getNodeId()
+     */
+    public Builder setNodeId(final String nodeId) {
+      this.nodeId = nodeId;
+      return this;
+    }
+
+    /**
+     * @see ResourceEvent#getVirtualCores()
+     */
+    public Builder setVirtualCores(final int virtualCores) {
+      this.virtualCores = virtualCores;
+      return this;
+    }
+
+    /**
+     * @see ResourceEvent#getRackName()
+     */
+    public Builder setRackName(final String rackName) {
+      this.rackName = rackName;
+      return this;
+    }
+
+    @Override
+    public ResourceEventImpl build() {
+      return new ResourceEventImpl(this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceRecoverEvent.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceRecoverEvent.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceRecoverEvent.java
new file mode 100644
index 0000000..0f90c9a
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceRecoverEvent.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+
+/**
+ * Marker type for a resource recovered by the {@link org.apache.reef.driver.restart.DriverRestartManager}.
+ */
+@RuntimeAuthor
+@DriverSide
+public interface ResourceRecoverEvent extends ResourceEvent {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
index ba498e2..b1f0097 100644
--- 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
@@ -27,7 +27,7 @@ import 
org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
 import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
-import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
 import org.apache.reef.runtime.common.files.FileResource;
@@ -217,7 +217,7 @@ public final class ResourceManager {
         requestQueue.satisfyOne();
         final Container container = cont.get();
         // Tell the receivers about it
-        final ResourceAllocationEvent alloc = 
ResourceAllocationEventImpl.newBuilder()
+        final ResourceAllocationEvent alloc = 
ResourceEventImpl.newAllocationBuilder()
             
.setIdentifier(container.getContainerID()).setNodeId(container.getNodeID())
             
.setResourceMemory(container.getMemory()).setVirtualCores(container.getNumberOfCores())
             .setRackName(container.getRackName()).build();

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
index a363e6a..533454b 100644
--- 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
+++ 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
@@ -28,7 +28,7 @@ import 
org.apache.reef.runtime.common.driver.api.ResourceRequestEventImpl;
 import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
-import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
@@ -384,7 +384,7 @@ final class REEFScheduler implements Scheduler {
     this.executors.add(taskStatus.getTaskId().getValue(), 
resourceRequestProto.getMemorySize().get(),
         evaluatorControlHandler);
 
-    final ResourceAllocationEvent alloc = 
ResourceAllocationEventImpl.newBuilder()
+    final ResourceAllocationEvent alloc = 
ResourceEventImpl.newAllocationBuilder()
         .setIdentifier(taskStatus.getTaskId().getValue())
         .setNodeId(taskStatus.getSlaveId().getValue())
         .setResourceMemory(resourceRequestProto.getMemorySize().get())

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
index 27e5662..0e594ac 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
@@ -33,7 +33,7 @@ import org.apache.reef.exception.DriverFatalRuntimeException;
 import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.driver.DriverStatusManager;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
-import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
 import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod;
@@ -408,7 +408,7 @@ final class YarnContainerManager
 
         LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core number 
= {1}",
             new Object[]{container.getResource().getMemory(), 
container.getResource().getVirtualCores()});
-        
this.reefEventHandlers.onResourceAllocation(ResourceAllocationEventImpl.newBuilder()
+        
this.reefEventHandlers.onResourceAllocation(ResourceEventImpl.newAllocationBuilder()
             .setIdentifier(container.getId().toString())
             .setNodeId(container.getNodeId().toString())
             .setResourceMemory(container.getResource().getMemory())

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8efffd91/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
index fbf64f8..495a777 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
@@ -29,8 +29,10 @@ import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.annotations.audience.RuntimeAuthor;
 import org.apache.reef.driver.restart.DriverRuntimeRestartManager;
 import org.apache.reef.driver.restart.EvaluatorRestartInfo;
+import org.apache.reef.driver.restart.RestartEvaluators;
 import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
 import org.apache.reef.runtime.yarn.driver.parameters.YarnEvaluatorPreserver;
 import org.apache.reef.tang.annotations.Parameter;
@@ -56,6 +58,7 @@ public final class YarnDriverRuntimeRestartManager implements 
DriverRuntimeResta
   private final ApplicationMasterRegistration registration;
   private final REEFEventHandlers reefEventHandlers;
   private final YarnContainerManager yarnContainerManager;
+  private final RackNameFormatter rackNameFormatter;
 
   private Set<Container> previousContainers;
 
@@ -64,11 +67,13 @@ public final class YarnDriverRuntimeRestartManager 
implements DriverRuntimeResta
                                           final EvaluatorPreserver 
evaluatorPreserver,
                                           final REEFEventHandlers 
reefEventHandlers,
                                           final ApplicationMasterRegistration 
registration,
-                                          final YarnContainerManager 
yarnContainerManager) {
+                                          final YarnContainerManager 
yarnContainerManager,
+                                          final RackNameFormatter 
rackNameFormatter) {
     this.registration = registration;
     this.evaluatorPreserver = evaluatorPreserver;
     this.reefEventHandlers = reefEventHandlers;
     this.yarnContainerManager = yarnContainerManager;
+    this.rackNameFormatter = rackNameFormatter;
     this.previousContainers = null;
   }
 
@@ -161,15 +166,16 @@ public final class YarnDriverRuntimeRestartManager 
implements DriverRuntimeResta
   }
 
   /**
-   * Used by tDriverRestartManager. Gets the list of previous containers from 
the resource manager,
+   * Used by {@link org.apache.reef.driver.restart.DriverRestartManager}.
+   * Gets the list of previous containers from the resource manager,
    * compares that list to the YarnDriverRuntimeRestartManager's own list 
based on the evalutor preserver,
    * and determine which evaluators are alive and which have failed during 
restart.
-   * @return EvaluatorRestartInfo, the object encapsulating alive and failed 
evaluator IDs.
+   * @return a map of Evaluator ID to {@link EvaluatorRestartInfo} for 
evaluators that have either failed or survived
+   * driver restart.
    */
   @Override
-  public EvaluatorRestartInfo getAliveAndFailedEvaluators() {
-    final Set<String> recoveredEvaluators = new HashSet<>();
-    final Set<String> failedEvaluators = new HashSet<>();
+  public RestartEvaluators getPreviousEvaluators() {
+    final RestartEvaluators.Builder restartEvaluatorsBuilder = 
RestartEvaluators.newBuilder();
 
     this.initializeListOfPreviousContainers();
 
@@ -191,7 +197,8 @@ public final class YarnDriverRuntimeRestartManager 
implements DriverRuntimeResta
           if (!previousContainersIds.contains(expectedContainerId)) {
             LOG.log(Level.WARNING, "Expected container [{0}] not alive, must 
have failed during driver restart.",
                 expectedContainerId);
-            failedEvaluators.add(expectedContainerId);
+            restartEvaluatorsBuilder.addRestartEvaluator(
+                
EvaluatorRestartInfo.createFailedEvaluatorInfo(expectedContainerId));
           }
         }
       }
@@ -208,11 +215,15 @@ public final class YarnDriverRuntimeRestartManager 
implements DriverRuntimeResta
           throw new RuntimeException("Not expecting container " + 
container.getId().toString());
         }
 
-        recoveredEvaluators.add(container.getId().toString());
+        
restartEvaluatorsBuilder.addRestartEvaluator(EvaluatorRestartInfo.createExpectedEvaluatorInfo(
+            
ResourceEventImpl.newRecoveryBuilder().setIdentifier(container.getId().toString())
+                
.setNodeId(container.getNodeId().toString()).setRackName(rackNameFormatter.getRackName(container))
+                .setResourceMemory(container.getResource().getMemory())
+                
.setVirtualCores(container.getResource().getVirtualCores()).build()));
       }
     }
 
-    return new EvaluatorRestartInfo(recoveredEvaluators, failedEvaluators);
+    return restartEvaluatorsBuilder.build();
   }
 
   /**
@@ -232,7 +243,6 @@ public final class YarnDriverRuntimeRestartManager 
implements DriverRuntimeResta
           .setState(ReefServiceProtos.State.FAILED)
           .setExitCode(1)
           .setDiagnostics("Container [" + evaluatorId + "] failed during 
driver restart process.")
-          .setIsFromPreviousDriver(true)
           .build());
     }
   }

Reply via email to