Repository: incubator-reef
Updated Branches:
  refs/heads/master 9b971979f -> ff336f33f


[REEF-617] Enable creation of EvaluatorManager on restarted evaluators

This addresses the issue by
  * Adding helper functions for ``EvaluatorManagerFactory`` to create
    EvaluatorManagers for recovered evaluators.
  * Recovering the ``EvaluatorManager`` on the first heartbeat of an expected
    Evaluator on restart.
  * Fixing a bug in ``DriverRestartManager`` where the boolean check is
    reversed.

JIRA:
  [REEF-617](https://issues.apache.org/jira/browse/REEF-617)

Pull Request:
  This closes #422


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/ff336f33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/ff336f33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/ff336f33

Branch: refs/heads/master
Commit: ff336f33f75d783deb34fad1512928c67131b7ed
Parents: 9b97197
Author: Andrew Chung <afchun...@gmail.com>
Authored: Wed Aug 26 15:32:52 2015 -0700
Committer: Markus Weimer <wei...@apache.org>
Committed: Thu Aug 27 11:21:00 2015 -0700

----------------------------------------------------------------------
 .../driver/restart/DriverRestartManager.java    | 20 ++++-
 .../driver/context/ContextRepresenters.java     | 16 +---
 .../evaluator/EvaluatorHeartbeatHandler.java    | 25 +++++-
 .../evaluator/EvaluatorManagerFactory.java      | 81 +++++++++++++-------
 .../common/driver/evaluator/Evaluators.java     |  2 +-
 .../resourcemanager/ResourceStatusHandler.java  |  5 +-
 .../common/driver/task/TaskRepresenter.java     |  3 +-
 .../driver/YarnDriverRuntimeRestartManager.java |  1 +
 8 files changed, 104 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
index 19f2b64..52764e4 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -27,6 +27,7 @@ import org.apache.reef.exception.DriverFatalRuntimeException;
 import org.apache.reef.runtime.common.DriverRestartCompleted;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.wake.EventHandler;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceRecoverEvent;
 
 import javax.inject.Inject;
 import java.util.*;
@@ -122,6 +123,19 @@ public final class DriverRestartManager {
   }
 
   /**
+   * @return The ResourceRecoverEvent of the specified evaluator. Throws a 
{@link DriverFatalRuntimeException} if
+   * the evaluator does not exist in the set of known evaluators.
+   */
+  public synchronized ResourceRecoverEvent getResourceRecoverEvent(final 
String evaluatorId) {
+    if (!this.restartEvaluators.contains(evaluatorId)) {
+      throw new DriverFatalRuntimeException("Unexpected evaluator [" + 
evaluatorId + "], should " +
+          "not have been recorded.");
+    }
+
+    return this.restartEvaluators.get(evaluatorId).getResourceRecoverEvent();
+  }
+
+  /**
    * Indicate that this Driver has re-established the connection with one more 
Evaluator of a previous run.
    * @return true if the evaluator has been newly recovered.
    */
@@ -179,9 +193,9 @@ public final class DriverRestartManager {
   }
 
   /**
-   * Signals to the {@link DriverRestartManager} that an evaluator has had its 
running task processed.
+   * Signals to the {@link DriverRestartManager} that an evaluator has had its 
running task or active context processed.
    */
-  public synchronized void setEvaluatorRunningTask(final String evaluatorId) {
+  public synchronized void setEvaluatorProcessed(final String evaluatorId) {
     setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.PROCESSED);
   }
 
@@ -193,7 +207,7 @@ public final class DriverRestartManager {
   }
 
   private synchronized EvaluatorRestartState getStateOfPreviousEvaluator(final 
String evaluatorId) {
-    if (this.restartEvaluators.contains(evaluatorId)) {
+    if (!this.restartEvaluators.contains(evaluatorId)) {
       return EvaluatorRestartState.NOT_EXPECTED;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
index 8abc19d..4936b9a 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
@@ -23,8 +23,6 @@ import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.context.FailedContext;
-import org.apache.reef.driver.restart.DriverRestartManager;
-import org.apache.reef.driver.restart.EvaluatorRestartState;
 import org.apache.reef.proto.ReefServiceProtos;
 import 
org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
 import org.apache.reef.util.Optional;
@@ -45,7 +43,6 @@ public final class ContextRepresenters {
 
   private final EvaluatorMessageDispatcher messageDispatcher;
   private final ContextFactory contextFactory;
-  private final DriverRestartManager driverRestartManager;
 
   // Mutable fields
   @GuardedBy("this")
@@ -55,11 +52,9 @@ public final class ContextRepresenters {
 
   @Inject
   private ContextRepresenters(final EvaluatorMessageDispatcher 
messageDispatcher,
-                              final ContextFactory contextFactory,
-                              final DriverRestartManager driverRestartManager) 
{
+                              final ContextFactory contextFactory) {
     this.messageDispatcher = messageDispatcher;
     this.contextFactory = contextFactory;
-    this.driverRestartManager = driverRestartManager;
   }
 
   /**
@@ -215,13 +210,8 @@ public final class ContextRepresenters {
         Optional.of(contextStatusProto.getParentId()) : 
Optional.<String>empty();
     final EvaluatorContext context = contextFactory.newContext(contextID, 
parentID);
     this.addContext(context);
-    if 
(driverRestartManager.getEvaluatorRestartState(context.getEvaluatorId()) == 
EvaluatorRestartState.REREGISTERED) {
-      // when we get a recovered active context, always notify application
-      this.messageDispatcher.onDriverRestartContextActive(context);
-    } else {
-      if (notifyClientOnNewActiveContext) {
-        this.messageDispatcher.onContextActive(context);
-      }
+    if (notifyClientOnNewActiveContext) {
+      this.messageDispatcher.onContextActive(context);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
index ee1af9f..9011d7f 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
@@ -76,8 +76,8 @@ public final class EvaluatorHeartbeatHandler
 
         if (this.driverRestartManager.onRecoverEvaluator(evaluatorId)) {
           LOG.log(Level.FINE, "Evaluator [" + evaluatorId + "] has reported 
back to the driver after restart.");
-          // TODO[REEF-617]: Create EvaluatorManager, add to this.evaluators, 
and call onEvaluatorHeartbeatMessage().
 
+          evaluators.put(recoverEvaluatorManager(evaluatorId, 
evaluatorHeartbeatMessage));
         } else {
           LOG.log(Level.FINE, "Evaluator [" + evaluatorId + "] has already 
been recovered.");
         }
@@ -86,7 +86,10 @@ public final class EvaluatorHeartbeatHandler
 
       if (driverRestartManager.getEvaluatorRestartState(evaluatorId) == 
EvaluatorRestartState.EXPIRED) {
         LOG.log(Level.FINE, "Expired evaluator " + evaluatorId + " has 
reported back to the driver after restart.");
-        // TODO[REEF-617]: Create EvaluatorManager, call 
onEvaluatorHeartbeatMessage, and close it.
+
+        // Create the evaluator manager, analyze its heartbeat, but don't add 
it to the set of Evaluators.
+        // Immediately close it.
+        recoverEvaluatorManager(evaluatorId, 
evaluatorHeartbeatMessage).close();
         return;
       }
 
@@ -102,4 +105,22 @@ public final class EvaluatorHeartbeatHandler
       LOG.log(Level.FINEST, "TIME: End Heartbeat {0}", evaluatorId);
     }
   }
+
+  /**
+   * Creates an EvaluatorManager for recovered evaluator.
+   * {@link EvaluatorManager#onEvaluatorHeartbeatMessage(RemoteMessage)} 
should not
+   * do anything if driver restart period has expired. Expired evaluators 
should be immediately closed
+   * upon return of this function, while evaluators that have not yet expired 
should be recorded and added
+   * to the {@link Evaluators} object.
+   */
+  private EvaluatorManager recoverEvaluatorManager(
+      final String evaluatorId,
+      final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> 
evaluatorHeartbeatMessage) {
+    final EvaluatorManager recoveredEvaluatorManager = evaluatorManagerFactory
+        .getNewEvaluatorManagerForRecoveredEvaluator(
+            driverRestartManager.getResourceRecoverEvent(evaluatorId));
+
+    
recoveredEvaluatorManager.onEvaluatorHeartbeatMessage(evaluatorHeartbeatMessage);
+    return recoveredEvaluatorManager;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
index fb496f5..4f49fe9 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
@@ -25,10 +25,7 @@ import org.apache.reef.driver.catalog.NodeDescriptor;
 import org.apache.reef.driver.catalog.ResourceCatalog;
 import org.apache.reef.driver.evaluator.EvaluatorProcessFactory;
 import org.apache.reef.runtime.common.driver.catalog.ResourceCatalogImpl;
-import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent;
-import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
-import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
-import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
+import org.apache.reef.runtime.common.driver.resourcemanager.*;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.exceptions.BindException;
 import org.apache.reef.tang.exceptions.InjectionException;
@@ -58,6 +55,34 @@ public final class EvaluatorManagerFactory {
     this.processFactory = processFactory;
   }
 
+  private EvaluatorManager getNewEvaluatorManagerInstanceForResource(
+      final ResourceEvent resourceEvent) {
+    NodeDescriptor nodeDescriptor = 
this.resourceCatalog.getNode(resourceEvent.getNodeId());
+
+    if (nodeDescriptor == null) {
+      final String nodeId = resourceEvent.getNodeId();
+      LOG.log(Level.WARNING, "Node {} is not in our catalog, adding it", 
nodeId);
+      final String[] hostNameAndPort = nodeId.split(":");
+      Validate.isTrue(hostNameAndPort.length == 2);
+      final NodeDescriptorEvent nodeDescriptorEvent = 
NodeDescriptorEventImpl.newBuilder().setIdentifier(nodeId)
+          
.setHostName(hostNameAndPort[0]).setPort(Integer.parseInt(hostNameAndPort[1]))
+          .setMemorySize(resourceEvent.getResourceMemory())
+          .setRackName(resourceEvent.getRackName().get()).build();
+      // downcasting not to change the API
+      ((ResourceCatalogImpl) resourceCatalog).handle(nodeDescriptorEvent);
+      nodeDescriptor = this.resourceCatalog.getNode(nodeId);
+    }
+    final EvaluatorDescriptorImpl evaluatorDescriptor = new 
EvaluatorDescriptorImpl(nodeDescriptor,
+        resourceEvent.getResourceMemory(), 
resourceEvent.getVirtualCores().get(),
+        processFactory.newEvaluatorProcess());
+
+    LOG.log(Level.FINEST, "Resource allocation: new evaluator id[{0}]", 
resourceEvent.getIdentifier());
+    final EvaluatorManager evaluatorManager =
+        getNewEvaluatorManagerInstance(resourceEvent.getIdentifier(), 
evaluatorDescriptor);
+
+    return evaluatorManager;
+  }
+
   /**
    * Helper method to create a new EvaluatorManager instance.
    *
@@ -92,30 +117,9 @@ public final class EvaluatorManagerFactory {
    * @param resourceAllocationEvent
    * @return an EvaluatorManager for the newly allocated Evaluator.
    */
-  public EvaluatorManager getNewEvaluatorManagerForNewlyAllocatedEvaluator(
+  public EvaluatorManager getNewEvaluatorManagerForNewEvaluator(
       final ResourceAllocationEvent resourceAllocationEvent) {
-    NodeDescriptor nodeDescriptor = 
this.resourceCatalog.getNode(resourceAllocationEvent.getNodeId());
-
-    if (nodeDescriptor == null) {
-      final String nodeId = resourceAllocationEvent.getNodeId();
-      LOG.log(Level.WARNING, "Node {} is not in our catalog, adding it", 
nodeId);
-      final String[] hostNameAndPort = nodeId.split(":");
-      Validate.isTrue(hostNameAndPort.length == 2);
-      final NodeDescriptorEvent nodeDescriptorEvent = 
NodeDescriptorEventImpl.newBuilder().setIdentifier(nodeId)
-          
.setHostName(hostNameAndPort[0]).setPort(Integer.parseInt(hostNameAndPort[1]))
-          .setMemorySize(resourceAllocationEvent.getResourceMemory())
-          .setRackName(resourceAllocationEvent.getRackName().get()).build();
-      // downcasting not to change the API
-      ((ResourceCatalogImpl) resourceCatalog).handle(nodeDescriptorEvent);
-      nodeDescriptor = this.resourceCatalog.getNode(nodeId);
-    }
-    final EvaluatorDescriptorImpl evaluatorDescriptor = new 
EvaluatorDescriptorImpl(nodeDescriptor,
-        resourceAllocationEvent.getResourceMemory(), 
resourceAllocationEvent.getVirtualCores().get(),
-        processFactory.newEvaluatorProcess());
-
-    LOG.log(Level.FINEST, "Resource allocation: new evaluator id[{0}]", 
resourceAllocationEvent.getIdentifier());
-    final EvaluatorManager evaluatorManager =
-        
getNewEvaluatorManagerInstance(resourceAllocationEvent.getIdentifier(), 
evaluatorDescriptor);
+    final EvaluatorManager evaluatorManager = 
getNewEvaluatorManagerInstanceForResource(resourceAllocationEvent);
     evaluatorManager.fireEvaluatorAllocatedEvent();
 
     return evaluatorManager;
@@ -134,4 +138,27 @@ public final class EvaluatorManagerFactory {
     return getNewEvaluatorManagerInstance(resourceStatusEvent.getIdentifier(),
         new EvaluatorDescriptorImpl(null, 128, 1, 
processFactory.newEvaluatorProcess()));
   }
+
+  /**
+   * Instantiates a new EvaluatorManager for a failed evaluator during driver 
restart.
+   * Does not fire an EvaluatorAllocatedEvent.
+   * @param resourceStatusEvent
+   * @return an EvaluatorManager for the user to call fail on.
+   */
+  public EvaluatorManager 
getNewEvaluatorManagerForEvaluatorFailedDuringDriverRestart(
+      final ResourceStatusEvent resourceStatusEvent) {
+    return getNewEvaluatorManagerInstance(resourceStatusEvent.getIdentifier(),
+        new EvaluatorDescriptorImpl(null, 128, 1, 
processFactory.newEvaluatorProcess()));
+  }
+
+  /**
+   * Instantiates a new EvaluatorManager based on a resource allocation from a 
recovered evaluator.
+   *
+   * @param resourceRecoverEvent
+   * @return an EvaluatorManager for the newly allocated Evaluator.
+   */
+  public EvaluatorManager getNewEvaluatorManagerForRecoveredEvaluator(
+      final ResourceRecoverEvent resourceRecoverEvent) {
+    return getNewEvaluatorManagerInstanceForResource(resourceRecoverEvent);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
index fe464d9..dd6e3ff 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
@@ -107,7 +107,7 @@ public final class Evaluators implements AutoCloseable {
   public synchronized void put(
       final EvaluatorManagerFactory evaluatorManagerFactory,
       final ResourceAllocationEvent evaluatorMsg) {
-    
this.put(evaluatorManagerFactory.getNewEvaluatorManagerForNewlyAllocatedEvaluator(evaluatorMsg));
+    
this.put(evaluatorManagerFactory.getNewEvaluatorManagerForNewEvaluator(evaluatorMsg));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
index 70a3cce..3b0a75a 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
@@ -57,8 +57,9 @@ public final class ResourceStatusHandler implements 
EventHandler<ResourceStatusE
       evaluatorManager.get().onResourceStatusMessage(resourceStatusEvent);
     } else {
       if (resourceStatusEvent.getIsFromPreviousDriver().get()) {
-        final EvaluatorManager previousEvaluatorManager =
-            
this.evaluatorManagerFactory.createForEvaluatorFailedDuringDriverRestart(resourceStatusEvent);
+        final EvaluatorManager previousEvaluatorManager = 
this.evaluatorManagerFactory
+            
.getNewEvaluatorManagerForEvaluatorFailedDuringDriverRestart(resourceStatusEvent);
+
         previousEvaluatorManager.onResourceStatusMessage(resourceStatusEvent);
       } else {
         throw new RuntimeException(

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
index a09532b..0107878 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
@@ -88,6 +88,7 @@ public final class TaskRepresenter {
       throw new RuntimeException("Received a message for task " + 
taskStatusProto.getTaskId() +
           " in the TaskRepresenter for Task " + this.taskId);
     }
+
     if 
(driverRestartManager.getEvaluatorRestartState(evaluatorManager.getId()) == 
EvaluatorRestartState.REREGISTERED) {
       // when a recovered heartbeat is received, we will take its word for it
       LOG.log(Level.INFO, "Received task status {0} for RECOVERED task {1}.",
@@ -142,7 +143,7 @@ public final class TaskRepresenter {
     if 
(driverRestartManager.getEvaluatorRestartState(evaluatorManager.getId()) == 
EvaluatorRestartState.REREGISTERED) {
       final RunningTask runningTask = new RunningTaskImpl(
           this.evaluatorManager, this.taskId, this.context, this);
-      
this.driverRestartManager.setEvaluatorRunningTask(evaluatorManager.getId());
+      
this.driverRestartManager.setEvaluatorProcessed(evaluatorManager.getId());
       this.messageDispatcher.onDriverRestartTaskRunning(runningTask);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ff336f33/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
index 495a777..c9a1c34 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
@@ -243,6 +243,7 @@ public final class YarnDriverRuntimeRestartManager 
implements DriverRuntimeResta
           .setState(ReefServiceProtos.State.FAILED)
           .setExitCode(1)
           .setDiagnostics("Container [" + evaluatorId + "] failed during 
driver restart process.")
+          .setIsFromPreviousDriver(true)
           .build());
     }
   }

Reply via email to