Repository: incubator-reef
Updated Branches:
  refs/heads/master 7396f314e -> 089be44d5


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestQueueTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestQueueTest.java
 
b/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestQueueTest.java
index 658747d..8fb1872 100644
--- 
a/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestQueueTest.java
+++ 
b/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestQueueTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.reef.runtime.local.driver;
 
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEventImpl;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -58,6 +58,6 @@ public class ResourceRequestQueueTest {
   }
 
   private ResourceRequest getAlmostSatisfied() {
-    return new 
ResourceRequest(DriverRuntimeProtocol.ResourceRequestProto.newBuilder().setResourceCount(1).setMemorySize(128).build());
+    return new 
ResourceRequest(ResourceRequestEventImpl.newBuilder().setResourceCount(1).setMemorySize(128).build());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestTest.java
 
b/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestTest.java
index 3156a02..8eb310a 100644
--- 
a/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestTest.java
+++ 
b/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.reef.runtime.local.driver;
 
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEventImpl;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -56,6 +56,6 @@ public final class ResourceRequestTest {
   }
 
   private ResourceRequest get(final int n) {
-    return new 
ResourceRequest(DriverRuntimeProtocol.ResourceRequestProto.newBuilder().setResourceCount(n).setMemorySize(128).build());
+    return new 
ResourceRequest(ResourceRequestEventImpl.newBuilder().setResourceCount(n).setMemorySize(128).build());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
index 29e1c21..c267373 100644
--- 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
+++ 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
@@ -20,10 +20,10 @@ package org.apache.reef.runtime.mesos.client;
 
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.ClientRuntimeProtocol;
-import org.apache.reef.proto.ReefServiceProtos.FileResourceProto;
+import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
 import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.FileResource;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
 import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
@@ -78,10 +78,10 @@ final class MesosJobSubmissionHandler implements 
JobSubmissionHandler {
   }
 
   @Override
-  public void onNext(final ClientRuntimeProtocol.JobSubmissionProto 
jobSubmissionProto) {
+  public void onNext(final JobSubmissionEvent jobSubmissionEvent) {
     try {
       final File jobFolder = new File(new File(this.rootFolderName),
-          "/" + jobSubmissionProto.getIdentifier() + "-" + 
System.currentTimeMillis() + "/");
+          "/" + jobSubmissionEvent.getIdentifier() + "-" + 
System.currentTimeMillis() + "/");
 
       final File driverFolder = new File(jobFolder, DRIVER_FOLDER_NAME);
       driverFolder.mkdirs();
@@ -91,7 +91,7 @@ final class MesosJobSubmissionHandler implements 
JobSubmissionHandler {
 
       final File localFolder = new File(reefFolder, 
this.fileNames.getLocalFolderName());
       localFolder.mkdirs();
-      for (final FileResourceProto file : 
jobSubmissionProto.getLocalFileList()) {
+      for (final FileResource file : jobSubmissionEvent.getLocalFileSet()) {
         final Path src = new File(file.getPath()).toPath();
         final Path dst = new File(driverFolder, 
this.fileNames.getLocalFolderPath() + "/" + file.getName()).toPath();
         Files.copy(src, dst, 
java.nio.file.StandardCopyOption.REPLACE_EXISTING);
@@ -99,7 +99,7 @@ final class MesosJobSubmissionHandler implements 
JobSubmissionHandler {
 
       final File globalFolder = new File(reefFolder, 
this.fileNames.getGlobalFolderName());
       globalFolder.mkdirs();
-      for (final FileResourceProto file : 
jobSubmissionProto.getGlobalFileList()) {
+      for (final FileResource file : jobSubmissionEvent.getGlobalFileSet()) {
         final Path src = new File(file.getPath()).toPath();
         final Path dst = new File(driverFolder, 
this.fileNames.getGlobalFolderPath() + "/" + file.getName()).toPath();
         Files.copy(src, dst, 
java.nio.file.StandardCopyOption.REPLACE_EXISTING);
@@ -108,21 +108,21 @@ final class MesosJobSubmissionHandler implements 
JobSubmissionHandler {
       final Configuration driverConfiguration =
           Configurations.merge(MesosDriverConfiguration.CONF
               .set(MesosDriverConfiguration.MESOS_MASTER_IP, this.masterIp)
-              .set(MesosDriverConfiguration.JOB_IDENTIFIER, 
jobSubmissionProto.getIdentifier())
-              .set(MesosDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, 
jobSubmissionProto.getRemoteId())
+              .set(MesosDriverConfiguration.JOB_IDENTIFIER, 
jobSubmissionEvent.getIdentifier())
+              .set(MesosDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, 
jobSubmissionEvent.getRemoteId())
               .set(MesosDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
               .set(MesosDriverConfiguration.SCHEDULER_DRIVER_CAPACITY, 1) // 
must be 1 as there is 1 scheduler at the same time
               .build(),
-          
this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration()));
+          jobSubmissionEvent.getConfiguration());
       final File runtimeConfigurationFile = new File(driverFolder, 
this.fileNames.getDriverConfigurationPath());
       this.configurationSerializer.toFile(driverConfiguration, 
runtimeConfigurationFile);
 
       final List<String> launchCommand = new JavaLaunchCommandBuilder()
-          .setErrorHandlerRID(jobSubmissionProto.getRemoteId())
-          .setLaunchID(jobSubmissionProto.getIdentifier())
+          .setErrorHandlerRID(jobSubmissionEvent.getRemoteId())
+          .setLaunchID(jobSubmissionEvent.getIdentifier())
           
.setConfigurationFileName(this.fileNames.getDriverConfigurationPath())
           .setClassPath(this.classpath.getDriverClasspath())
-          .setMemory(jobSubmissionProto.getDriverMemory())
+          .setMemory(jobSubmissionEvent.getDriverMemory().get())
           .build();
 
       final File errFile = new File(driverFolder, 
fileNames.getDriverStderrFileName());

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java
 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java
index 7ce98f8..b239b80 100644
--- 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java
+++ 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java
@@ -22,7 +22,7 @@ import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.io.TempFileCreator;
 import org.apache.reef.io.WorkingDirectoryTempFileCreator;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.JobJarMaker;
@@ -78,15 +78,15 @@ final class MesosResourceLaunchHandler implements 
ResourceLaunchHandler {
 
 
   @Override
-  public void onNext(final DriverRuntimeProtocol.ResourceLaunchProto 
resourceLaunchProto) {
+  public void onNext(final ResourceLaunchEvent resourceLaunchEvent) {
     try {
-      LOG.log(Level.INFO, "resourceLaunchProto. {0}", 
resourceLaunchProto.toString());
+      LOG.log(Level.INFO, "resourceLaunch. {0}", 
resourceLaunchEvent.toString());
 
       final File localStagingFolder =
           
Files.createTempDirectory(this.fileNames.getEvaluatorFolderPrefix()).toFile();
 
       final Configuration evaluatorConfiguration = Tang.Factory.getTang()
-          
.newConfigurationBuilder(this.configurationSerializer.fromString(resourceLaunchProto.getEvaluatorConf()))
+          .newConfigurationBuilder(resourceLaunchEvent.getEvaluatorConf())
           .bindImplementation(TempFileCreator.class, 
WorkingDirectoryTempFileCreator.class)
           .build();
 
@@ -94,15 +94,15 @@ final class MesosResourceLaunchHandler implements 
ResourceLaunchHandler {
           localStagingFolder, this.fileNames.getEvaluatorConfigurationName());
       this.configurationSerializer.toFile(evaluatorConfiguration, 
configurationFile);
 
-      JobJarMaker.copy(resourceLaunchProto.getFileList(), localStagingFolder);
+      JobJarMaker.copy(resourceLaunchEvent.getFileSet(), localStagingFolder);
 
       final FileSystem fileSystem = FileSystem.get(new 
org.apache.hadoop.conf.Configuration());
-      final Path hdfsFolder = new Path(fileSystem.getUri() + "/" + 
resourceLaunchProto.getIdentifier() + "/");
+      final Path hdfsFolder = new Path(fileSystem.getUri() + "/" + 
resourceLaunchEvent.getIdentifier() + "/");
       FileUtil.copy(localStagingFolder, fileSystem, hdfsFolder, false, new 
org.apache.hadoop.conf.Configuration());
 
       // TODO: Replace REEFExecutor with a simple launch command (we only need 
to launch REEFExecutor)
       final LaunchCommandBuilder commandBuilder;
-      switch (resourceLaunchProto.getType()) {
+      switch (resourceLaunchEvent.getType()) {
         case JVM:
           commandBuilder = new 
JavaLaunchCommandBuilder().setClassPath(this.classpath.getEvaluatorClasspath());
           break;
@@ -115,13 +115,13 @@ final class MesosResourceLaunchHandler implements 
ResourceLaunchHandler {
 
       final List<String> command = commandBuilder
           .setErrorHandlerRID(this.remoteManager.getMyIdentifier())
-          .setLaunchID(resourceLaunchProto.getIdentifier())
+          .setLaunchID(resourceLaunchEvent.getIdentifier())
           
.setConfigurationFileName(this.fileNames.getEvaluatorConfigurationPath())
-          .setMemory((int) (this.jvmHeapFactor * 
this.executors.getMemory(resourceLaunchProto.getIdentifier())))
+          .setMemory((int) (this.jvmHeapFactor * 
this.executors.getMemory(resourceLaunchEvent.getIdentifier())))
           .build();
 
       this.executors.launchEvaluator(
-          new EvaluatorLaunch(resourceLaunchProto.getIdentifier(), 
StringUtils.join(command, ' ')));
+          new EvaluatorLaunch(resourceLaunchEvent.getIdentifier(), 
StringUtils.join(command, ' ')));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java
 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java
index 41c487e..4c34aef 100644
--- 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java
+++ 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java
@@ -20,7 +20,7 @@ package org.apache.reef.runtime.mesos.driver;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
 
 import javax.inject.Inject;
@@ -36,7 +36,7 @@ final class MesosResourceReleaseHandler implements 
ResourceReleaseHandler {
   }
 
   @Override
-  public void onNext(final DriverRuntimeProtocol.ResourceReleaseProto 
resourceReleaseProto) {
-    REEFScheduler.onResourceRelease(resourceReleaseProto);
+  public void onNext(final ResourceReleaseEvent resourceReleaseEvent) {
+    REEFScheduler.onResourceRelease(resourceReleaseEvent);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java
 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java
index a9c1016..feca5b8 100644
--- 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java
+++ 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java
@@ -20,7 +20,7 @@ package org.apache.reef.runtime.mesos.driver;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
 
 import javax.inject.Inject;
@@ -36,7 +36,7 @@ final class MesosResourceRequestHandler implements 
ResourceRequestHandler {
   }
 
   @Override
-  public void onNext(final DriverRuntimeProtocol.ResourceRequestProto 
resourceRequestProto) {
-    REEFScheduler.onResourceRequest(resourceRequestProto);
+  public void onNext(final ResourceRequestEvent resourceRequestEvent) {
+    REEFScheduler.onResourceRequest(resourceRequestEvent);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java
 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java
index fd5cce2..c8a5fa6 100644
--- 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java
+++ 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java
@@ -19,11 +19,11 @@
 package org.apache.reef.runtime.mesos.driver;
 
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol.NodeDescriptorProto;
-import org.apache.reef.proto.DriverRuntimeProtocol.ResourceAllocationProto;
-import org.apache.reef.proto.DriverRuntimeProtocol.ResourceStatusProto;
-import org.apache.reef.proto.DriverRuntimeProtocol.RuntimeStatusProto;
 import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.wake.EventHandler;
 
@@ -31,35 +31,35 @@ import javax.inject.Inject;
 
 @Private
 final class REEFEventHandlers {
-  private final EventHandler<ResourceAllocationProto> 
resourceAllocationEventHandler;
-  private final EventHandler<RuntimeStatusProto> runtimeStatusEventHandler;
-  private final EventHandler<NodeDescriptorProto> nodeDescriptorEventHandler;
-  private final EventHandler<ResourceStatusProto> 
resourceStatusHandlerEventHandler;
+  private final EventHandler<ResourceAllocationEvent> 
resourceAllocationEventHandler;
+  private final EventHandler<RuntimeStatusEvent> runtimeStatusEventHandler;
+  private final EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler;
+  private final EventHandler<ResourceStatusEvent> 
resourceStatusHandlerEventHandler;
 
   @Inject
-  REEFEventHandlers(final 
@Parameter(RuntimeParameters.ResourceAllocationHandler.class) 
EventHandler<ResourceAllocationProto> resourceAllocationEventHandler,
-                    final 
@Parameter(RuntimeParameters.RuntimeStatusHandler.class) 
EventHandler<RuntimeStatusProto> runtimeStatusEventHandler,
-                    final 
@Parameter(RuntimeParameters.NodeDescriptorHandler.class) 
EventHandler<NodeDescriptorProto> nodeDescriptorEventHandler,
-                    final 
@Parameter(RuntimeParameters.ResourceStatusHandler.class) 
EventHandler<ResourceStatusProto> resourceStatusHandlerEventHandler) {
+  REEFEventHandlers(final 
@Parameter(RuntimeParameters.ResourceAllocationHandler.class) 
EventHandler<ResourceAllocationEvent> resourceAllocationEventHandler,
+                    final 
@Parameter(RuntimeParameters.RuntimeStatusHandler.class) 
EventHandler<RuntimeStatusEvent> runtimeStatusEventHandler,
+                    final 
@Parameter(RuntimeParameters.NodeDescriptorHandler.class) 
EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler,
+                    final 
@Parameter(RuntimeParameters.ResourceStatusHandler.class) 
EventHandler<ResourceStatusEvent> resourceStatusHandlerEventHandler) {
     this.resourceAllocationEventHandler = resourceAllocationEventHandler;
     this.runtimeStatusEventHandler = runtimeStatusEventHandler;
     this.nodeDescriptorEventHandler = nodeDescriptorEventHandler;
     this.resourceStatusHandlerEventHandler = resourceStatusHandlerEventHandler;
   }
 
-  void onNodeDescriptor(final NodeDescriptorProto nodeDescriptorProto) {
+  void onNodeDescriptor(final NodeDescriptorEvent nodeDescriptorProto) {
     this.nodeDescriptorEventHandler.onNext(nodeDescriptorProto);
   }
 
-  void onRuntimeStatus(final RuntimeStatusProto runtimeStatusProto) {
+  void onRuntimeStatus(final RuntimeStatusEvent runtimeStatusProto) {
     this.runtimeStatusEventHandler.onNext(runtimeStatusProto);
   }
 
-  void onResourceAllocation(final ResourceAllocationProto 
resourceAllocationProto) {
+  void onResourceAllocation(final ResourceAllocationEvent 
resourceAllocationProto) {
     this.resourceAllocationEventHandler.onNext(resourceAllocationProto);
   }
 
-  void onResourceStatus(final ResourceStatusProto resourceStatusProto) {
+  void onResourceStatus(final ResourceStatusEvent resourceStatusProto) {
     this.resourceStatusHandlerEventHandler.onNext(resourceStatusProto);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
index 9c2c6d9..ba11aae 100644
--- 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
+++ 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
@@ -20,22 +20,23 @@ package org.apache.reef.runtime.mesos.driver;
 
 import com.google.protobuf.ByteString;
 import org.apache.mesos.MesosSchedulerDriver;
-import org.apache.reef.proto.DriverRuntimeProtocol;
-import org.apache.reef.proto.DriverRuntimeProtocol.NodeDescriptorProto;
-import org.apache.reef.proto.DriverRuntimeProtocol.ResourceAllocationProto;
-import org.apache.reef.proto.DriverRuntimeProtocol.ResourceReleaseProto;
-import org.apache.reef.proto.DriverRuntimeProtocol.ResourceRequestProto;
-import org.apache.reef.proto.DriverRuntimeProtocol.RuntimeStatusProto;
-import org.apache.reef.proto.DriverRuntimeProtocol.RuntimeStatusProto.Builder;
 import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.proto.ReefServiceProtos.State;
 import 
org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEventImpl;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.mesos.driver.parameters.MesosMasterIp;
 import org.apache.reef.runtime.mesos.evaluator.REEFExecutor;
 import org.apache.reef.runtime.mesos.util.EvaluatorControl;
-import org.apache.reef.runtime.mesos.util.EvaluatorLaunch;
 import org.apache.reef.runtime.mesos.util.EvaluatorRelease;
 import org.apache.reef.runtime.mesos.util.MesosRemoteManager;
 import org.apache.reef.tang.annotations.Parameter;
@@ -106,8 +107,8 @@ final class REEFScheduler implements Scheduler {
   private final Map<String, Offer> offers = new ConcurrentHashMap<>();
 
   private int outstandingRequestCounter = 0;
-  private final ConcurrentLinkedQueue<ResourceRequestProto> 
outstandingRequests = new ConcurrentLinkedQueue<>();
-  private final Map<String, ResourceRequestProto> executorIdToLaunchedRequests 
= new ConcurrentHashMap<>();
+  private final ConcurrentLinkedQueue<ResourceRequestEvent> 
outstandingRequests = new ConcurrentLinkedQueue<>();
+  private final Map<String, ResourceRequestEvent> executorIdToLaunchedRequests 
= new ConcurrentHashMap<>();
   private final REEFExecutors executors;
 
   @Inject
@@ -151,24 +152,24 @@ final class REEFScheduler implements Scheduler {
    */
   @Override
   public void resourceOffers(final SchedulerDriver driver, final 
List<Protos.Offer> offers) {
-    final Map<String, NodeDescriptorProto.Builder> nodeDescriptorProtos = new 
HashMap<>();
+    final Map<String, NodeDescriptorEventImpl.Builder> nodeDescriptorEvents = 
new HashMap<>();
 
     for (final Offer offer : offers) {
-      if (nodeDescriptorProtos.get(offer.getSlaveId().getValue()) == null) {
-        nodeDescriptorProtos.put(offer.getSlaveId().getValue(), 
NodeDescriptorProto.newBuilder()
-            .setIdentifier(offer.getSlaveId().getValue())
-            .setHostName(offer.getHostname())
-            .setPort(MESOS_SLAVE_PORT)
-            .setMemorySize(getMemory(offer)));
+      if (nodeDescriptorEvents.get(offer.getSlaveId().getValue()) == null) {
+        nodeDescriptorEvents.put(offer.getSlaveId().getValue(), 
NodeDescriptorEventImpl.newBuilder()
+                .setIdentifier(offer.getSlaveId().getValue())
+                .setHostName(offer.getHostname())
+                .setPort(MESOS_SLAVE_PORT)
+                .setMemorySize(getMemory(offer)));
       } else {
-        final NodeDescriptorProto.Builder builder = 
nodeDescriptorProtos.get(offer.getSlaveId().getValue());
-        builder.setMemorySize(builder.getMemorySize() + getMemory(offer));
+        final NodeDescriptorEventImpl.Builder builder = 
nodeDescriptorEvents.get(offer.getSlaveId().getValue());
+        builder.setMemorySize(builder.build().getMemorySize() + 
getMemory(offer));
       }
 
       this.offers.put(offer.getId().getValue(), offer);
     }
 
-    for (final NodeDescriptorProto.Builder ndpBuilder : 
nodeDescriptorProtos.values()) {
+    for (final NodeDescriptorEventImpl.Builder ndpBuilder : 
nodeDescriptorEvents.values()) {
       this.reefEventHandlers.onNodeDescriptor(ndpBuilder.build());
     }
 
@@ -190,8 +191,8 @@ final class REEFScheduler implements Scheduler {
   public void statusUpdate(final SchedulerDriver driver, final 
Protos.TaskStatus taskStatus) {
     LOG.log(Level.SEVERE, "Task Status Update:", taskStatus.toString());
 
-    final DriverRuntimeProtocol.ResourceStatusProto.Builder resourceStatus =
-        
DriverRuntimeProtocol.ResourceStatusProto.newBuilder().setIdentifier(taskStatus.getTaskId().getValue());
+    final ResourceStatusEventImpl.Builder resourceStatus =
+        
ResourceStatusEventImpl.newBuilder().setIdentifier(taskStatus.getTaskId().getValue());
 
     switch(taskStatus.getState()) {
       case TASK_STARTING:
@@ -251,8 +252,8 @@ final class REEFScheduler implements Scheduler {
                            final Protos.SlaveID slaveId,
                            final int status) {
     final String diagnostics = "Executor Lost. executorid: 
"+executorId.getValue()+" slaveid: "+slaveId.getValue();
-    final DriverRuntimeProtocol.ResourceStatusProto resourceStatus =
-        DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
+    final ResourceStatusEvent resourceStatus =
+        ResourceStatusEventImpl.newBuilder()
             .setIdentifier(executorId.getValue())
             .setState(State.FAILED)
             .setExitCode(status)
@@ -283,39 +284,39 @@ final class REEFScheduler implements Scheduler {
     }
   }
 
-  public void onResourceRequest(final ResourceRequestProto 
resourceRequestProto) {
-    this.outstandingRequestCounter += resourceRequestProto.getResourceCount();
+  public void onResourceRequest(final ResourceRequestEvent 
resourceRequestEvent) {
+    this.outstandingRequestCounter += resourceRequestEvent.getResourceCount();
     updateRuntimeStatus();
-    doResourceRequest(resourceRequestProto);
+    doResourceRequest(resourceRequestEvent);
   }
 
-  public void onResourceRelease(final ResourceReleaseProto 
resourceReleaseProto) {
-    this.executors.releaseEvaluator(new 
EvaluatorRelease(resourceReleaseProto.getIdentifier()));
-    this.executors.remove(resourceReleaseProto.getIdentifier());
+  public void onResourceRelease(final ResourceReleaseEvent 
resourceReleaseEvent) {
+    this.executors.releaseEvaluator(new 
EvaluatorRelease(resourceReleaseEvent.getIdentifier()));
+    this.executors.remove(resourceReleaseEvent.getIdentifier());
     updateRuntimeStatus();
   }
 
   /**
    * Greedily acquire resources by launching a Mesos Task(w/ our custom 
MesosExecutor) on REEF Evaluator request.
    * Either called from onResourceRequest(for a new request) or 
resourceOffers(for an outstanding request).
-   * TODO: reflect priority and rack/node locality specified in 
resourceRequestProto.
+   * TODO: reflect priority and rack/node locality specified in 
resourceRequestEvent.
    */
-  private synchronized void doResourceRequest(final ResourceRequestProto 
resourceRequestProto) {
-    int tasksToLaunchCounter = resourceRequestProto.getResourceCount();
+  private synchronized void doResourceRequest(final ResourceRequestEvent 
resourceRequestEvent) {
+    int tasksToLaunchCounter = resourceRequestEvent.getResourceCount();
 
     for (final Offer offer : this.offers.values()) {
-      final int cpuSlots = getCpu(offer) / 
resourceRequestProto.getVirtualCores();
-      final int memSlots = getMemory(offer) / 
resourceRequestProto.getMemorySize();
+      final int cpuSlots = getCpu(offer) / 
resourceRequestEvent.getVirtualCores().get();
+      final int memSlots = getMemory(offer) / 
resourceRequestEvent.getMemorySize().get();
       final int taskNum = Math.min(Math.min(cpuSlots, memSlots), 
tasksToLaunchCounter);
 
-      if (taskNum > 0 && satisfySlaveConstraint(resourceRequestProto, offer)) {
+      if (taskNum > 0 && satisfySlaveConstraint(resourceRequestEvent, offer)) {
         final List<TaskInfo> tasksToLaunch = new ArrayList<>();
         tasksToLaunchCounter -= taskNum;
 
         // Launch as many MesosTasks on the same node(offer) as possible to 
exploit locality.
         for (int j = 0; j < taskNum; j++) {
           final String id = offer.getId().getValue() + "-" + String.valueOf(j);
-          final String executorLaunchCommand = getExecutorLaunchCommand(id, 
resourceRequestProto.getMemorySize());
+          final String executorLaunchCommand = getExecutorLaunchCommand(id, 
resourceRequestEvent.getMemorySize().get());
 
           final ExecutorInfo executorInfo = ExecutorInfo.newBuilder()
               .setExecutorId(ExecutorID.newBuilder()
@@ -334,24 +335,24 @@ final class REEFScheduler implements Scheduler {
               .setName(id)
               .setSlaveId(offer.getSlaveId())
               .addResources(Resource.newBuilder()
-                  .setName("mem")
-                  .setType(Type.SCALAR)
-                  .setScalar(Value.Scalar.newBuilder()
-                      .setValue(resourceRequestProto.getMemorySize())
+                      .setName("mem")
+                      .setType(Type.SCALAR)
+                      .setScalar(Value.Scalar.newBuilder()
+                              
.setValue(resourceRequestEvent.getMemorySize().get())
+                              .build())
                       .build())
-                  .build())
               .addResources(Resource.newBuilder()
-                  .setName("cpus")
-                  .setType(Type.SCALAR)
-                  .setScalar(Value.Scalar.newBuilder()
-                      .setValue(resourceRequestProto.getVirtualCores())
+                      .setName("cpus")
+                      .setType(Type.SCALAR)
+                      .setScalar(Value.Scalar.newBuilder()
+                              
.setValue(resourceRequestEvent.getVirtualCores().get())
+                              .build())
                       .build())
-                  .build())
               .setExecutor(executorInfo)
               .build();
 
           tasksToLaunch.add(taskInfo);
-          this.executorIdToLaunchedRequests.put(id, resourceRequestProto);
+          this.executorIdToLaunchedRequests.put(id, resourceRequestEvent);
         }
 
         final Filters filters = 
Filters.newBuilder().setRefuseSeconds(0).build();
@@ -365,24 +366,24 @@ final class REEFScheduler implements Scheduler {
     this.offers.clear();
 
     // Save leftovers that couldn't be launched
-    outstandingRequests.add(ResourceRequestProto.newBuilder()
-        .mergeFrom(resourceRequestProto)
+    outstandingRequests.add(ResourceRequestEventImpl.newBuilder()
+        .mergeFrom(resourceRequestEvent)
         .setResourceCount(tasksToLaunchCounter)
         .build());
   }
 
   private void handleNewExecutor(final Protos.TaskStatus taskStatus) {
-    final ResourceRequestProto resourceRequestProto =
+    final ResourceRequestEvent resourceRequestProto =
         
this.executorIdToLaunchedRequests.remove(taskStatus.getTaskId().getValue());
 
     final EventHandler<EvaluatorControl> evaluatorControlHandler =
         this.mesosRemoteManager.getHandler(taskStatus.getMessage(), 
EvaluatorControl.class);
-    this.executors.add(taskStatus.getTaskId().getValue(), 
resourceRequestProto.getMemorySize(), evaluatorControlHandler);
+    this.executors.add(taskStatus.getTaskId().getValue(), 
resourceRequestProto.getMemorySize().get(), evaluatorControlHandler);
 
-    final ResourceAllocationProto alloc = 
DriverRuntimeProtocol.ResourceAllocationProto.newBuilder()
+    final ResourceAllocationEvent alloc = 
ResourceAllocationEventImpl.newBuilder()
         .setIdentifier(taskStatus.getTaskId().getValue())
         .setNodeId(taskStatus.getSlaveId().getValue())
-        .setResourceMemory(resourceRequestProto.getMemorySize())
+        .setResourceMemory(resourceRequestProto.getMemorySize().get())
         .build();
     reefEventHandlers.onResourceAllocation(alloc);
 
@@ -391,7 +392,7 @@ final class REEFScheduler implements Scheduler {
   }
 
   private synchronized void updateRuntimeStatus() {
-    final Builder builder = 
DriverRuntimeProtocol.RuntimeStatusProto.newBuilder()
+    final RuntimeStatusEventImpl.Builder builder = 
RuntimeStatusEventImpl.newBuilder()
         .setName(RUNTIME_NAME)
         .setState(State.RUNNING)
         .setOutstandingContainerRequests(this.outstandingRequestCounter);
@@ -411,7 +412,7 @@ final class REEFScheduler implements Scheduler {
       throw new RuntimeException(e);
     }
 
-    final Builder runtimeStatusBuilder = RuntimeStatusProto.newBuilder()
+    final RuntimeStatusEventImpl.Builder runtimeStatusBuilder = 
RuntimeStatusEventImpl.newBuilder()
         .setState(State.FAILED)
         .setName(RUNTIME_NAME);
 
@@ -425,9 +426,9 @@ final class REEFScheduler implements Scheduler {
     this.reefEventHandlers.onRuntimeStatus(runtimeStatusBuilder.build());
   }
 
-  private boolean satisfySlaveConstraint(final ResourceRequestProto 
resourceRequestProto, final Offer offer) {
-    return resourceRequestProto.getNodeNameCount() == 0 ||
-        
resourceRequestProto.getNodeNameList().contains(offer.getSlaveId().getValue());
+  private boolean satisfySlaveConstraint(final ResourceRequestEvent 
resourceRequestEvent, final Offer offer) {
+    return resourceRequestEvent.getNodeNameList().size() == 0 ||
+        
resourceRequestEvent.getNodeNameList().contains(offer.getSlaveId().getValue());
   }
 
   private int getMemory(final Offer offer) {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
index eb6b802..62800e7 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.parameters.DriverJobSubmissionDirectory;
-import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
 import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.JobJarMaker;
@@ -85,28 +85,28 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
   }
 
   @Override
-  public void onNext(final ClientRuntimeProtocol.JobSubmissionProto 
jobSubmissionProto) {
+  public void onNext(final JobSubmissionEvent jobSubmissionEvent) {
 
-    LOG.log(Level.FINEST, "Submitting job with ID [{0}]", 
jobSubmissionProto.getIdentifier());
+    LOG.log(Level.FINEST, "Submitting job with ID [{0}]", 
jobSubmissionEvent.getIdentifier());
 
     try (final YarnSubmissionHelper submissionHelper =
              new YarnSubmissionHelper(this.yarnConfiguration, this.fileNames, 
this.classpath)) {
 
       LOG.log(Level.FINE, "Assembling submission JAR for the Driver.");
       final JobFolder jobFolderOnDfs = 
this.uploader.createJobFolder(submissionHelper.getApplicationId());
-      final Configuration driverConfiguration = 
makeDriverConfiguration(jobSubmissionProto, jobFolderOnDfs.getPath());
-      final File jobSubmissionFile = 
this.jobJarMaker.createJobSubmissionJAR(jobSubmissionProto, 
driverConfiguration);
+      final Configuration driverConfiguration = 
makeDriverConfiguration(jobSubmissionEvent, jobFolderOnDfs.getPath());
+      final File jobSubmissionFile = 
this.jobJarMaker.createJobSubmissionJAR(jobSubmissionEvent, 
driverConfiguration);
       final LocalResource driverJarOnDfs = 
jobFolderOnDfs.uploadAsLocalResource(jobSubmissionFile);
 
       submissionHelper
           .addLocalResource(this.fileNames.getREEFFolderName(), driverJarOnDfs)
-          .setApplicationName(jobSubmissionProto.getIdentifier())
-          .setDriverMemory(jobSubmissionProto.getDriverMemory())
-          .setPriority(getPriority(jobSubmissionProto))
-          .setQueue(getQueue(jobSubmissionProto, "default"))
-          .submit(jobSubmissionProto.getRemoteId());
+          .setApplicationName(jobSubmissionEvent.getIdentifier())
+          .setDriverMemory(jobSubmissionEvent.getDriverMemory().get())
+          .setPriority(getPriority(jobSubmissionEvent))
+          .setQueue(getQueue(jobSubmissionEvent, "default"))
+          .submit(jobSubmissionEvent.getRemoteId());
 
-      LOG.log(Level.FINEST, "Submitted job with ID [{0}]", 
jobSubmissionProto.getIdentifier());
+      LOG.log(Level.FINEST, "Submitted job with ID [{0}]", 
jobSubmissionEvent.getIdentifier());
     } catch (final YarnException | IOException e) {
       throw new RuntimeException("Unable to submit Driver to YARN.", e);
     }
@@ -116,9 +116,9 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
    * Assembles the Driver configuration.
    */
   private Configuration makeDriverConfiguration(
-      final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
+      final JobSubmissionEvent jobSubmissionEvent,
       final Path jobFolderPath) throws IOException {
-    final Configuration config = 
this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration());
+    final Configuration config = jobSubmissionEvent.getConfiguration();
     final String userBoundJobSubmissionDirectory = 
config.getNamedParameter((NamedParameterNode<?>) 
config.getClassHierarchy().getNode(ReflectionUtilities.getFullName(DriverJobSubmissionDirectory.class)));
     LOG.log(Level.FINE, "user bound job submission Directory: " + 
userBoundJobSubmissionDirectory);
     final String finalJobFolderPath =
@@ -127,27 +127,24 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
     return Configurations.merge(
         YarnDriverConfiguration.CONF
             .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, 
finalJobFolderPath)
-            .set(YarnDriverConfiguration.JOB_IDENTIFIER, 
jobSubmissionProto.getIdentifier())
-            .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, 
jobSubmissionProto.getRemoteId())
+            .set(YarnDriverConfiguration.JOB_IDENTIFIER, 
jobSubmissionEvent.getIdentifier())
+            .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, 
jobSubmissionEvent.getRemoteId())
             .set(YarnDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
             .build(),
-        
this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration()));
+        config);
   }
 
-  private static int getPriority(final 
ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
-    return jobSubmissionProto.hasPriority() ? jobSubmissionProto.getPriority() 
: 0;
+  private static int getPriority(final JobSubmissionEvent jobSubmissionEvent) {
+    return jobSubmissionEvent.getPriority().orElse(0);
   }
 
   /**
-   * Extract the queue name from the jobSubmissionProto or return default if 
none is set.
+   * Extract the queue name from the jobSubmissionEvent or return default if 
none is set.
    * <p/>
    * TODO: Revisit this. We also have a named parameter for the queue in 
YarnClientConfiguration.
    */
-  private final String getQueue(final ClientRuntimeProtocol.JobSubmissionProto 
jobSubmissionProto,
+  private final String getQueue(final JobSubmissionEvent jobSubmissionEvent,
                                 final String defaultQueue) {
-    return jobSubmissionProto.hasQueue() && 
!jobSubmissionProto.getQueue().isEmpty() ?
-        jobSubmissionProto.getQueue() : defaultQueue;
+    return jobSubmissionEvent.getQueue().orElse(defaultQueue);
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
index 3a2ff5c..a264b66 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.io.TempFileCreator;
 import org.apache.reef.io.WorkingDirectoryTempFileCreator;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
 import org.apache.reef.runtime.common.files.JobJarMaker;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.common.parameters.DeleteTempFiles;
@@ -88,12 +88,12 @@ final class EvaluatorSetupHelper {
   /**
    * Sets up the LocalResources for a new Evaluator.
    *
-   * @param resourceLaunchProto
+   * @param resourceLaunchEvent
    * @return
    * @throws IOException
    */
   Map<String, LocalResource> getResources(
-      final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto)
+      final ResourceLaunchEvent resourceLaunchEvent)
       throws IOException {
 
     final Map<String, LocalResource> result = new HashMap<>();
@@ -103,10 +103,10 @@ final class EvaluatorSetupHelper {
 
     // Write the configuration
     final File configurationFile = new File(localStagingFolder, 
this.fileNames.getEvaluatorConfigurationName());
-    
this.configurationSerializer.toFile(makeEvaluatorConfiguration(resourceLaunchProto),
 configurationFile);
+    
this.configurationSerializer.toFile(makeEvaluatorConfiguration(resourceLaunchEvent),
 configurationFile);
 
     // Copy files to the staging folder
-    JobJarMaker.copy(resourceLaunchProto.getFileList(), localStagingFolder);
+    JobJarMaker.copy(resourceLaunchEvent.getFileSet(), localStagingFolder);
 
     // Make a JAR file out of it
     final File localFile = tempFileCreator.createTempFile(
@@ -132,15 +132,15 @@ final class EvaluatorSetupHelper {
   /**
    * Assembles the configuration for an Evaluator.
    *
-   * @param resourceLaunchProto
+   * @param resourceLaunchEvent
    * @return
    * @throws IOException
    */
 
-  private Configuration makeEvaluatorConfiguration(final 
DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto)
+  private Configuration makeEvaluatorConfiguration(final ResourceLaunchEvent 
resourceLaunchEvent)
       throws IOException {
     return Tang.Factory.getTang()
-        
.newConfigurationBuilder(this.configurationSerializer.fromString(resourceLaunchProto.getEvaluatorConf()))
+        .newConfigurationBuilder(resourceLaunchEvent.getEvaluatorConf())
         .bindImplementation(TempFileCreator.class, 
WorkingDirectoryTempFileCreator.class)
         .build();
   }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java
index ec43666..a204add 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java
@@ -19,8 +19,11 @@
 package org.apache.reef.runtime.yarn.driver;
 
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
 import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.wake.EventHandler;
 
@@ -32,20 +35,20 @@ import javax.inject.Inject;
 // This is a great place to add a thread boundary, should that need arise.
 @Private
 final class REEFEventHandlers implements AutoCloseable {
-  private final EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> 
resourceAllocationHandler;
-  private final EventHandler<DriverRuntimeProtocol.ResourceStatusProto> 
resourceStatusHandler;
-  private final EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> 
runtimeStatusHandler;
-  private final EventHandler<DriverRuntimeProtocol.NodeDescriptorProto> 
nodeDescriptorProtoEventHandler;
+  private final EventHandler<ResourceAllocationEvent> 
resourceAllocationHandler;
+  private final EventHandler<ResourceStatusEvent> resourceStatusHandler;
+  private final EventHandler<RuntimeStatusEvent> runtimeStatusHandler;
+  private final EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler;
 
   @Inject
-  REEFEventHandlers(final 
@Parameter(RuntimeParameters.NodeDescriptorHandler.class) 
EventHandler<DriverRuntimeProtocol.NodeDescriptorProto> 
nodeDescriptorProtoEventHandler,
-                    final 
@Parameter(RuntimeParameters.RuntimeStatusHandler.class) 
EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> 
runtimeStatusProtoEventHandler,
-                    final 
@Parameter(RuntimeParameters.ResourceAllocationHandler.class) 
EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> 
resourceAllocationHandler,
-                    final 
@Parameter(RuntimeParameters.ResourceStatusHandler.class) 
EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler) {
+  REEFEventHandlers(final 
@Parameter(RuntimeParameters.NodeDescriptorHandler.class) 
EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler,
+                    final 
@Parameter(RuntimeParameters.RuntimeStatusHandler.class) 
EventHandler<RuntimeStatusEvent> runtimeStatusProtoEventHandler,
+                    final 
@Parameter(RuntimeParameters.ResourceAllocationHandler.class) 
EventHandler<ResourceAllocationEvent> resourceAllocationHandler,
+                    final 
@Parameter(RuntimeParameters.ResourceStatusHandler.class) 
EventHandler<ResourceStatusEvent> resourceStatusHandler) {
     this.resourceAllocationHandler = resourceAllocationHandler;
     this.resourceStatusHandler = resourceStatusHandler;
     this.runtimeStatusHandler = runtimeStatusProtoEventHandler;
-    this.nodeDescriptorProtoEventHandler = nodeDescriptorProtoEventHandler;
+    this.nodeDescriptorEventHandler = nodeDescriptorEventHandler;
   }
 
   /**
@@ -53,35 +56,35 @@ final class REEFEventHandlers implements AutoCloseable {
    *
    * @param nodeDescriptorProto
    */
-  void onNodeDescriptor(final DriverRuntimeProtocol.NodeDescriptorProto 
nodeDescriptorProto) {
-    this.nodeDescriptorProtoEventHandler.onNext(nodeDescriptorProto);
+  void onNodeDescriptor(final NodeDescriptorEvent nodeDescriptorProto) {
+    this.nodeDescriptorEventHandler.onNext(nodeDescriptorProto);
   }
 
   /**
    * Update REEF's view on the runtime status.
    *
-   * @param runtimeStatusProto
+   * @param runtimeStatusEvent
    */
-  void onRuntimeStatus(final DriverRuntimeProtocol.RuntimeStatusProto 
runtimeStatusProto) {
-    this.runtimeStatusHandler.onNext(runtimeStatusProto);
+  void onRuntimeStatus(final RuntimeStatusEvent runtimeStatusEvent) {
+    this.runtimeStatusHandler.onNext(runtimeStatusEvent);
   }
 
   /**
    * Inform REEF of a fresh resource allocation.
    *
-   * @param resourceAllocationProto
+   * @param resourceAllocationEvent
    */
-  void onResourceAllocation(final 
DriverRuntimeProtocol.ResourceAllocationProto resourceAllocationProto) {
-    this.resourceAllocationHandler.onNext(resourceAllocationProto);
+  void onResourceAllocation(final ResourceAllocationEvent 
resourceAllocationEvent) {
+    this.resourceAllocationHandler.onNext(resourceAllocationEvent);
   }
 
   /**
    * Update REEF on a change to the status of a resource.
    *
-   * @param resourceStatusProto
+   * @param resourceStatusEvent
    */
-  void onResourceStatus(final DriverRuntimeProtocol.ResourceStatusProto 
resourceStatusProto) {
-    this.resourceStatusHandler.onNext(resourceStatusProto);
+  void onResourceStatus(final ResourceStatusEvent resourceStatusEvent) {
+    this.resourceStatusHandler.onNext(resourceStatusEvent);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java
index 073cf8c..8401e67 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.REEFFileNames;
@@ -73,18 +73,18 @@ public final class YARNResourceLaunchHandler implements 
ResourceLaunchHandler {
   }
 
   @Override
-  public void onNext(final DriverRuntimeProtocol.ResourceLaunchProto 
resourceLaunchProto) {
+  public void onNext(final ResourceLaunchEvent resourceLaunchEvent) {
     try {
 
-      final String containerId = resourceLaunchProto.getIdentifier();
-      LOG.log(Level.FINEST, "TIME: Start ResourceLaunchProto {0}", 
containerId);
+      final String containerId = resourceLaunchEvent.getIdentifier();
+      LOG.log(Level.FINEST, "TIME: Start ResourceLaunch {0}", containerId);
       final Container container = this.containers.get(containerId);
       LOG.log(Level.FINEST, "Setting up container launch container for 
id={0}", container.getId());
       final Map<String, LocalResource> localResources =
-          this.evaluatorSetupHelper.getResources(resourceLaunchProto);
+          this.evaluatorSetupHelper.getResources(resourceLaunchEvent);
 
       final LaunchCommandBuilder commandBuilder;
-      switch (resourceLaunchProto.getType()) {
+      switch (resourceLaunchEvent.getType()) {
         case JVM:
           commandBuilder = new JavaLaunchCommandBuilder()
               .setClassPath(this.classpath.getEvaluatorClasspath());
@@ -94,12 +94,12 @@ public final class YARNResourceLaunchHandler implements 
ResourceLaunchHandler {
           break;
         default:
           throw new IllegalArgumentException(
-              "Unsupported container type: " + resourceLaunchProto.getType());
+              "Unsupported container type: " + resourceLaunchEvent.getType());
       }
 
       final List<String> command = commandBuilder
-          .setErrorHandlerRID(resourceLaunchProto.getRemoteId())
-          .setLaunchID(resourceLaunchProto.getIdentifier())
+          .setErrorHandlerRID(resourceLaunchEvent.getRemoteId())
+          .setLaunchID(resourceLaunchEvent.getIdentifier())
           
.setConfigurationFileName(this.filenames.getEvaluatorConfigurationPath())
           .setMemory((int) (this.jvmHeapFactor * 
container.getResource().getMemory()))
           .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + 
this.filenames.getEvaluatorStderrFileName())
@@ -115,10 +115,10 @@ public final class YARNResourceLaunchHandler implements 
ResourceLaunchHandler {
       final ContainerLaunchContext ctx = 
YarnTypes.getContainerLaunchContext(command, localResources);
       this.yarnContainerManager.get().submit(container, ctx);
 
-      LOG.log(Level.FINEST, "TIME: End ResourceLaunchProto {0}", containerId);
+      LOG.log(Level.FINEST, "TIME: End ResourceLaunch {0}", containerId);
 
     } catch (final Throwable e) {
-      LOG.log(Level.WARNING, "Error handling resource launch message: " + 
resourceLaunchProto, e);
+      LOG.log(Level.WARNING, "Error handling resource launch message: " + 
resourceLaunchEvent, e);
       throw new RuntimeException(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceReleaseHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceReleaseHandler.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceReleaseHandler.java
index dda9fb3..9ce3c69 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceReleaseHandler.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceReleaseHandler.java
@@ -18,7 +18,7 @@
  */
 package org.apache.reef.runtime.yarn.driver;
 
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
 import org.apache.reef.tang.InjectionFuture;
 
@@ -42,8 +42,8 @@ public final class YARNResourceReleaseHandler implements 
ResourceReleaseHandler
   }
 
   @Override
-  public void onNext(final DriverRuntimeProtocol.ResourceReleaseProto 
resourceReleaseProto) {
-    final String containerId = resourceReleaseProto.getIdentifier();
+  public void onNext(final ResourceReleaseEvent resourceReleaseEvent) {
+    final String containerId = resourceReleaseEvent.getIdentifier();
     LOG.log(Level.FINEST, "Releasing container {0}", containerId);
     this.yarnContainerManager.get().release(containerId);
   }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
index bd8128b..37fc784 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
@@ -33,14 +33,13 @@ import 
org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.reef.proto.DriverRuntimeProtocol;
-import org.apache.reef.proto.DriverRuntimeProtocol.NodeDescriptorProto;
-import org.apache.reef.proto.DriverRuntimeProtocol.ResourceAllocationProto;
-import org.apache.reef.proto.DriverRuntimeProtocol.ResourceStatusProto;
-import org.apache.reef.proto.DriverRuntimeProtocol.RuntimeStatusProto;
 import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.driver.DriverStatusManager;
 import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
 import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod;
 import org.apache.reef.runtime.yarn.util.YarnTypes;
 import org.apache.reef.tang.annotations.Parameter;
@@ -140,7 +139,7 @@ final class YarnContainerManager
 
   @Override
   public void onShutdownRequest() {
-    this.reefEventHandlers.onRuntimeStatus(RuntimeStatusProto.newBuilder()
+    this.reefEventHandlers.onRuntimeStatus(RuntimeStatusEventImpl.newBuilder()
         .setName(RUNTIME_NAME).setState(ReefServiceProtos.State.DONE).build());
     this.driverStatusManager.onError(new Exception("Shutdown requested by 
YARN."));
   }
@@ -182,8 +181,8 @@ final class YarnContainerManager
   public final void onContainerStopped(final ContainerId containerId) {
     final boolean hasContainer = 
this.containers.hasContainer(containerId.toString());
     if (hasContainer) {
-      final ResourceStatusProto.Builder resourceStatusBuilder =
-          
ResourceStatusProto.newBuilder().setIdentifier(containerId.toString());
+      final ResourceStatusEventImpl.Builder resourceStatusBuilder =
+          
ResourceStatusEventImpl.newBuilder().setIdentifier(containerId.toString());
       resourceStatusBuilder.setState(ReefServiceProtos.State.DONE);
       this.reefEventHandlers.onResourceStatus(resourceStatusBuilder.build());
     }
@@ -292,7 +291,7 @@ final class YarnContainerManager
 
   private void onNodeReport(final NodeReport nodeReport) {
     LOG.log(Level.FINE, "Send node descriptor: {0}", nodeReport);
-    this.reefEventHandlers.onNodeDescriptor(NodeDescriptorProto.newBuilder()
+    
this.reefEventHandlers.onNodeDescriptor(NodeDescriptorEventImpl.newBuilder()
         .setIdentifier(nodeReport.getNodeId().toString())
         .setHostName(nodeReport.getNodeId().getHost())
         .setPort(nodeReport.getNodeId().getPort())
@@ -303,8 +302,8 @@ final class YarnContainerManager
 
   private void handleContainerError(final ContainerId containerId, final 
Throwable throwable) {
 
-    final ResourceStatusProto.Builder resourceStatusBuilder =
-        ResourceStatusProto.newBuilder().setIdentifier(containerId.toString());
+    final ResourceStatusEventImpl.Builder resourceStatusBuilder =
+        
ResourceStatusEventImpl.newBuilder().setIdentifier(containerId.toString());
 
     resourceStatusBuilder.setState(ReefServiceProtos.State.FAILED);
     resourceStatusBuilder.setExitCode(1);
@@ -364,8 +363,8 @@ final class YarnContainerManager
     if (hasContainer) {
       LOG.log(Level.FINE, "Received container status: {0}", containerId);
 
-      final ResourceStatusProto.Builder status =
-          ResourceStatusProto.newBuilder().setIdentifier(containerId);
+      final ResourceStatusEventImpl.Builder status =
+          ResourceStatusEventImpl.newBuilder().setIdentifier(containerId);
 
       switch (value.getState()) {
         case COMPLETE:
@@ -445,7 +444,7 @@ final class YarnContainerManager
           doHomogeneousRequests();
 
           LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core 
number = {1}", new Object[]{container.getResource().getMemory(), 
container.getResource().getVirtualCores()});
-          
this.reefEventHandlers.onResourceAllocation(ResourceAllocationProto.newBuilder()
+          
this.reefEventHandlers.onResourceAllocation(ResourceAllocationEventImpl.newBuilder()
               .setIdentifier(container.getId().toString())
               .setNodeId(container.getNodeId().toString())
               .setResourceMemory(container.getResource().getMemory())
@@ -507,8 +506,8 @@ final class YarnContainerManager
    */
   private void updateRuntimeStatus() {
 
-    final DriverRuntimeProtocol.RuntimeStatusProto.Builder builder =
-        DriverRuntimeProtocol.RuntimeStatusProto.newBuilder()
+    final RuntimeStatusEventImpl.Builder builder =
+        RuntimeStatusEventImpl.newBuilder()
             .setName(RUNTIME_NAME)
             .setState(ReefServiceProtos.State.RUNNING)
             
.setOutstandingContainerRequests(this.containerRequestCounter.get());
@@ -533,7 +532,7 @@ final class YarnContainerManager
       this.resourceManager.stop();
     }
 
-    final RuntimeStatusProto.Builder runtimeStatusBuilder = 
RuntimeStatusProto.newBuilder()
+    final RuntimeStatusEventImpl.Builder runtimeStatusBuilder = 
RuntimeStatusEventImpl.newBuilder()
         .setState(ReefServiceProtos.State.FAILED)
         .setName(RUNTIME_NAME);
 
@@ -590,7 +589,7 @@ final class YarnContainerManager
     LOG.log(Level.WARNING, "Container [" + containerId +
         "] has failed during driver restart process, FailedEvaluaorHandler 
will be triggered, but no additional evaluator can be requested due to 
YARN-2433.");
     // trigger a failed evaluator event
-    this.reefEventHandlers.onResourceStatus(ResourceStatusProto.newBuilder()
+    
this.reefEventHandlers.onResourceStatus(ResourceStatusEventImpl.newBuilder()
         .setIdentifier(containerId)
         .setState(ReefServiceProtos.State.FAILED)
         .setExitCode(1)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java
index 6762362..9b8fcc0 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
 
 import javax.inject.Inject;
@@ -51,41 +51,41 @@ public final class YarnResourceRequestHandler implements 
ResourceRequestHandler
   }
 
   @Override
-  public synchronized void onNext(final 
DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) {
-    LOG.log(Level.FINEST, "Got ResourceRequestProto in 
YarnResourceRequestHandler: memory = {0}, cores = {1}.", new 
Object[]{resourceRequestProto.getMemorySize(), 
resourceRequestProto.getVirtualCores()});
+  public synchronized void onNext(final ResourceRequestEvent 
resourceRequestEvent) {
+    LOG.log(Level.FINEST, "Got ResourceRequestEvent in 
YarnResourceRequestHandler: memory = {0}, cores = {1}.", new 
Object[]{resourceRequestEvent.getMemorySize(), 
resourceRequestEvent.getVirtualCores()});
 
-    final String[] nodes = resourceRequestProto.getNodeNameCount() == 0 ? null 
:
-        resourceRequestProto.getNodeNameList().toArray(new 
String[resourceRequestProto.getNodeNameCount()]);
-    final String[] racks = resourceRequestProto.getRackNameCount() == 0 ? null 
:
-        resourceRequestProto.getRackNameList().toArray(new 
String[resourceRequestProto.getRackNameCount()]);
+    final String[] nodes = resourceRequestEvent.getNodeNameList().size() == 0 
? null :
+        resourceRequestEvent.getNodeNameList().toArray(new 
String[resourceRequestEvent.getNodeNameList().size()]);
+    final String[] racks = resourceRequestEvent.getRackNameList().size() == 0 
? null :
+        resourceRequestEvent.getRackNameList().toArray(new 
String[resourceRequestEvent.getRackNameList().size()]);
 
     // set the priority for the request
-    final Priority pri = getPriority(resourceRequestProto);
-    final Resource resource = getResource(resourceRequestProto);
-    final boolean relax_locality = !resourceRequestProto.hasRelaxLocality() || 
resourceRequestProto.getRelaxLocality();
+    final Priority pri = getPriority(resourceRequestEvent);
+    final Resource resource = getResource(resourceRequestEvent);
+    final boolean relax_locality = 
resourceRequestEvent.getRelaxLocality().orElse(true);
 
     final AMRMClient.ContainerRequest[] containerRequests =
-        new 
AMRMClient.ContainerRequest[resourceRequestProto.getResourceCount()];
+        new 
AMRMClient.ContainerRequest[resourceRequestEvent.getResourceCount()];
 
-    for (int i = 0; i < resourceRequestProto.getResourceCount(); i++) {
+    for (int i = 0; i < resourceRequestEvent.getResourceCount(); i++) {
       containerRequests[i] = new AMRMClient.ContainerRequest(resource, nodes, 
racks, pri, relax_locality);
     }
     this.yarnContainerRequestHandler.onContainerRequest(containerRequests);
   }
 
-  private synchronized Resource getResource(final 
DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) {
+  private synchronized Resource getResource(final ResourceRequestEvent 
resourceRequestEvent) {
     final Resource result = Records.newRecord(Resource.class);
-    final int memory = getMemory(resourceRequestProto.getMemorySize());
-    final int core = resourceRequestProto.getVirtualCores();
+    final int memory = getMemory(resourceRequestEvent.getMemorySize().get());
+    final int core = resourceRequestEvent.getVirtualCores().get();
     LOG.log(Level.FINEST, "Resource requested: memory = {0}, virtual core 
count = {1}.", new Object[]{memory, core});
     result.setMemory(memory);
     result.setVirtualCores(core);
     return result;
   }
 
-  private synchronized Priority getPriority(final 
DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) {
+  private synchronized Priority getPriority(final ResourceRequestEvent 
resourceRequestEvent) {
     final Priority pri = Records.newRecord(Priority.class);
-    pri.setPriority(resourceRequestProto.hasPriority() ? 
resourceRequestProto.getPriority() : 1);
+    pri.setPriority(resourceRequestEvent.getPriority().orElse(1));
     return pri;
   }
 

Reply via email to