Repository: reef
Updated Branches:
  refs/heads/master da8ce0258 -> 77546f8f3


[REEF-943] Decouple the DriverSide heartbeat handling from protobuf

This patch:
 * Adds the 'pojos' package to
   `org.apache.reef.runtime.common.driver.evaluator`
 * Implements EvaluatorStatusPOJO, ContextStatusPOJO, TaskStatusPOJO,
   ContextMessagePOJO, TaskMessagePOJO.
   These classes turn ReefServiceProtos into corresponding POJOs. The
   POJO classes implement only the methods that DriverSide event
   handlers currently use; POJOs discard all other proto fields.
 * Implements ContextState and State enums that mimic corresponding
   enums from ReefServiceProtos. Note that the State enum represents
   both Tasks and Evaluator states in the original protobuf code. This
   patch adopts this policy.
 * Replaces ReefServiceProtos with corresponding POJOs and enums in all
   event handlers that EvaluatorManager#onEvaluatorHeartbeatMessage
   calls

JIRA:
 [REEF-943](https://issues.apache.org/jira/browse/REEF-943)

Pull Request:
  This closes #694


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/77546f8f
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/77546f8f
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/77546f8f

Branch: refs/heads/master
Commit: 77546f8f3c4fcd06efa7b8ad9597601a94532a99
Parents: da8ce02
Author: [email protected] <[email protected]>
Authored: Mon Nov 23 11:05:14 2015 +0100
Committer: Markus Weimer <[email protected]>
Committed: Tue Dec 15 10:02:06 2015 -0800

----------------------------------------------------------------------
 .../driver/context/ContextRepresenters.java     |  82 +++++++------
 .../common/driver/context/EvaluatorContext.java |  11 +-
 .../driver/evaluator/EvaluatorManager.java      |  71 ++++++-----
 .../evaluator/pojos/ContextMessagePOJO.java     |  55 +++++++++
 .../driver/evaluator/pojos/ContextState.java    |  34 ++++++
 .../evaluator/pojos/ContextStatusPOJO.java      | 122 +++++++++++++++++++
 .../evaluator/pojos/EvaluatorStatusPOJO.java    |  89 ++++++++++++++
 .../common/driver/evaluator/pojos/State.java    |  40 ++++++
 .../driver/evaluator/pojos/TaskMessagePOJO.java |  54 ++++++++
 .../driver/evaluator/pojos/TaskStatusPOJO.java  | 118 ++++++++++++++++++
 .../driver/evaluator/pojos/package-info.java    |  36 ++++++
 .../resourcemanager/ResourceManagerStatus.java  |  22 ++--
 .../resourcemanager/ResourceStatusEvent.java    |   4 +-
 .../ResourceStatusEventImpl.java                |  11 +-
 .../resourcemanager/RuntimeStatusEvent.java     |   3 +-
 .../resourcemanager/RuntimeStatusEventImpl.java |   9 +-
 .../common/driver/task/TaskRepresenter.java     |  88 ++++++-------
 .../runtime/local/driver/ResourceManager.java   |   4 +-
 .../process/ReefRunnableProcessObserver.java    |   8 +-
 .../runtime/mesos/driver/REEFScheduler.java     |   2 +-
 .../yarn/driver/YarnContainerManager.java       |  19 +--
 .../driver/YarnDriverRuntimeRestartManager.java |   4 +-
 22 files changed, 728 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
index a01c073..5a117f9 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
@@ -25,8 +25,10 @@ import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.context.FailedContext;
 import org.apache.reef.driver.restart.DriverRestartManager;
 import org.apache.reef.driver.restart.EvaluatorRestartState;
-import org.apache.reef.proto.ReefServiceProtos;
 import 
org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
+import 
org.apache.reef.runtime.common.driver.evaluator.pojos.ContextMessagePOJO;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextState;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO;
 import org.apache.reef.util.Optional;
 
 import javax.inject.Inject;
@@ -97,14 +99,14 @@ public final class ContextRepresenters {
   /**
    * Process heartbeats from the contexts on an Evaluator.
    *
-   * @param contextStatusProtos
+   * @param contextStatusPOJOs
    * @param notifyClientOnNewActiveContext
    */
-  public synchronized void onContextStatusMessages(final 
Iterable<ReefServiceProtos.ContextStatusProto>
-                                                       contextStatusProtos,
+  public synchronized void onContextStatusMessages(final 
Iterable<ContextStatusPOJO>
+                                                       contextStatusPOJOs,
                                                    final boolean 
notifyClientOnNewActiveContext) {
-    for (final ReefServiceProtos.ContextStatusProto contextStatusProto : 
contextStatusProtos) {
-      this.onContextStatusMessage(contextStatusProto, 
notifyClientOnNewActiveContext);
+    for (final ContextStatusPOJO contextStatus : contextStatusPOJOs) {
+      this.onContextStatusMessage(contextStatus, 
notifyClientOnNewActiveContext);
     }
   }
 
@@ -112,53 +114,53 @@ public final class ContextRepresenters {
   /**
    * Process a heartbeat from a context.
    *
-   * @param contextStatusProto
+   * @param contextStatus
    * @param notifyClientOnNewActiveContext
    */
-  private synchronized void onContextStatusMessage(final 
ReefServiceProtos.ContextStatusProto contextStatusProto,
+  private synchronized void onContextStatusMessage(final ContextStatusPOJO 
contextStatus,
                                                    final boolean 
notifyClientOnNewActiveContext) {
 
-    LOG.log(Level.FINER, "Processing context status message for context {0}", 
contextStatusProto.getContextId());
-    switch (contextStatusProto.getContextState()) {
+    LOG.log(Level.FINER, "Processing context status message for context {0}", 
contextStatus.getContextId());
+    switch (contextStatus.getContextState()) {
     case READY:
-      this.onContextReady(contextStatusProto, notifyClientOnNewActiveContext);
+      this.onContextReady(contextStatus, notifyClientOnNewActiveContext);
       break;
     case FAIL:
-      this.onContextFailed(contextStatusProto);
+      this.onContextFailed(contextStatus);
       break;
     case DONE:
-      this.onContextDone(contextStatusProto);
+      this.onContextDone(contextStatus);
       break;
     default:
-      this.onUnknownContextStatus(contextStatusProto);
+      this.onUnknownContextStatus(contextStatus);
       break;
     }
-    LOG.log(Level.FINER, "Done processing context status message for context 
{0}", contextStatusProto.getContextId());
+    LOG.log(Level.FINER, "Done processing context status message for context 
{0}", contextStatus.getContextId());
 
   }
 
 
-  private synchronized void onUnknownContextStatus(final 
ReefServiceProtos.ContextStatusProto contextStatusProto) {
-    LOG.log(Level.WARNING, "Received unexpected context status: {0}", 
contextStatusProto);
-    throw new RuntimeException("Received unexpected context status: " + 
contextStatusProto.getContextState());
+  private synchronized void onUnknownContextStatus(final ContextStatusPOJO 
contextStatus) {
+    LOG.log(Level.WARNING, "Received unexpected context status: {0}", 
contextStatus);
+    throw new RuntimeException("Received unexpected context status: " + 
contextStatus.getContextState());
   }
 
-  private synchronized void onContextFailed(final 
ReefServiceProtos.ContextStatusProto contextStatusProto) {
-    assert ReefServiceProtos.ContextStatusProto.State.FAIL == 
contextStatusProto.getContextState();
-    final String contextID = contextStatusProto.getContextId();
+  private synchronized void onContextFailed(final ContextStatusPOJO 
contextStatus) {
+    assert ContextState.FAIL == contextStatus.getContextState();
+    final String contextID = contextStatus.getContextId();
     LOG.log(Level.FINE, "Context {0} failed", contextID);
     // It could have failed right away.
     if (this.isUnknownContextId(contextID)) {
-      this.onNewContext(contextStatusProto, false);
+      this.onNewContext(contextStatus, false);
     }
     final EvaluatorContext context = getContext(contextID);
     this.removeContext(context);
-    
this.messageDispatcher.onContextFailed(context.getFailedContext(contextStatusProto));
+    
this.messageDispatcher.onContextFailed(context.getFailedContext(contextStatus));
   }
 
-  private synchronized void onContextDone(final 
ReefServiceProtos.ContextStatusProto contextStatusProto) {
-    assert ReefServiceProtos.ContextStatusProto.State.DONE == 
contextStatusProto.getContextState();
-    final String contextID = contextStatusProto.getContextId();
+  private synchronized void onContextDone(final ContextStatusPOJO 
contextStatus) {
+    assert ContextState.DONE == contextStatus.getContextState();
+    final String contextID = contextStatus.getContextId();
     if (isUnknownContextId(contextID)) {
       throw new RuntimeException("Received DONE for context " + contextID + " 
which is unknown.");
     } else {
@@ -178,24 +180,24 @@ public final class ContextRepresenters {
   /**
    * Process a message with status READY from a context.
    *
-   * @param contextStatusProto
+   * @param contextStatus
    * @param notifyClientOnNewActiveContext whether or not to inform the 
application when this in fact refers to a new
    *                                       context.
    */
-  private synchronized void onContextReady(final 
ReefServiceProtos.ContextStatusProto contextStatusProto,
+  private synchronized void onContextReady(final ContextStatusPOJO 
contextStatus,
                                            final boolean 
notifyClientOnNewActiveContext) {
-    assert ReefServiceProtos.ContextStatusProto.State.READY == 
contextStatusProto.getContextState();
-    final String contextID = contextStatusProto.getContextId();
+    assert ContextState.READY == contextStatus.getContextState();
+    final String contextID = contextStatus.getContextId();
     // This could be the first message we get from that context
     if (this.isUnknownContextId(contextID)) {
-      this.onNewContext(contextStatusProto, notifyClientOnNewActiveContext);
+      this.onNewContext(contextStatus, notifyClientOnNewActiveContext);
     }
 
     // Dispatch the messages to the application, if there are any.
-    for (final ReefServiceProtos.ContextStatusProto.ContextMessageProto
-             contextMessageProto : contextStatusProto.getContextMessageList()) 
{
-      final byte[] theMessage = contextMessageProto.getMessage().toByteArray();
-      final String sourceID = contextMessageProto.getSourceId();
+    for (final ContextMessagePOJO
+             contextMessage : contextStatus.getContextMessageList()) {
+      final byte[] theMessage = contextMessage.getMessage();
+      final String sourceID = contextMessage.getSourceId();
       this.messageDispatcher.onContextMessage(new 
ContextMessageImpl(theMessage, contextID, sourceID));
     }
 
@@ -204,16 +206,16 @@ public final class ContextRepresenters {
   /**
    * Create and add a new context representer.
    *
-   * @param contextStatusProto             the message to create the context 
from
+   * @param contextStatus             the message to create the context from
    * @param notifyClientOnNewActiveContext whether or not to fire an event to 
the user.
    */
-  private synchronized void onNewContext(final 
ReefServiceProtos.ContextStatusProto contextStatusProto,
+  private synchronized void onNewContext(final ContextStatusPOJO contextStatus,
                                          final boolean 
notifyClientOnNewActiveContext) {
-    final String contextID = contextStatusProto.getContextId();
+    final String contextID = contextStatus.getContextId();
     LOG.log(Level.FINE, "Adding new context {0}.", contextID);
 
-    final Optional<String> parentID = contextStatusProto.hasParentId() ?
-        Optional.of(contextStatusProto.getParentId()) : 
Optional.<String>empty();
+    final Optional<String> parentID = contextStatus.hasParentId() ?
+        Optional.of(contextStatus.getParentId()) : Optional.<String>empty();
     final EvaluatorContext context = contextFactory.newContext(contextID, 
parentID);
     this.addContext(context);
     if (notifyClientOnNewActiveContext) {

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
index a25e116..8106713 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
@@ -26,8 +26,9 @@ import org.apache.reef.driver.context.ClosedContext;
 import org.apache.reef.driver.context.FailedContext;
 import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
 import org.apache.reef.proto.EvaluatorRuntimeProtocol;
-import org.apache.reef.proto.ReefServiceProtos;
 import 
org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextState;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO;
 import org.apache.reef.runtime.common.utils.ExceptionCodec;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.formats.ConfigurationSerializer;
@@ -248,15 +249,15 @@ public final class EvaluatorContext implements 
ActiveContext {
   }
 
   public synchronized FailedContext getFailedContext(
-      final ReefServiceProtos.ContextStatusProto contextStatusProto) {
+      final ContextStatusPOJO contextStatus) {
 
-    assert ReefServiceProtos.ContextStatusProto.State.FAIL == 
contextStatusProto.getContextState();
+    assert ContextState.FAIL == contextStatus.getContextState();
 
     final String id = this.getId();
     final Optional<String> description = Optional.empty();
 
-    final Optional<byte[]> data = contextStatusProto.hasError() ?
-        Optional.of(contextStatusProto.getError().toByteArray()) :
+    final Optional<byte[]> data = contextStatus.hasError() ?
+        Optional.of(contextStatus.getError()) :
         Optional.<byte[]>empty();
 
     final Optional<Throwable> cause = data.isPresent() ?

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index 7a92dbc..cc107c3 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -24,6 +24,10 @@ import org.apache.reef.driver.evaluator.FailedEvaluator;
 import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders;
 import org.apache.reef.driver.restart.DriverRestartManager;
 import org.apache.reef.driver.restart.EvaluatorRestartState;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO;
+import 
org.apache.reef.runtime.common.driver.evaluator.pojos.EvaluatorStatusPOJO;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.TaskStatusPOJO;
 import org.apache.reef.tang.ConfigurationProvider;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.driver.context.FailedContext;
@@ -60,6 +64,7 @@ import org.apache.reef.wake.time.event.Alarm;
 
 import javax.inject.Inject;
 import java.io.File;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.logging.Level;
@@ -190,9 +195,9 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
   }
 
   private static boolean isDoneOrFailedOrKilled(final ResourceStatusEvent 
resourceStatusEvent) {
-    return resourceStatusEvent.getState() == ReefServiceProtos.State.DONE ||
-        resourceStatusEvent.getState() == ReefServiceProtos.State.FAILED ||
-        resourceStatusEvent.getState() == ReefServiceProtos.State.KILLED;
+    return resourceStatusEvent.getState() == State.DONE ||
+        resourceStatusEvent.getState() == State.FAILED ||
+        resourceStatusEvent.getState() == State.KILLED;
   }
 
   @Override
@@ -313,7 +318,6 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
         } else {
           this.messageDispatcher.onEvaluatorFailed(failedEvaluator);
         }
-
       } catch (final Exception e) {
         LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e);
       } finally {
@@ -372,17 +376,25 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
 
       // Process the Evaluator status message
       if (evaluatorHeartbeatProto.hasEvaluatorStatus()) {
-        
this.onEvaluatorStatusMessage(evaluatorHeartbeatProto.getEvaluatorStatus());
+        EvaluatorStatusPOJO evaluatorStatus = new 
EvaluatorStatusPOJO(evaluatorHeartbeatProto.getEvaluatorStatus());
+        this.onEvaluatorStatusMessage(evaluatorStatus);
       }
 
       // Process the Context status message(s)
       final boolean informClientOfNewContexts = 
!evaluatorHeartbeatProto.hasTaskStatus();
-      
this.contextRepresenters.onContextStatusMessages(evaluatorHeartbeatProto.getContextStatusList(),
+      final List<ContextStatusPOJO> contextStatusList = new ArrayList<>();
+      for (ReefServiceProtos.ContextStatusProto proto : 
evaluatorHeartbeatProto.getContextStatusList()) {
+        contextStatusList.add(new ContextStatusPOJO(proto));
+      }
+
+      this.contextRepresenters.onContextStatusMessages(contextStatusList,
           informClientOfNewContexts);
 
       // Process the Task status message
+
       if (evaluatorHeartbeatProto.hasTaskStatus()) {
-        this.onTaskStatusMessage(evaluatorHeartbeatProto.getTaskStatus());
+        TaskStatusPOJO taskStatus = new 
TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus());
+        this.onTaskStatusMessage(taskStatus);
       }
       LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", 
this.getId());
     }
@@ -393,7 +405,7 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
    *
    * @param message
    */
-  private synchronized void onEvaluatorStatusMessage(final 
ReefServiceProtos.EvaluatorStatusProto message) {
+  private synchronized void onEvaluatorStatusMessage(final EvaluatorStatusPOJO 
message) {
 
     switch (message.getState()) {
     case DONE:
@@ -417,8 +429,8 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
    *
    * @param message
    */
-  private synchronized void onEvaluatorDone(final 
ReefServiceProtos.EvaluatorStatusProto message) {
-    assert message.getState() == ReefServiceProtos.State.DONE;
+  private synchronized void onEvaluatorDone(final EvaluatorStatusPOJO message) 
{
+    assert message.getState() == State.DONE;
     LOG.log(Level.FINEST, "Evaluator {0} done.", getId());
     this.stateManager.setDone();
     this.messageDispatcher.onEvaluatorCompleted(new 
CompletedEvaluatorImpl(this.evaluatorId));
@@ -428,14 +440,15 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
   /**
    * Process an evaluator message that indicates a crash.
    *
-   * @param evaluatorStatusProto
+   * @param evaluatorStatus
    */
-  private synchronized void onEvaluatorFailed(final 
ReefServiceProtos.EvaluatorStatusProto evaluatorStatusProto) {
-    assert evaluatorStatusProto.getState() == ReefServiceProtos.State.FAILED;
+  private synchronized void onEvaluatorFailed(final EvaluatorStatusPOJO 
evaluatorStatus) {
+    assert evaluatorStatus.getState()
+            == State.FAILED;
     final EvaluatorException evaluatorException;
-    if (evaluatorStatusProto.hasError()) {
+    if (evaluatorStatus.hasError()) {
       final Optional<Throwable> exception =
-          
this.exceptionCodec.fromBytes(evaluatorStatusProto.getError().toByteArray());
+          this.exceptionCodec.fromBytes(evaluatorStatus.getError());
       if (exception.isPresent()) {
         evaluatorException = new EvaluatorException(getId(), exception.get());
       } else {
@@ -486,40 +499,40 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
   /**
    * Handle task status messages.
    *
-   * @param taskStatusProto message contains the current task status.
+   * @param taskStatus message contains the current task status.
    */
-  private void onTaskStatusMessage(final ReefServiceProtos.TaskStatusProto 
taskStatusProto) {
+  private void onTaskStatusMessage(final TaskStatusPOJO taskStatus) {
 
-    if (!(this.task.isPresent() && 
this.task.get().getId().equals(taskStatusProto.getTaskId()))) {
-      if (taskStatusProto.getState() == ReefServiceProtos.State.INIT ||
-          taskStatusProto.getState() == ReefServiceProtos.State.FAILED ||
-          taskStatusProto.getState() == ReefServiceProtos.State.RUNNING ||
+    if (!(this.task.isPresent() && 
this.task.get().getId().equals(taskStatus.getTaskId()))) {
+      if (taskStatus.getState() == State.INIT ||
+          taskStatus.getState() == State.FAILED ||
+          taskStatus.getState() == State.RUNNING ||
           driverRestartManager.getEvaluatorRestartState(evaluatorId) == 
EvaluatorRestartState.REREGISTERED) {
 
         // [REEF-308] exposes a bug where the .NET evaluator does not send its 
states in the right order
         // [REEF-289] is a related item which may fix the issue
-        if (taskStatusProto.getState() == ReefServiceProtos.State.RUNNING) {
+        if (taskStatus.getState() == State.RUNNING) {
           LOG.log(Level.WARNING,
                   "Received a message of state " + 
ReefServiceProtos.State.RUNNING +
-                  " for Task " + taskStatusProto.getTaskId() +
+                  " for Task " + taskStatus.getTaskId() +
                   " before receiving its " + ReefServiceProtos.State.INIT + " 
state");
         }
 
         // FAILED is a legal first state of a Task as it could have failed 
during construction.
         this.task = Optional.of(
-            new TaskRepresenter(taskStatusProto.getTaskId(),
-                
this.contextRepresenters.getContext(taskStatusProto.getContextId()),
+            new TaskRepresenter(taskStatus.getTaskId(),
+                this.contextRepresenters.getContext(taskStatus.getContextId()),
                 this.messageDispatcher,
                 this,
                 this.exceptionCodec,
                 this.driverRestartManager));
       } else {
-        throw new RuntimeException("Received a message of state " + 
taskStatusProto.getState() +
-            ", not INIT, RUNNING, or FAILED for Task " + 
taskStatusProto.getTaskId() +
+        throw new RuntimeException("Received a message of state " + 
taskStatus.getState() +
+            ", not INIT, RUNNING, or FAILED for Task " + 
taskStatus.getTaskId() +
             " which we haven't heard from before.");
       }
     }
-    this.task.get().onTaskStatusMessage(taskStatusProto);
+    this.task.get().onTaskStatusMessage(taskStatus);
 
     if (this.task.get().isNotRunning()) {
       LOG.log(Level.FINEST, "Task no longer running. De-registering it.");
@@ -563,7 +576,7 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
               .append("] was running when the Evaluator crashed.");
         }
 
-        if (resourceStatusEvent.getState() == ReefServiceProtos.State.KILLED) {
+        if (resourceStatusEvent.getState() == State.KILLED) {
           this.onEvaluatorException(new 
EvaluatorKilledByResourceManagerException(this.evaluatorId,
               messageBuilder.toString()));
         } else {

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextMessagePOJO.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextMessagePOJO.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextMessagePOJO.java
new file mode 100644
index 0000000..3760719
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextMessagePOJO.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.runtime.common.driver.evaluator.pojos;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ReefServiceProtos;
+
+/**
+ *  DriverSide representation of ContextMessageProto.
+ */
+@DriverSide
+@Private
+public final class ContextMessagePOJO {
+
+  private final byte[] message;
+  private final String sourceId;
+
+
+  ContextMessagePOJO(final 
ReefServiceProtos.ContextStatusProto.ContextMessageProto proto){
+    message = proto.getMessage().toByteArray();
+    sourceId = proto.getSourceId();
+  }
+
+  /**
+   * @return the serialized message from a context.
+   */
+  public byte[] getMessage(){
+    return message;
+  }
+
+  /**
+   * @return the ID of the ContextMessageSource that sent the message.
+   */
+  public String getSourceId(){
+    return sourceId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextState.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextState.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextState.java
new file mode 100644
index 0000000..31efc62
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextState.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.runtime.common.driver.evaluator.pojos;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * DriverSide representation of ContextStatusProto.State.
+ */
+@DriverSide
+@Private
+public enum ContextState {
+    READY,
+    DONE,
+    FAIL;
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextStatusPOJO.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextStatusPOJO.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextStatusPOJO.java
new file mode 100644
index 0000000..a636ea5
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextStatusPOJO.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.runtime.common.driver.evaluator.pojos;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ReefServiceProtos;
+import 
org.apache.reef.proto.ReefServiceProtos.ContextStatusProto.ContextMessageProto;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * DriverSide representation of ContextStatusProto.
+ */
+@DriverSide
+@Private
+public final class ContextStatusPOJO {
+
+  private final String contextId;
+  private final String parentId;
+  private final byte[] errorBytes;
+  private final ContextState contextState;
+  private final List<ContextMessagePOJO> contextMessages = new ArrayList<>();
+
+
+  public ContextStatusPOJO(final ReefServiceProtos.ContextStatusProto proto){
+
+    contextId = proto.getContextId();
+    parentId = proto.hasParentId() ? proto.getParentId() : null;
+    errorBytes = proto.hasError() ? proto.getError().toByteArray() : null;
+    contextState = proto.hasContextState()? 
getContextStateFromProto(proto.getContextState()) : null;
+
+    for (final ContextMessageProto contextMessageProto : 
proto.getContextMessageList()) {
+      contextMessages.add(new ContextMessagePOJO(contextMessageProto));
+    }
+
+  }
+
+  /**
+   * @return a list of messages sent by a context
+   */
+  public List<ContextMessagePOJO> getContextMessageList() {
+    return contextMessages;
+  }
+
+  /**
+   * @return the ID of a context
+   */
+  public String getContextId() {
+    return contextId;
+  }
+
+  /**
+   * @return the {@link 
org.apache.reef.runtime.common.driver.evaluator.pojos.ContextState} of a context
+   */
+  public ContextState getContextState() {
+    return contextState;
+  }
+
+  /**
+   * @return true, if a context has thrown an exception and sent it to a driver
+   */
+  public boolean hasError() {
+    return null != errorBytes;
+  }
+
+  /**
+   * @return serialized exception thrown by a context
+   */
+  public byte[] getError() {
+    return  errorBytes;
+  }
+
+  /**
+   * @return true, if a context has the parent context
+   */
+  public boolean hasParentId() {
+    return null != parentId;
+  }
+
+  /**
+   * @return the id of the parent context
+   */
+  public String getParentId(){
+    return parentId;
+  }
+
+  private ContextState getContextStateFromProto(
+          final 
org.apache.reef.proto.ReefServiceProtos.ContextStatusProto.State protoState) {
+
+    switch (protoState) {
+    case READY:
+      return ContextState.READY;
+    case DONE:
+      return ContextState.DONE;
+    case FAIL:
+      return ContextState.FAIL;
+    default:
+      throw new IllegalStateException("Unknown state " + protoState + " in 
ContextStatusProto");
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java
new file mode 100644
index 0000000..661369c
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.runtime.common.driver.evaluator.pojos;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ReefServiceProtos;
+
+/**
+ * DriverSide representation of EvaluatorStatusProto.
+ */
+@DriverSide
+@Private
+public final class EvaluatorStatusPOJO {
+
+  private final String evaluatorID;
+  private final byte[] evaluatorIdBytes;
+  private final State evaluatorState;
+  private final byte[] errorBytes;
+
+
+  public EvaluatorStatusPOJO(final ReefServiceProtos.EvaluatorStatusProto 
proto) {
+
+    evaluatorID = proto.getEvaluatorId();
+    evaluatorIdBytes = proto.getEvaluatorIdBytes().toByteArray();
+    evaluatorState = proto.hasState()? getStateFromProto(proto.getState()) : 
null;
+    errorBytes = proto.hasError() ? proto.getError().toByteArray() : null;
+
+  }
+
+  /**
+   * @return true, if an evaluator has thrown an exception and sent it to a 
driver
+   */
+  public boolean hasError() {
+    return null != errorBytes;
+  }
+
+  /**
+   * @return serialized exception thrown by an evaluator
+   */
+  public byte[] getError(){
+    return errorBytes;
+  }
+
+  /**
+   * @return current {@link 
org.apache.reef.runtime.common.driver.evaluator.pojos.State} of a task
+   */
+  public State getState(){
+    return evaluatorState;
+  }
+
+  private State getStateFromProto(final 
org.apache.reef.proto.ReefServiceProtos.State protoState) {
+
+    switch (protoState) {
+    case INIT:
+      return State.INIT;
+    case RUNNING:
+      return State.RUNNING;
+    case DONE:
+      return State.DONE;
+    case SUSPEND:
+      return State.SUSPEND;
+    case FAILED:
+      return State.FAILED;
+    case KILLED:
+      return State.KILLED;
+    default:
+      throw new IllegalStateException("Unknown state " + protoState + " in 
EvaluatorStatusProto");
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java
new file mode 100644
index 0000000..477564c
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.runtime.common.driver.evaluator.pojos;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * DriverSide representation of ReefServiceProtos.State.
+ */
+
+@DriverSide
+@Private
+public enum State {
+
+    INIT,
+    RUNNING,
+    DONE,
+    SUSPEND,
+    FAILED,
+    KILLED;
+
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskMessagePOJO.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskMessagePOJO.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskMessagePOJO.java
new file mode 100644
index 0000000..84bb4d6
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskMessagePOJO.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.runtime.common.driver.evaluator.pojos;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ReefServiceProtos;
+
+/**
+ *  DriverSide representation of TaskMessageProto.
+ */
+@DriverSide
+@Private
+public final class TaskMessagePOJO {
+
+  private final byte[] message;
+  private final String sourceId;
+
+  TaskMessagePOJO(final ReefServiceProtos.TaskStatusProto.TaskMessageProto 
proto){
+    message = proto.getMessage().toByteArray();
+    sourceId = proto.getSourceId();
+  }
+
+  /**
+   * @return the serialized message from a task
+   */
+  public byte[] getMessage() {
+    return message;
+  }
+
+  /**
+   * @return the ID of the TaskMessageSource that sent the message.
+   */
+  public String getSourceId(){
+    return sourceId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java
new file mode 100644
index 0000000..66ecdda
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.runtime.common.driver.evaluator.pojos;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ReefServiceProtos;
+import 
org.apache.reef.proto.ReefServiceProtos.TaskStatusProto.TaskMessageProto;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * DriverSide representation of TaskStatusProto.
+ */
+@DriverSide
+@Private
+public final class TaskStatusPOJO {
+
+  private final String taskId;
+  private final String contextId;
+  private final State state;
+  private final byte[] result;
+  private final List<TaskMessagePOJO> taskMessages = new ArrayList<>();
+
+  public TaskStatusPOJO(final ReefServiceProtos.TaskStatusProto proto){
+
+    taskId = proto.getTaskId();
+    contextId = proto.getContextId();
+    state = proto.hasState()? getStateFromProto(proto.getState()) : null;
+    result = proto.hasResult() ? proto.getResult().toByteArray() : null;
+
+    for (final TaskMessageProto taskMessageProto : proto.getTaskMessageList()) 
{
+      taskMessages.add(new TaskMessagePOJO(taskMessageProto));
+    }
+
+  }
+
+  /**
+   * @return a list of messages sent by a task
+   */
+  public List<TaskMessagePOJO> getTaskMessageList(){
+    return taskMessages;
+  }
+
+  /**
+   * @return true, if a completed task returned a non-null value in the 
'return' statement
+   */
+  public boolean hasResult(){
+    return null != result;
+  }
+
+  /**
+   * @return serialized result that a completed task returned to the Driver
+   */
+  public byte[] getResult(){
+    return result;
+  }
+
+  /**
+   * @return the id of a task
+   */
+  public String getTaskId(){
+    return taskId;
+  }
+
+  /**
+   * @return the id of a context that this task runs within
+   */
+  public String getContextId(){
+    return contextId;
+  }
+
+  /**
+   * @return current {@link 
org.apache.reef.runtime.common.driver.evaluator.pojos.State} of a task
+   */
+  public State getState(){
+    return state;
+  }
+
+  private State getStateFromProto(final 
org.apache.reef.proto.ReefServiceProtos.State protoState) {
+
+    switch (protoState) {
+    case INIT:
+      return State.INIT;
+    case RUNNING:
+      return State.RUNNING;
+    case DONE:
+      return State.DONE;
+    case SUSPEND:
+      return State.SUSPEND;
+    case FAILED:
+      return State.FAILED;
+    case KILLED:
+      return State.KILLED;
+    default:
+      throw new IllegalStateException("Unknown state " + protoState + " in 
EvaluatorStatusProto");
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/package-info.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/package-info.java
new file mode 100644
index 0000000..d3d1369
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/package-info.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * POJOs in this package encapsulate the content of
+ * an EvaluatorHeartbeatProto on the DriverSide.
+ *
+ * POJOs keep only fields and interfaces that DriverSide actually uses;
+ * other EvaluatorHeartbeatProto members are discarded.
+ *
+ * See REEF-943.
+ */
+
+@DriverSide
+@Private
+package org.apache.reef.runtime.common.driver.evaluator.pojos;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
index 68d7594..273649f 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
@@ -20,8 +20,8 @@ package org.apache.reef.runtime.common.driver.resourcemanager;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
 import org.apache.reef.runtime.common.driver.idle.DriverIdleManager;
 import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource;
 import org.apache.reef.runtime.common.driver.idle.IdleMessage;
@@ -50,7 +50,7 @@ public final class ResourceManagerStatus implements 
EventHandler<RuntimeStatusEv
   private final InjectionFuture<DriverIdleManager> driverIdleManager;
 
   // Mutable state.
-  private ReefServiceProtos.State state = ReefServiceProtos.State.INIT;
+  private State state =  State.INIT;
   private int outstandingContainerRequests = 0;
   private int containerAllocationCount = 0;
 
@@ -65,7 +65,7 @@ public final class ResourceManagerStatus implements 
EventHandler<RuntimeStatusEv
 
   @Override
   public synchronized void onNext(final RuntimeStatusEvent runtimeStatusEvent) 
{
-    final ReefServiceProtos.State newState = runtimeStatusEvent.getState();
+    final State newState = runtimeStatusEvent.getState();
     LOG.log(Level.FINEST, "Runtime status " + runtimeStatusEvent);
     this.outstandingContainerRequests = 
runtimeStatusEvent.getOutstandingContainerRequests().orElse(0);
     this.containerAllocationCount = 
runtimeStatusEvent.getContainerAllocationList().size();
@@ -94,7 +94,7 @@ public final class ResourceManagerStatus implements 
EventHandler<RuntimeStatusEv
    * Change the state of the Resource Manager to be RUNNING.
    */
   public synchronized void setRunning() {
-    this.setState(ReefServiceProtos.State.RUNNING);
+    this.setState(State.RUNNING);
   }
 
   /**
@@ -117,18 +117,18 @@ public final class ResourceManagerStatus implements 
EventHandler<RuntimeStatusEv
 
 
   private synchronized void onRMFailure(final RuntimeStatusEvent 
runtimeStatusEvent) {
-    assert runtimeStatusEvent.getState() == ReefServiceProtos.State.FAILED;
+    assert runtimeStatusEvent.getState() == State.FAILED;
     
this.resourceManagerErrorHandler.onNext(runtimeStatusEvent.getError().get());
   }
 
   private synchronized void onRMDone(final RuntimeStatusEvent 
runtimeStatusEvent) {
-    assert runtimeStatusEvent.getState() == ReefServiceProtos.State.DONE;
+    assert runtimeStatusEvent.getState() == State.DONE;
     LOG.log(Level.INFO, "Resource Manager shutdown happened. Triggering Driver 
shutdown.");
     this.driverStatusManager.onComplete();
   }
 
   private synchronized void onRMRunning(final RuntimeStatusEvent 
runtimeStatusEvent) {
-    assert runtimeStatusEvent.getState() == ReefServiceProtos.State.RUNNING;
+    assert runtimeStatusEvent.getState() == State.RUNNING;
     if (this.isIdle()) {
       this.driverIdleManager.get().onPotentiallyIdle(IDLE_MESSAGE);
     }
@@ -141,7 +141,7 @@ public final class ResourceManagerStatus implements 
EventHandler<RuntimeStatusEv
   }
 
   private synchronized boolean isRunning() {
-    return ReefServiceProtos.State.RUNNING.equals(this.state);
+    return State.RUNNING.equals(this.state);
   }
 
   /**
@@ -155,8 +155,8 @@ public final class ResourceManagerStatus implements 
EventHandler<RuntimeStatusEv
   * @return true if the transition is legal; false otherwise
   *
   */
-  private synchronized boolean isLegalStateTransition(final 
ReefServiceProtos.State from,
-                                                      final 
ReefServiceProtos.State to) {
+  private synchronized boolean isLegalStateTransition(final State from,
+                                                      final State to) {
 
     // handle diagonal elements of the transition matrix
     if (from.equals(to)){
@@ -212,7 +212,7 @@ public final class ResourceManagerStatus implements 
EventHandler<RuntimeStatusEv
 
   }
 
-  private synchronized void setState(final ReefServiceProtos.State newState) {
+  private synchronized void setState(final State newState) {
 
     if (isLegalStateTransition(this.state, newState)) {
       this.state = newState;

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java
index c669d95..82945e6 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java
@@ -20,7 +20,7 @@ package org.apache.reef.runtime.common.driver.resourcemanager;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.RuntimeAuthor;
-import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
 import org.apache.reef.tang.annotations.DefaultImplementation;
 import org.apache.reef.util.Optional;
 
@@ -45,7 +45,7 @@ public interface ResourceStatusEvent {
   /**
    * @return State of the resource
    */
-  ReefServiceProtos.State getState();
+  State getState();
 
   /**
    * @return Diagnostics from the resource

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java
index ee01879..4a9da3d 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java
@@ -18,7 +18,7 @@
  */
 package org.apache.reef.runtime.common.driver.resourcemanager;
 
-import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
 import org.apache.reef.util.BuilderUtils;
 import org.apache.reef.util.Optional;
 
@@ -28,7 +28,7 @@ import org.apache.reef.util.Optional;
  */
 public final class ResourceStatusEventImpl implements ResourceStatusEvent {
   private final String identifier;
-  private final ReefServiceProtos.State state;
+  private final State state;
   private final Optional<String> diagnostics;
   private final Optional<Integer> exitCode;
   private final String runtimeName;
@@ -52,7 +52,7 @@ public final class ResourceStatusEventImpl implements 
ResourceStatusEvent {
   }
 
   @Override
-  public ReefServiceProtos.State getState() {
+  public State getState() {
     return state;
   }
 
@@ -74,9 +74,10 @@ public final class ResourceStatusEventImpl implements 
ResourceStatusEvent {
    * Builder used to create ResourceStatusEvent instances.
    */
   public static final class Builder implements 
org.apache.reef.util.Builder<ResourceStatusEvent> {
+
     private String identifier;
     private String runtimeName;
-    private ReefServiceProtos.State state;
+    private State state;
     private String diagnostics;
     private Integer exitCode;
 
@@ -98,7 +99,7 @@ public final class ResourceStatusEventImpl implements 
ResourceStatusEvent {
     /**
      * @see ResourceStatusEvent#getState()
      */
-    public Builder setState(final ReefServiceProtos.State state) {
+    public Builder setState(final State state) {
       this.state = state;
       return this;
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java
index 18bcce0..ac02dba 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java
@@ -21,6 +21,7 @@ package org.apache.reef.runtime.common.driver.resourcemanager;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.RuntimeAuthor;
 import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
 import org.apache.reef.tang.annotations.DefaultImplementation;
 import org.apache.reef.util.Optional;
 
@@ -42,7 +43,7 @@ public interface RuntimeStatusEvent {
   /**
    * @return State of the Runtime
    */
-  ReefServiceProtos.State getState();
+  State getState();
 
   /**
    * @return List of allocated containers

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java
index 50af802..48c9ed3 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java
@@ -19,6 +19,7 @@
 package org.apache.reef.runtime.common.driver.resourcemanager;
 
 import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
 import org.apache.reef.util.BuilderUtils;
 import org.apache.reef.util.Optional;
 
@@ -31,7 +32,7 @@ import java.util.List;
  */
 public final class RuntimeStatusEventImpl implements RuntimeStatusEvent {
   private final String name;
-  private final ReefServiceProtos.State state;
+  private final State state;
   private final List<String> containerAllocationList;
   private final Optional<ReefServiceProtos.RuntimeErrorProto> error;
   private final Optional<Integer> outstandingContainerRequests;
@@ -50,7 +51,7 @@ public final class RuntimeStatusEventImpl implements 
RuntimeStatusEvent {
   }
 
   @Override
-  public ReefServiceProtos.State getState() {
+  public State getState() {
     return state;
   }
 
@@ -78,7 +79,7 @@ public final class RuntimeStatusEventImpl implements 
RuntimeStatusEvent {
    */
   public static final class Builder implements 
org.apache.reef.util.Builder<RuntimeStatusEvent> {
     private String name;
-    private ReefServiceProtos.State state;
+    private State state;
     private List<String> containerAllocationList = new ArrayList<>();
     private ReefServiceProtos.RuntimeErrorProto error;
     private Integer outstandingContainerRequests;
@@ -94,7 +95,7 @@ public final class RuntimeStatusEventImpl implements 
RuntimeStatusEvent {
     /**
      * @see RuntimeStatusEvent#getState()
      */
-    public Builder setState(final ReefServiceProtos.State state) {
+    public Builder setState(final State state) {
       this.state = state;
       return this;
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
index 281df11..89d60a3 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
@@ -25,10 +25,12 @@ import org.apache.reef.driver.restart.DriverRestartManager;
 import org.apache.reef.driver.restart.EvaluatorRestartState;
 import org.apache.reef.driver.task.FailedTask;
 import org.apache.reef.driver.task.RunningTask;
-import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.driver.context.EvaluatorContext;
 import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
 import 
org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.TaskMessagePOJO;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.TaskStatusPOJO;
 import org.apache.reef.runtime.common.utils.ExceptionCodec;
 import org.apache.reef.util.Optional;
 
@@ -52,7 +54,7 @@ public final class TaskRepresenter {
   private final DriverRestartManager driverRestartManager;
 
   // Mutable state
-  private ReefServiceProtos.State state = ReefServiceProtos.State.INIT;
+  private State state = State.INIT;
 
   public TaskRepresenter(final String taskId,
                          final EvaluatorContext context,
@@ -68,57 +70,57 @@ public final class TaskRepresenter {
     this.driverRestartManager = driverRestartManager;
   }
 
-  private static byte[] getResult(final ReefServiceProtos.TaskStatusProto 
taskStatusProto) {
-    return taskStatusProto.hasResult() ? 
taskStatusProto.getResult().toByteArray() : null;
+  private static byte[] getResult(final TaskStatusPOJO taskStatus) {
+    return taskStatus.hasResult() ? taskStatus.getResult() : null;
   }
 
-  public void onTaskStatusMessage(final ReefServiceProtos.TaskStatusProto 
taskStatusProto) {
+  public void onTaskStatusMessage(final TaskStatusPOJO taskStatus) {
 
     LOG.log(Level.FINE, "Received task {0} status {1}",
-        new Object[]{taskStatusProto.getTaskId(), taskStatusProto.getState()});
+            new Object[]{taskStatus.getTaskId(), taskStatus.getState()});
 
     // Make sure that the message is indeed for us.
-    if (!taskStatusProto.getContextId().equals(this.context.getId())) {
+    if (!taskStatus.getContextId().equals(this.context.getId())) {
       throw new RuntimeException(
-          "Received a message for a task running on Context " + 
taskStatusProto.getContextId() +
+          "Received a message for a task running on Context " + 
taskStatus.getContextId() +
               " while the Driver believes this Task to be run on Context " + 
this.context.getId());
     }
 
-    if (!taskStatusProto.getTaskId().equals(this.taskId)) {
-      throw new RuntimeException("Received a message for task " + 
taskStatusProto.getTaskId() +
+    if (!taskStatus.getTaskId().equals(this.taskId)) {
+      throw new RuntimeException("Received a message for task " + 
taskStatus.getTaskId() +
           " in the TaskRepresenter for Task " + this.taskId);
     }
 
     if 
(driverRestartManager.getEvaluatorRestartState(evaluatorManager.getId()) == 
EvaluatorRestartState.REREGISTERED) {
       // when a recovered heartbeat is received, we will take its word for it
       LOG.log(Level.INFO, "Received task status {0} for RECOVERED task {1}.",
-          new Object[]{taskStatusProto.getState(), this.taskId});
-      this.setState(taskStatusProto.getState());
+          new Object[]{taskStatus.getState(), this.taskId});
+      this.setState(taskStatus.getState());
     }
     // Dispatch the message to the right method.
-    switch (taskStatusProto.getState()) {
+    switch (taskStatus.getState()) {
     case INIT:
-      this.onTaskInit(taskStatusProto);
+      this.onTaskInit(taskStatus);
       break;
     case RUNNING:
-      this.onTaskRunning(taskStatusProto);
+      this.onTaskRunning(taskStatus);
       break;
     case SUSPEND:
-      this.onTaskSuspend(taskStatusProto);
+      this.onTaskSuspend(taskStatus);
       break;
     case DONE:
-      this.onTaskDone(taskStatusProto);
+      this.onTaskDone(taskStatus);
       break;
     case FAILED:
-      this.onTaskFailed(taskStatusProto);
+      this.onTaskFailed(taskStatus);
       break;
     default:
-      throw new IllegalStateException("Unknown task state: " + 
taskStatusProto.getState());
+      throw new IllegalStateException("Unknown task state: " + 
taskStatus.getState());
     }
   }
 
-  private void onTaskInit(final ReefServiceProtos.TaskStatusProto 
taskStatusProto) {
-    assert ReefServiceProtos.State.INIT == taskStatusProto.getState();
+  private void onTaskInit(final TaskStatusPOJO taskStatusPOJO) {
+    assert State.INIT == taskStatusPOJO.getState();
     if (this.isKnown()) {
       LOG.log(Level.WARNING, "Received a INIT message for task with id {0}" +
           " which we have seen before. Ignoring the second message", 
this.taskId);
@@ -126,12 +128,12 @@ public final class TaskRepresenter {
       final RunningTask runningTask = new RunningTaskImpl(
           this.evaluatorManager, this.taskId, this.context, this);
       this.messageDispatcher.onTaskRunning(runningTask);
-      this.setState(ReefServiceProtos.State.RUNNING);
+      this.setState(State.RUNNING);
     }
   }
 
-  private void onTaskRunning(final ReefServiceProtos.TaskStatusProto 
taskStatusProto) {
-    assert taskStatusProto.getState() == ReefServiceProtos.State.RUNNING;
+  private void onTaskRunning(final TaskStatusPOJO taskStatus) {
+    assert taskStatus.getState() == State.RUNNING;
 
     if (this.isNotRunning()) {
       throw new IllegalStateException("Received a task status message from 
task " + this.taskId +
@@ -146,41 +148,41 @@ public final class TaskRepresenter {
       this.messageDispatcher.onDriverRestartTaskRunning(runningTask);
     }
 
-    for (final ReefServiceProtos.TaskStatusProto.TaskMessageProto
-             taskMessageProto : taskStatusProto.getTaskMessageList()) {
+    for (final TaskMessagePOJO
+             taskMessageProto : taskStatus.getTaskMessageList()) {
       this.messageDispatcher.onTaskMessage(
-          new TaskMessageImpl(taskMessageProto.getMessage().toByteArray(),
+          new TaskMessageImpl(taskMessageProto.getMessage(),
               this.taskId, this.context.getId(), 
taskMessageProto.getSourceId()));
     }
   }
 
-  private void onTaskSuspend(final ReefServiceProtos.TaskStatusProto 
taskStatusProto) {
-    assert ReefServiceProtos.State.SUSPEND == taskStatusProto.getState();
+  private void onTaskSuspend(final TaskStatusPOJO taskStatus) {
+    assert State.SUSPEND == taskStatus.getState();
     assert this.isKnown();
     this.messageDispatcher.onTaskSuspended(
-        new SuspendedTaskImpl(this.context, getResult(taskStatusProto), 
this.taskId));
-    this.setState(ReefServiceProtos.State.SUSPEND);
+        new SuspendedTaskImpl(this.context, getResult(taskStatus), 
this.taskId));
+    this.setState(State.SUSPEND);
   }
 
-  private void onTaskDone(final ReefServiceProtos.TaskStatusProto 
taskStatusProto) {
-    assert ReefServiceProtos.State.DONE == taskStatusProto.getState();
+  private void onTaskDone(final TaskStatusPOJO taskStatus) {
+    assert State.DONE == taskStatus.getState();
     assert this.isKnown();
     this.messageDispatcher.onTaskCompleted(
-        new CompletedTaskImpl(this.context, getResult(taskStatusProto), 
this.taskId));
-    this.setState(ReefServiceProtos.State.DONE);
+        new CompletedTaskImpl(this.context, getResult(taskStatus), 
this.taskId));
+    this.setState(State.DONE);
   }
 
-  private void onTaskFailed(final ReefServiceProtos.TaskStatusProto 
taskStatusProto) {
-    assert ReefServiceProtos.State.FAILED == taskStatusProto.getState();
+  private void onTaskFailed(final TaskStatusPOJO taskStatus) {
+    assert State.FAILED == taskStatus.getState();
     final Optional<ActiveContext> evaluatorContext = 
Optional.<ActiveContext>of(this.context);
-    final Optional<byte[]> bytes = 
Optional.ofNullable(getResult(taskStatusProto));
+    final Optional<byte[]> bytes = Optional.ofNullable(getResult(taskStatus));
     final Optional<Throwable> exception = this.exceptionCodec.fromBytes(bytes);
     final String message = exception.isPresent() ? 
exception.get().getMessage() : "No message given";
     final Optional<String> description = Optional.empty();
     final FailedTask failedTask = new FailedTask(
         this.taskId, message, description, exception, bytes, evaluatorContext);
     this.messageDispatcher.onTaskFailed(failedTask);
-    this.setState(ReefServiceProtos.State.FAILED);
+    this.setState(State.FAILED);
   }
 
   public String getId() {
@@ -191,24 +193,24 @@ public final class TaskRepresenter {
    * @return true, if we had at least one message from the task.
    */
   private boolean isKnown() {
-    return this.state != ReefServiceProtos.State.INIT;
+    return this.state != State.INIT;
   }
 
   /**
    * @return true, if this task is in any other state but RUNNING.
    */
   public boolean isNotRunning() {
-    return this.state != ReefServiceProtos.State.RUNNING;
+    return this.state != State.RUNNING;
   }
 
   /**
    * @return true, if this task is in INIT or RUNNING status.
    */
   public boolean isClosable() {
-    return this.state == ReefServiceProtos.State.INIT || this.state == 
ReefServiceProtos.State.RUNNING;
+    return this.state == State.INIT || this.state == State.RUNNING;
   }
 
-  private void setState(final ReefServiceProtos.State newState) {
+  private void setState(final State newState) {
     LOG.log(Level.FINE, "Task [{0}] state transition from [{1}] to [{2}]",
         new Object[]{this.taskId, this.state, newState});
     this.state = newState;

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
index 76d107a..fa3e91d 100644
--- 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
@@ -21,11 +21,11 @@ package org.apache.reef.runtime.local.driver;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.evaluator.EvaluatorProcess;
-import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
 import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
 import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent;
@@ -244,7 +244,7 @@ public final class ResourceManager {
     final RuntimeStatusEventImpl.Builder builder =
         RuntimeStatusEventImpl.newBuilder()
             .setName("LOCAL")
-            .setState(ReefServiceProtos.State.RUNNING)
+            .setState(State.RUNNING)
             
.setOutstandingContainerRequests(this.requestQueue.getNumberOfOutstandingRequests());
     for (final String containerAllocation : 
this.theContainers.getAllocatedContainerIDs()) {
       builder.addContainerAllocation(containerAllocation);

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
index 25fda94..8e2d72b 100644
--- 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
@@ -19,8 +19,8 @@
 package org.apache.reef.runtime.local.process;
 
 import net.jcip.annotations.ThreadSafe;
-import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
 import org.apache.reef.runtime.local.driver.ResourceManager;
@@ -58,7 +58,7 @@ public final class ReefRunnableProcessObserver implements 
RunnableProcessObserve
     this.onResourceStatus(
         ResourceStatusEventImpl.newBuilder()
             .setIdentifier(processId)
-            .setState(ReefServiceProtos.State.RUNNING)
+            .setState(State.RUNNING)
             .build()
     );
   }
@@ -87,7 +87,7 @@ public final class ReefRunnableProcessObserver implements 
RunnableProcessObserve
     this.onResourceStatus(
         ResourceStatusEventImpl.newBuilder()
             .setIdentifier(processId)
-            .setState(ReefServiceProtos.State.DONE)
+            .setState(State.DONE)
             .setExitCode(0)
             .build()
     );
@@ -103,7 +103,7 @@ public final class ReefRunnableProcessObserver implements 
RunnableProcessObserve
     this.onResourceStatus(
         ResourceStatusEventImpl.newBuilder()
             .setIdentifier(processId)
-            .setState(ReefServiceProtos.State.FAILED)
+            .setState(State.FAILED)
             .setExitCode(exitCode)
             .build()
     );

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
index 9817f9b..7387d3c 100644
--- 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
+++ 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
@@ -21,10 +21,10 @@ package org.apache.reef.runtime.mesos.driver;
 import com.google.protobuf.ByteString;
 import org.apache.mesos.MesosSchedulerDriver;
 import org.apache.reef.proto.ReefServiceProtos;
-import org.apache.reef.proto.ReefServiceProtos.State;
 import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceRequestEventImpl;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
 import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
index fc03393..f38b53a 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.reef.driver.ProgressProvider;
 import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
 import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
@@ -149,7 +150,7 @@ final class YarnContainerManager
   @Override
   public void onShutdownRequest() {
     this.reefEventHandlers.onRuntimeStatus(RuntimeStatusEventImpl.newBuilder()
-        .setName(RUNTIME_NAME).setState(ReefServiceProtos.State.DONE).build());
+        .setName(RUNTIME_NAME).setState(State.DONE).build());
     this.driverStatusManager.onError(new Exception("Shutdown requested by 
YARN."));
   }
 
@@ -199,7 +200,7 @@ final class YarnContainerManager
     if (hasContainer) {
       final ResourceStatusEventImpl.Builder resourceStatusBuilder =
           
ResourceStatusEventImpl.newBuilder().setIdentifier(containerId.toString());
-      resourceStatusBuilder.setState(ReefServiceProtos.State.DONE);
+      resourceStatusBuilder.setState(State.DONE);
       this.reefEventHandlers.onResourceStatus(resourceStatusBuilder.build());
     }
   }
@@ -339,7 +340,7 @@ final class YarnContainerManager
     final ResourceStatusEventImpl.Builder resourceStatusBuilder =
         
ResourceStatusEventImpl.newBuilder().setIdentifier(containerId.toString());
 
-    resourceStatusBuilder.setState(ReefServiceProtos.State.FAILED);
+    resourceStatusBuilder.setState(State.FAILED);
     resourceStatusBuilder.setExitCode(1);
     resourceStatusBuilder.setDiagnostics(throwable.getMessage());
     this.reefEventHandlers.onResourceStatus(resourceStatusBuilder.build());
@@ -366,19 +367,19 @@ final class YarnContainerManager
         LOG.log(Level.FINE, "Container completed: status {0}", 
value.getExitStatus());
         switch (value.getExitStatus()) {
         case 0:
-          status.setState(ReefServiceProtos.State.DONE);
+          status.setState(State.DONE);
           break;
         case 143:
-          status.setState(ReefServiceProtos.State.KILLED);
+          status.setState(State.KILLED);
           break;
         default:
-          status.setState(ReefServiceProtos.State.FAILED);
+          status.setState(State.FAILED);
         }
         status.setExitCode(value.getExitStatus());
         break;
       default:
         LOG.info("Container running");
-        status.setState(ReefServiceProtos.State.RUNNING);
+        status.setState(State.RUNNING);
       }
 
       if (value.getDiagnostics() != null) {
@@ -506,7 +507,7 @@ final class YarnContainerManager
     final RuntimeStatusEventImpl.Builder builder =
         RuntimeStatusEventImpl.newBuilder()
             .setName(RUNTIME_NAME)
-            .setState(ReefServiceProtos.State.RUNNING)
+            .setState(State.RUNNING)
             
.setOutstandingContainerRequests(this.containerRequestCounter.get());
 
     for (final String allocatedContainerId : 
this.containers.getContainerIds()) {
@@ -530,7 +531,7 @@ final class YarnContainerManager
     }
 
     final RuntimeStatusEventImpl.Builder runtimeStatusBuilder = 
RuntimeStatusEventImpl.newBuilder()
-        .setState(ReefServiceProtos.State.FAILED)
+        .setState(State.FAILED)
         .setName(RUNTIME_NAME);
 
     final Encoder<Throwable> codec = new ObjectSerializableCodec<>();

http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
index d2a00f4..429e74a 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
@@ -30,8 +30,8 @@ import org.apache.reef.annotations.audience.RuntimeAuthor;
 import org.apache.reef.driver.restart.DriverRuntimeRestartManager;
 import org.apache.reef.driver.restart.EvaluatorRestartInfo;
 import org.apache.reef.driver.restart.RestartEvaluators;
-import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
 import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
 import org.apache.reef.runtime.yarn.driver.parameters.YarnEvaluatorPreserver;
@@ -256,7 +256,7 @@ public final class YarnDriverRuntimeRestartManager 
implements DriverRuntimeResta
       // trigger a failed evaluator event
       
this.reefEventHandlers.onResourceStatus(ResourceStatusEventImpl.newBuilder()
           .setIdentifier(evaluatorId)
-          .setState(ReefServiceProtos.State.FAILED)
+          .setState(State.FAILED)
           .setExitCode(1)
           .setDiagnostics("Container [" + evaluatorId + "] failed during 
driver restart process.")
           .build());

Reply via email to