Author: edwardyoon
Date: Wed Mar  4 09:28:46 2015
New Revision: 1663907

URL: http://svn.apache.org/r1663907
Log:
HAMA-848: Refactor YARN module for hadoop 2.x stable version (Minho Kim via 
edwardyoon)

Added:
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/pom.xml
    hama/trunk/yarn/pom.xml
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPClient.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/Job.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java

Modified: hama/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1663907&r1=1663906&r2=1663907&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Mar  4 09:28:46 2015
@@ -11,6 +11,7 @@ Release 0.7.0 (unreleased changes)
 
   BUG FIXES
   
+   HAMA-848: Refactor YARN module for hadoop 2.x stable version (Minho Kim via 
edwardyoon)
    HAMA-906: Automatic activation of halted vertices without received messages 
(edwardyoon)
    HAMA-905: Fix Pi Estimator Example (Martin Illecker)
    HAMA-889: NonDefaultIterator of DenseDoubleVector never reaches the end 
(Yexi Jiang)

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1663907&r1=1663906&r2=1663907&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java 
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Wed Mar 
 4 09:28:46 2015
@@ -303,14 +303,21 @@ public class BSPJobClient extends Config
     BSPJob job = pJob;
     job.setJobID(jobId);
 
-    ClusterStatus clusterStatus = getClusterStatus(true);
-    int maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
-        clusterStatus.getMaxTasks() - clusterStatus.getTasks());
+    int maxTasks;
+    if (job.getConfiguration().getBoolean("hama.yarn.application", false)) {
+      int maxMem = 
job.getConfiguration().getInt("yarn.nodemanager.resource.memory-mb", 0);
+      int minAllocationMem = 
job.getConfiguration().getInt("yarn.scheduler.minimum-allocation-mb", 1024);
+      maxTasks = maxMem / minAllocationMem;
+    } else {
+      ClusterStatus clusterStatus = getClusterStatus(true);
+      maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
+          clusterStatus.getMaxTasks() - clusterStatus.getTasks());
 
-    if (maxTasks < job.getNumBspTask()) {
-      LOG.warn("The configured number of tasks has exceeded the maximum 
allowed. Job will run with "
-          + maxTasks + " tasks.");
-      job.setNumBspTask(maxTasks);
+      if (maxTasks < job.getNumBspTask()) {
+        LOG.warn("The configured number of tasks has exceeded the maximum 
allowed. Job will run with "
+            + maxTasks + " tasks.");
+        job.setNumBspTask(maxTasks);
+      }
     }
 
     Path submitJobDir = new Path(getSystemDir(), "submit_"

Modified: hama/trunk/pom.xml
URL: 
http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1663907&r1=1663906&r2=1663907&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Wed Mar  4 09:28:46 2015
@@ -130,6 +130,18 @@
       <activation>
         <activeByDefault>true</activeByDefault>
       </activation>
+      
+      <modules>
+        <module>c++</module>
+        <module>commons</module>
+        <module>core</module>
+        <module>graph</module>
+        <module>examples</module>
+        <module>ml</module>
+        <module>mesos</module>
+        <module>dist</module>
+      </modules>
+      
       <dependencies>
         <!-- build for Hadoop 1.x -->
         <dependency>
@@ -150,6 +162,18 @@
         <hadoop.version>2.5.0</hadoop.version>
       </properties>
 
+      <modules>
+        <module>c++</module>
+        <module>commons</module>
+        <module>core</module>
+        <module>graph</module>
+        <module>examples</module>
+        <module>ml</module>
+        <module>mesos</module>
+        <module>yarn</module>
+        <module>dist</module>
+      </modules>
+  
       <dependencies>
         <!-- build for Hadoop 2.x -->
         <dependency>
@@ -317,17 +341,6 @@
     </dependencies>
   </dependencyManagement>
 
-  <modules>
-    <module>c++</module>
-    <module>commons</module>
-    <module>core</module>
-    <module>graph</module>
-    <module>examples</module>
-    <module>ml</module>
-    <module>mesos</module>
-    <module>dist</module>
-  </modules>
-
   <build>
     <extensions>
       <extension>

Modified: hama/trunk/yarn/pom.xml
URL: 
http://svn.apache.org/viewvc/hama/trunk/yarn/pom.xml?rev=1663907&r1=1663906&r2=1663907&view=diff
==============================================================================
--- hama/trunk/yarn/pom.xml (original)
+++ hama/trunk/yarn/pom.xml Wed Mar  4 09:28:46 2015
@@ -19,20 +19,16 @@
   <parent>
     <groupId>org.apache.hama</groupId>
     <artifactId>hama-parent</artifactId>
-    <version>0.6.3-SNAPSHOT</version>
+    <version>0.7.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hama</groupId>
   <artifactId>hama-yarn</artifactId>
   <name>yarn</name>
-  <version>0.6.3-SNAPSHOT</version>
+  <version>0.7.0-SNAPSHOT</version>
   <packaging>jar</packaging>
 
-  <properties>
-    <hadoop.version>1.2.0</hadoop.version>
-  </properties>
-      
   <dependencies>
     <dependency>
       <groupId>org.apache.hama</groupId>
@@ -54,27 +50,41 @@
       <artifactId>avro</artifactId>
       <version>1.5.3</version>
     </dependency>
-
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-api</artifactId>
-      <version>0.23.1</version>
+      <version>${hadoop.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-common</artifactId>
-      <version>0.23.1</version>
+      <version>${hadoop.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-tests</artifactId>
-      <version>0.23.1</version>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+      <version>${hadoop.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
   </dependencies>
 
   <build>

Modified: 
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java?rev=1663907&r1=1663906&r2=1663907&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java 
(original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java 
Wed Mar  4 09:28:46 2015
@@ -19,7 +19,11 @@ package org.apache.hama.bsp;
 
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.security.PrivilegedAction;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -32,32 +36,37 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.SystemClock;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.Job.JobState;
 import org.apache.hama.bsp.sync.SyncServerRunner;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
 import org.apache.hama.ipc.BSPPeerProtocol;
-import org.apache.hama.ipc.HamaRPCProtocolVersion;
+import org.apache.hama.ipc.RPC;
+import org.apache.hama.ipc.Server;
 import org.apache.hama.util.BSPNetUtils;
 
+
 /**
  * BSPApplicationMaster is an application master for Apache Hamas BSP Engine.
  */
@@ -73,7 +82,8 @@ public class BSPApplicationMaster implem
 
   private Clock clock;
   private YarnRPC yarnRPC;
-  private AMRMProtocol amrmRPC;
+
+  private ApplicationMasterProtocol amrmRPC;
 
   private ApplicationAttemptId appAttemptId;
   private String applicationName;
@@ -106,6 +116,7 @@ public class BSPApplicationMaster implem
     this.jobFile = args[0];
     this.localConf = new YarnConfiguration();
     this.jobConf = getSubmitConfiguration(jobFile);
+    fs = FileSystem.get(jobConf);
 
     this.applicationName = jobConf.get("bsp.job.name",
         "<no bsp job name defined>");
@@ -128,10 +139,12 @@ public class BSPApplicationMaster implem
     startSyncServer();
 
     startRPCServers();
+
     /*
      * Make sure that this executes after the start the RPC servers, because we
      * are readjusting the configuration.
      */
+
     rewriteSubmitConfiguration(jobFile, jobConf);
 
     String jobSplit = jobConf.get("bsp.job.split.file");
@@ -146,7 +159,7 @@ public class BSPApplicationMaster implem
     }
 
     this.amrmRPC = getYarnRPCConnection(localConf);
-    registerApplicationMaster(amrmRPC, appAttemptId, hostname, clientPort,
+    registerApplicationMaster(amrmRPC, hostname, clientPort,
         "http://localhost:8080";);
   }
 
@@ -159,15 +172,14 @@ public class BSPApplicationMaster implem
    */
   private void startRPCServers() throws IOException {
     // start the RPC server which talks to the client
-    this.clientServer = RPC.getServer(BSPClient.class, this, hostname,
-        clientPort, jobConf);
+    this.clientServer = RPC.getServer(BSPClient.class, hostname, clientPort, 
jobConf);
     this.clientServer.start();
 
     // start the RPC server which talks to the tasks
     this.taskServerPort = BSPNetUtils.getFreePort(10000);
-    this.taskServer = RPC.getServer(BSPPeerProtocol.class, this, hostname,
-        taskServerPort, jobConf);
+    this.taskServer = RPC.getServer(this, hostname, taskServerPort, jobConf);
     this.taskServer.start();
+
     // readjusting the configuration to let the tasks know where we are.
     this.jobConf.set("hama.umbilical.address", hostname + ":" + 
taskServerPort);
   }
@@ -191,35 +203,61 @@ public class BSPApplicationMaster implem
    * @param yarnConf
    * @return a new RPC connection to the Resource Manager.
    */
-  private AMRMProtocol getYarnRPCConnection(Configuration yarnConf) {
+  private ApplicationMasterProtocol getYarnRPCConnection(Configuration 
yarnConf) throws IOException {
     // Connect to the Scheduler of the ResourceManager.
-    InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+    UserGroupInformation currentUser = 
UserGroupInformation.createRemoteUser(appAttemptId.toString());
+    Credentials credentials = 
UserGroupInformation.getCurrentUser().getCredentials();
+
+    final InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
         YarnConfiguration.RM_SCHEDULER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
+
+    Token<? extends TokenIdentifier> amRMToken = 
setupAndReturnAMRMToken(rmAddress, credentials.getAllTokens());
+    currentUser.addToken(amRMToken);
+
+    final Configuration conf = yarnConf;
+
+    ApplicationMasterProtocol client = currentUser
+        .doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
+          @Override
+          public ApplicationMasterProtocol run() {
+            return (ApplicationMasterProtocol) 
yarnRPC.getProxy(ApplicationMasterProtocol.class, rmAddress, conf);
+          }
+        });
     LOG.info("Connecting to ResourceManager at " + rmAddress);
-    return (AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class, rmAddress,
-        yarnConf);
+    return client;
   }
 
+  private Token<? extends TokenIdentifier> setupAndReturnAMRMToken(
+      InetSocketAddress rmBindAddress,
+      Collection<Token<? extends TokenIdentifier>> allTokens) {
+    for (Token<? extends TokenIdentifier> token : allTokens) {
+      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        SecurityUtil.setTokenService(token, rmBindAddress);
+        return token;
+      }
+    }
+    return null;
+  }
+
+
   /**
    * Registers this application master with the Resource Manager and retrieves 
a
    * response which is used to launch additional containers.
    */
   private static RegisterApplicationMasterResponse registerApplicationMaster(
-      AMRMProtocol resourceManager, ApplicationAttemptId appAttemptID,
-      String appMasterHostName, int appMasterRpcPort,
-      String appMasterTrackingUrl) throws YarnRemoteException {
+      ApplicationMasterProtocol resourceManager, String appMasterHostName, int 
appMasterRpcPort,
+      String appMasterTrackingUrl) throws YarnException, IOException {
 
     RegisterApplicationMasterRequest appMasterRequest = Records
         .newRecord(RegisterApplicationMasterRequest.class);
-    appMasterRequest.setApplicationAttemptId(appAttemptID);
     appMasterRequest.setHost(appMasterHostName);
     appMasterRequest.setRpcPort(appMasterRpcPort);
     // TODO tracking URL
     appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
     RegisterApplicationMasterResponse response = resourceManager
         .registerApplicationMaster(appMasterRequest);
-    LOG.debug("ApplicationMaster has maximum resource capability of: "
+    LOG.info("ApplicationMaster has maximum resource capability of: "
         + response.getMaximumResourceCapability().getMemory());
     return response;
   }
@@ -234,12 +272,13 @@ public class BSPApplicationMaster implem
   private static ApplicationAttemptId getApplicationAttemptId()
       throws IOException {
     Map<String, String> envs = System.getenv();
-    if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
+    if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
       throw new IllegalArgumentException(
           "ApplicationAttemptId not set in the environment");
     }
+
     return ConverterUtils.toContainerId(
-        envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV))
+        envs.get(Environment.CONTAINER_ID.name()))
         .getApplicationAttemptId();
   }
 
@@ -259,33 +298,34 @@ public class BSPApplicationMaster implem
     }
   }
 
-  private void cleanup() throws YarnRemoteException {
+  private void cleanup() throws YarnException, IOException {
     syncServer.stop();
+
     if (threadPool != null && !threadPool.isShutdown()) {
       threadPool.shutdownNow();
     }
+
     clientServer.stop();
     taskServer.stop();
     FinishApplicationMasterRequest finishReq = Records
         .newRecord(FinishApplicationMasterRequest.class);
-    finishReq.setAppAttemptId(appAttemptId);
     switch (job.getState()) {
       case SUCCESS:
-        finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+        finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
         break;
       case KILLED:
-        finishReq.setFinishApplicationStatus(FinalApplicationStatus.KILLED);
+        finishReq.setFinalApplicationStatus(FinalApplicationStatus.KILLED);
         break;
       case FAILED:
-        finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
+        finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
         break;
       default:
-        finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
+        finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
     }
     this.amrmRPC.finishApplicationMaster(finishReq);
   }
 
-  public static void main(String[] args) throws YarnRemoteException {
+  public static void main(String[] args) throws YarnException, IOException {
     // we expect getting the qualified path of the job.xml as the first
     // element in the arguments
     BSPApplicationMaster master = null;
@@ -301,17 +341,19 @@ public class BSPApplicationMaster implem
     }
   }
 
-  /*
-   * Some utility methods
-   */
-
   /**
    * Reads the configuration from the given path.
    */
-  private static Configuration getSubmitConfiguration(String path) {
+  private static Configuration getSubmitConfiguration(String path)
+      throws IOException {
     Path jobSubmitPath = new Path(path);
     Configuration jobConf = new HamaConfiguration();
-    jobConf.addResource(jobSubmitPath);
+
+    FileSystem fs = FileSystem.get(URI.create(path), jobConf);
+
+    InputStream in =fs.open(jobSubmitPath);
+    jobConf.addResource(in);
+
     return jobConf;
   }
 
@@ -326,6 +368,7 @@ public class BSPApplicationMaster implem
     FSDataOutputStream out = fs.create(jobSubmitPath);
     conf.writeXml(out);
     out.close();
+
     LOG.info("Written new configuration back to " + path);
   }
 
@@ -340,12 +383,6 @@ public class BSPApplicationMaster implem
   }
 
   @Override
-  public ProtocolSignature getProtocolSignature(String protocol,
-      long clientVersion, int clientMethodsHash) throws IOException {
-    return new ProtocolSignature(HamaRPCProtocolVersion.versionID, null);
-  }
-
-  @Override
   public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
       throws IOException, InterruptedException {
     if (taskStatus.getSuperstepCount() > superstep) {
@@ -372,7 +409,8 @@ public class BSPApplicationMaster implem
   @Override
   public Task getTask(TaskAttemptID taskid) throws IOException {
     BSPJobClient.RawSplit assignedSplit = null;
-    String splitName = NullInputFormat.NullInputSplit.class.getCanonicalName();
+    String splitName = NullInputFormat.NullInputSplit.class.getName();
+    //String splitName = NullInputSplit.class.getCanonicalName();
     if (splits != null) {
       assignedSplit = splits[taskid.id];
       splitName = assignedSplit.getClassName();

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPClient.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPClient.java?rev=1663907&r1=1663906&r2=1663907&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPClient.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPClient.java Wed Mar  4 
09:28:46 2015
@@ -18,7 +18,7 @@
 package org.apache.hama.bsp;
 
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hama.ipc.VersionedProtocol;
 
 public interface BSPClient extends VersionedProtocol {
 

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java?rev=1663907&r1=1663906&r2=1663907&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java Wed Mar  4 
09:28:46 2015
@@ -17,15 +17,16 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.ipc.RPC;
+import org.apache.hama.ipc.RPC;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
@@ -37,7 +38,7 @@ public class BSPRunner {
 
   private static final Log LOG = LogFactory.getLog(BSPRunner.class);
 
-  private Configuration conf;
+  private HamaConfiguration conf;
   private TaskAttemptID id;
   private BSPPeerImpl<?, ?, ?, ?, ? extends Writable> peer;
   private Counters counters = new Counters();
@@ -49,7 +50,10 @@ public class BSPRunner {
   public BSPRunner(String jobId, int taskAttemptId, Path confPath)
       throws Exception {
     conf = new HamaConfiguration();
-    conf.addResource(confPath);
+    FileSystem fs = FileSystem.get(confPath.toUri(), conf);
+    InputStream in = fs.open(confPath);
+    conf.addResource(in);
+
     this.id = new TaskAttemptID(jobId, 0, taskAttemptId, 0);
     this.id.id = taskAttemptId;
 
@@ -59,6 +63,7 @@ public class BSPRunner {
     conf.set(Constants.PEER_HOST, BSPNetUtils.getCanonicalHostname());
 
     String umbilicalAddress = conf.get("hama.umbilical.address");
+
     if (umbilicalAddress == null || umbilicalAddress.isEmpty()
         || !umbilicalAddress.contains(":")) {
       throw new IllegalArgumentException(
@@ -69,9 +74,10 @@ public class BSPRunner {
     InetSocketAddress address = new InetSocketAddress(hostPort[0],
         Integer.valueOf(hostPort[1]));
 
-    BSPPeerProtocol umbilical = RPC.getProxy(BSPPeerProtocol.class,
-        HamaRPCProtocolVersion.versionID, address, conf);
-
+    BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
+        BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, address,
+        conf);
+    
     BSPJob job = new BSPJob(new HamaConfiguration(conf));
 
     BSPTask task = (BSPTask) umbilical.getTask(id);

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java?rev=1663907&r1=1663906&r2=1663907&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java 
(original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java Wed 
Mar  4 09:28:46 2015
@@ -17,31 +17,22 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.util.Arrays;
-import java.util.Collections;
+import java.util.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.*;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -51,17 +42,22 @@ public class BSPTaskLauncher {
 
   private final Container allocatedContainer;
   private final int id;
-  private final ContainerManager cm;
+  private final ContainerManagementProtocol cm;
   private final Configuration conf;
   private String user;
   private final Path jobFile;
   private final BSPJobID jobId;
 
-  private GetContainerStatusRequest statusRequest;
+  private GetContainerStatusesRequest statusRequest;
+  
+  @Override
+  protected void finalize() throws Throwable {
+    stopAndCleanup();
+  }
 
-  public BSPTaskLauncher(int id, Container container, ContainerManager cm,
-      Configuration conf, Path jobFile, BSPJobID jobId)
-      throws YarnRemoteException {
+  public BSPTaskLauncher(int id, Container container, 
ContainerManagementProtocol cm,
+                         Configuration conf, Path jobFile, BSPJobID jobId)
+      throws YarnException {
     this.id = id;
     this.cm = cm;
     this.conf = conf;
@@ -75,19 +71,18 @@ public class BSPTaskLauncher {
     }
   }
 
-  @Override
-  protected void finalize() throws Throwable {
-    stopAndCleanup();
-  }
+  public void stopAndCleanup() throws YarnException, IOException {
+    StopContainersRequest stopRequest = 
Records.newRecord(StopContainersRequest.class);
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    containerIds.add(allocatedContainer.getId());
+    LOG.info("getId : " + allocatedContainer.getId());
+    stopRequest.setContainerIds(containerIds);
+    LOG.info("StopContainer : " + stopRequest.getContainerIds());
+    cm.stopContainers(stopRequest);
 
-  public void stopAndCleanup() throws YarnRemoteException {
-    StopContainerRequest stopRequest = Records
-        .newRecord(StopContainerRequest.class);
-    stopRequest.setContainerId(allocatedContainer.getId());
-    cm.stopContainer(stopRequest);
   }
 
-  public void start() throws IOException {
+  public void start() throws IOException, YarnException {
     LOG.info("Spawned task with id: " + this.id
         + " for allocated container id: "
         + this.allocatedContainer.getId().toString());
@@ -103,38 +98,44 @@ public class BSPTaskLauncher {
    */
   public BSPTaskStatus poll() throws Exception {
 
-    ContainerStatus lastStatus;
-    if ((lastStatus = cm.getContainerStatus(statusRequest).getStatus())
-        .getState() != ContainerState.COMPLETE) {
+    ContainerStatus lastStatus = null;
+    GetContainerStatusesResponse getContainerStatusesResponse = 
cm.getContainerStatuses(statusRequest);
+    List<ContainerStatus> containerStatuses = 
getContainerStatusesResponse.getContainerStatuses();
+    for (ContainerStatus containerStatus : containerStatuses) {
+      LOG.info("Got container status for containerID="
+          + containerStatus.getContainerId() + ", state="
+          + containerStatus.getState() + ", exitStatus="
+          + containerStatus.getExitStatus() + ", diagnostics="
+          + containerStatus.getDiagnostics());
+
+      if (containerStatus.getContainerId().equals(allocatedContainer.getId())) 
{
+        lastStatus = containerStatus;
+        break;
+      }
+    }
+    if (lastStatus.getState() != ContainerState.COMPLETE) {
       return null;
     }
-    LOG.info(this.id + "\tLast report comes with existatus of "
+    LOG.info(this.id + " Last report comes with exitstatus of "
         + lastStatus.getExitStatus() + " and diagnose string of "
         + lastStatus.getDiagnostics());
+
     return new BSPTaskStatus(id, lastStatus.getExitStatus());
   }
 
-  private GetContainerStatusRequest setupContainer(
-      Container allocatedContainer, ContainerManager cm, String user, int id)
-      throws IOException {
+  private GetContainerStatusesRequest setupContainer(
+      Container allocatedContainer, ContainerManagementProtocol cm, String 
user, int id) throws IOException, YarnException {
     LOG.info("Setting up a container for user " + user + " with id of " + id
         + " and containerID of " + allocatedContainer.getId() + " as " + user);
     // Now we setup a ContainerLaunchContext
     ContainerLaunchContext ctx = Records
         .newRecord(ContainerLaunchContext.class);
 
-    ctx.setContainerId(allocatedContainer.getId());
-    ctx.setResource(allocatedContainer.getResource());
-    ctx.setUser(user);
-
-    /*
-     * jar
-     */
+    // Set the local resources
+    Map<String, LocalResource> localResources = new HashMap<String, 
LocalResource>();
     LocalResource packageResource = Records.newRecord(LocalResource.class);
     FileSystem fs = FileSystem.get(conf);
-    Path packageFile = new Path(conf.get("bsp.jar"));
-    // FIXME there seems to be a problem with the converter utils and URL
-    // transformation
+    Path packageFile = new 
Path(System.getenv(YARNBSPConstants.HAMA_YARN_LOCATION));
     URL packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile
         .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
     LOG.info("PackageURL has been composed to " + packageUrl.toString());
@@ -145,15 +146,29 @@ public class BSPTaskLauncher {
       LOG.fatal("If you see this error the workarround does not work", e);
     }
 
-    FileStatus fileStatus = fs.getFileStatus(packageFile);
     packageResource.setResource(packageUrl);
-    packageResource.setSize(fileStatus.getLen());
-    packageResource.setTimestamp(fileStatus.getModificationTime());
-    packageResource.setType(LocalResourceType.ARCHIVE);
+    
packageResource.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_SIZE)));
+    
packageResource.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_TIMESTAMP)));
+    packageResource.setType(LocalResourceType.FILE);
     packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
-    LOG.info("Package resource: " + packageResource.getResource());
 
-    ctx.setLocalResources(Collections.singletonMap("package", 
packageResource));
+    localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource);
+
+    Path hamaReleaseFile = new 
Path(System.getenv(YARNBSPConstants.HAMA_RELEASE_LOCATION));
+    URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile
+        .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
+    LOG.info("Hama release URL has been composed to " + 
hamaReleaseUrl.toString());
+
+    LocalResource hamaReleaseRsrc = Records.newRecord(LocalResource.class);
+    hamaReleaseRsrc.setResource(hamaReleaseUrl);
+    
hamaReleaseRsrc.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_RELEASE_SIZE)));
+    
hamaReleaseRsrc.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_RELEASE_TIMESTAMP)));
+    hamaReleaseRsrc.setType(LocalResourceType.ARCHIVE);
+    hamaReleaseRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+
+    localResources.put(YARNBSPConstants.HAMA_SYMLINK, hamaReleaseRsrc);
+
+    ctx.setLocalResources(localResources);
 
     /*
      * TODO Package classpath seems not to work if you're in pseudo distributed
@@ -161,32 +176,64 @@ public class BSPTaskLauncher {
      * So we will check if our jar file has the file:// prefix and put it into
      * the CP directly
      */
-    String cp = "$CLASSPATH:./*:./package/*:./*:";
-    if (packageUrl.getScheme() != null && 
packageUrl.getScheme().equals("file")) {
-      cp += packageFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
-          .toString() + ":";
-      LOG.info("Localized file scheme detected, adjusting CP to: " + cp);
-    }
-    String[] cmds = {
-        "${JAVA_HOME}" + "/bin/java -cp \"" + cp + "\" "
-            + BSPRunner.class.getCanonicalName(),
-        jobId.getJtIdentifier(),
-        id + "",
-        this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
-            .toString(),
-        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
-        " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" };
-    ctx.setCommands(Arrays.asList(cmds));
-    LOG.info("Starting command: " + Arrays.toString(cmds));
+
+    StringBuilder classPathEnv = new StringBuilder(
+        
ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
+        .append("./*");
+    for (String c : conf.getStrings(
+        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+      classPathEnv.append(File.pathSeparatorChar);
+      classPathEnv.append(c.trim());
+    }
+
+    classPathEnv.append(File.pathSeparator);
+    classPathEnv.append("./" + YARNBSPConstants.HAMA_SYMLINK +
+        "/" + YARNBSPConstants.HAMA_RELEASE_VERSION +  "/*");
+    classPathEnv.append(File.pathSeparator);
+    classPathEnv.append("./" + YARNBSPConstants.HAMA_SYMLINK +
+        "/" + YARNBSPConstants.HAMA_RELEASE_VERSION + "/lib/*");
+
+    Vector<CharSequence> vargs = new Vector<CharSequence>();
+    vargs.add("${JAVA_HOME}/bin/java");
+    vargs.add("-cp " + classPathEnv + "");
+    vargs.add(BSPRunner.class.getCanonicalName());
+    
+    vargs.add(jobId.getJtIdentifier());
+    vargs.add(Integer.toString(id));
+    vargs.add(this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
+        .toString());
+
+    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + 
"/bsp.stdout");
+    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + 
"/bsp.stderr");
+
+    // Get final commmand
+    StringBuilder command = new StringBuilder();
+    for (CharSequence str : vargs) {
+      command.append(str).append(" ");
+    }
+
+    List<String> commands = new ArrayList<String>();
+    commands.add(command.toString());
+
+    ctx.setCommands(commands);
+    LOG.info("Starting command: " + commands);
 
     StartContainerRequest startReq = Records
         .newRecord(StartContainerRequest.class);
     startReq.setContainerLaunchContext(ctx);
-    cm.startContainer(startReq);
+    startReq.setContainerToken(allocatedContainer.getContainerToken());
 
-    GetContainerStatusRequest statusReq = Records
-        .newRecord(GetContainerStatusRequest.class);
-    statusReq.setContainerId(allocatedContainer.getId());
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    list.add(startReq);
+    StartContainersRequest requestList = 
StartContainersRequest.newInstance(list);
+    cm.startContainers(requestList);
+
+    GetContainerStatusesRequest statusReq = Records
+        .newRecord(GetContainerStatusesRequest.class);
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    containerIds.add(allocatedContainer.getId());
+    statusReq.setContainerIds(containerIds);
     return statusReq;
   }
 

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/Job.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/Job.java?rev=1663907&r1=1663906&r2=1663907&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/Job.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/Job.java Wed Mar  4 
09:28:46 2015
@@ -17,7 +17,9 @@
  */
 package org.apache.hama.bsp;
 
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.io.IOException;
 
 /**
  * Main interface to interact with the job. Provides only getters.
@@ -34,7 +36,7 @@ public interface Job {
 
   public JobState startJob() throws Exception;
 
-  public void cleanup() throws YarnRemoteException;
+  public void cleanup() throws YarnException, IOException;
 
   JobState getState();
 

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java?rev=1663907&r1=1663906&r2=1663907&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java Wed Mar  4 
09:28:46 2015
@@ -1,8 +1,8 @@
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
+ * distributed with this work for additional information
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
@@ -17,34 +17,28 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.security.PrivilegedAction;
+import java.util.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus;
 
@@ -66,7 +60,7 @@ public class JobImpl implements Job {
 
   private ApplicationAttemptId appAttemptId;
   private YarnRPC yarnRPC;
-  private AMRMProtocol resourceManager;
+  private ApplicationMasterProtocol resourceManager;
 
   private List<Container> allocatedContainers;
   private List<ContainerId> releasedContainers = Collections.emptyList();
@@ -76,11 +70,20 @@ public class JobImpl implements Job {
 
   private int lastResponseID = 0;
 
+  private int getMemoryRequirements() {
+    String newMemoryProperty = conf.get("bsp.child.mem.in.mb");
+    if (newMemoryProperty == null) {
+      LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child 
opts...");
+      return getMemoryFromOptString(childOpts);
+    } else {
+      return Integer.valueOf(newMemoryProperty);
+    }
+  }
+
   public JobImpl(ApplicationAttemptId appAttemptId,
-      Configuration jobConfiguration, YarnRPC yarnRPC, AMRMProtocol amrmRPC,
-      String jobFile, BSPJobID jobId) {
+                 Configuration jobConfiguration, YarnRPC yarnRPC, 
ApplicationMasterProtocol amrmRPC,
+                 String jobFile, BSPJobID jobId) {
     super();
-    this.numBSPTasks = jobConfiguration.getInt("bsp.peers.num", 1);
     this.appAttemptId = appAttemptId;
     this.yarnRPC = yarnRPC;
     this.resourceManager = amrmRPC;
@@ -88,35 +91,29 @@ public class JobImpl implements Job {
     this.state = JobState.NEW;
     this.jobId = jobId;
     this.conf = jobConfiguration;
+    this.numBSPTasks = conf.getInt("bsp.peers.num", 1);
     this.childOpts = conf.get("bsp.child.java.opts");
 
     this.taskMemoryInMb = getMemoryRequirements();
-    LOG.info("Memory per task: " + taskMemoryInMb + "m!");
-  }
-
-  private int getMemoryRequirements() {
-    String newMemoryProperty = conf.get("bsp.child.mem.in.mb");
-    if (newMemoryProperty == null) {
-      LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child 
opts...");
-      return getMemoryFromOptString(childOpts);
-    } else {
-      return Integer.valueOf(newMemoryProperty);
-    }
   }
 
   // This really needs a testcase
   private static int getMemoryFromOptString(String opts) {
+    if (opts == null) {
+      return DEFAULT_MEMORY_MB;
+    }
+
     if (!opts.contains("-Xmx")) {
       LOG.info("No \"-Xmx\" option found in child opts, using default amount 
of memory!");
       return DEFAULT_MEMORY_MB;
     } else {
       // e.G: -Xmx512m
+
       int startIndex = opts.indexOf("-Xmx") + 4;
-      int endIndex = opts.indexOf(" ", startIndex);
-      String xmxString = opts.substring(startIndex, endIndex);
+      String xmxString = opts.substring(startIndex);
       char qualifier = xmxString.charAt(xmxString.length() - 1);
       int memory = Integer.valueOf(xmxString.substring(0,
-          xmxString.length() - 2));
+          xmxString.length() - 1));
       if (qualifier == 'm') {
         return memory;
       } else if (qualifier == 'g') {
@@ -133,28 +130,28 @@ public class JobImpl implements Job {
   public JobState startJob() throws Exception {
 
     this.allocatedContainers = new ArrayList<Container>(numBSPTasks);
+    NMTokenCache nmTokenCache = new NMTokenCache();
     while (allocatedContainers.size() < numBSPTasks) {
-
-      AllocateRequest req = BuilderUtils.newAllocateRequest(
-          appAttemptId,
-          lastResponseID,
-          0.0f,
-          createBSPTaskRequest(numBSPTasks - allocatedContainers.size(),
-              taskMemoryInMb, priority), releasedContainers);
+      AllocateRequest req = AllocateRequest.newInstance(lastResponseID, 0.0f,
+          createBSPTaskRequest(numBSPTasks - allocatedContainers.size(), 
taskMemoryInMb,
+              priority), releasedContainers, null);
 
       AllocateResponse allocateResponse = resourceManager.allocate(req);
-      AMResponse amResponse = allocateResponse.getAMResponse();
-      LOG.info("Got response! ID: " + amResponse.getResponseId()
+      for (NMToken token : allocateResponse.getNMTokens()) {
+        nmTokenCache.setToken(token.getNodeId().toString(), token.getToken());
+      }
+
+      LOG.info("Got response ID: " + allocateResponse.getResponseId()
           + " with num of containers: "
-          + amResponse.getAllocatedContainers().size()
+          + allocateResponse.getAllocatedContainers().size()
           + " and following resources: "
-          + amResponse.getAvailableResources().getMemory() + "mb");
-      this.lastResponseID = amResponse.getResponseId();
+          + allocateResponse.getAvailableResources().getMemory() + "mb");
+      this.lastResponseID = allocateResponse.getResponseId();
+
+      
this.allocatedContainers.addAll(allocateResponse.getAllocatedContainers());
+
+      LOG.info("Waiting to allocate " + (numBSPTasks - 
allocatedContainers.size()) + " more containers...");
 
-      // availableResources = amResponse.getAvailableResources();
-      this.allocatedContainers.addAll(amResponse.getAllocatedContainers());
-      LOG.info("Waiting to allocate "
-          + (numBSPTasks - allocatedContainers.size()) + " more 
containers...");
       Thread.sleep(1000l);
     }
 
@@ -166,16 +163,25 @@ public class JobImpl implements Job {
           + allocatedContainer.getId() + ", containerNode="
           + allocatedContainer.getNodeId().getHost() + ":"
           + allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
-          + allocatedContainer.getNodeHttpAddress() + ", containerState"
-          + allocatedContainer.getState() + ", containerResourceMemory"
+          + allocatedContainer.getNodeHttpAddress() + ", 
containerResourceMemory"
           + allocatedContainer.getResource().getMemory());
 
       // Connect to ContainerManager on the allocated container
-      String cmIpPortStr = allocatedContainer.getNodeId().getHost() + ":"
-          + allocatedContainer.getNodeId().getPort();
-      InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
-      ContainerManager cm = (ContainerManager) yarnRPC.getProxy(
-          ContainerManager.class, cmAddress, conf);
+      String user = conf.get("bsp.user.name");
+      if (user == null) {
+        user = System.getenv(ApplicationConstants.Environment.USER.name());
+      }
+
+      ContainerManagementProtocol cm = null;
+      try {
+        cm = getContainerManagementProtocolProxy(yarnRPC,
+            nmTokenCache.getToken(allocatedContainer.getNodeId().toString()), 
allocatedContainer.getNodeId(), user);
+      } catch (Exception e) {
+        LOG.error("Failed to create ContainerManager...");
+        if (cm != null)
+          yarnRPC.stopProxy(cm, conf);
+        e.printStackTrace();
+      }
 
       BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id,
           allocatedContainer, cm, conf, jobFile, jobId);
@@ -185,9 +191,12 @@ public class JobImpl implements Job {
       completionQueue.add(runnableLaunchContainer);
       id++;
     }
+
     LOG.info("Waiting for tasks to finish...");
     state = JobState.RUNNING;
     int completed = 0;
+
+    List<Integer> cleanupTasks = new ArrayList<Integer>();
     while (completed != numBSPTasks) {
       for (BSPTaskLauncher task : completionQueue) {
         BSPTaskStatus returnedTask = task.poll();
@@ -195,6 +204,7 @@ public class JobImpl implements Job {
         if (returnedTask != null) {
           if (returnedTask.getExitStatus() != 0) {
             LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!");
+            cleanupTask(returnedTask.getId());
             state = JobState.FAILED;
             return state;
           } else {
@@ -204,37 +214,64 @@ public class JobImpl implements Job {
             LOG.info("Waiting for " + (numBSPTasks - completed)
                 + " tasks to finish!");
           }
-          cleanupTask(returnedTask.getId());
+          cleanupTasks.add(returnedTask.getId());
         }
       }
       Thread.sleep(1000L);
     }
 
+    for (Integer stopId : cleanupTasks) {
+      cleanupTask(stopId);
+    }
+
     state = JobState.SUCCESS;
     return state;
   }
 
   /**
+   *
+   * @param rpc
+   * @param nmToken
+   * @param nodeId
+   * @param user
+   * @return
+   */
+  protected ContainerManagementProtocol getContainerManagementProtocolProxy(
+      final YarnRPC rpc, Token nmToken, NodeId nodeId, String user) {
+    ContainerManagementProtocol proxy;
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+    final InetSocketAddress addr =
+        NetUtils.createSocketAddr(nodeId.getHost(), nodeId.getPort());
+    if (nmToken != null) {
+      ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr));
+    }
+
+    proxy = ugi
+        .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
+          @Override
+          public ContainerManagementProtocol run() {
+            return (ContainerManagementProtocol) rpc.getProxy(
+                ContainerManagementProtocol.class,
+                addr, conf);
+          }
+        });
+    return proxy;
+  }
+
+  /**
    * Makes a lookup for the taskid and stops its container and task. It also
    * removes the task from the launcher so that we won't have to stop it twice.
    * 
    * @param id
-   * @throws YarnRemoteException
+   * @throws YarnException
    */
-  private void cleanupTask(int id) throws YarnRemoteException {
+  private void cleanupTask(int id) throws YarnException, IOException {
     BSPTaskLauncher bspTaskLauncher = launchers.get(id);
     bspTaskLauncher.stopAndCleanup();
     launchers.remove(id);
     completionQueue.remove(bspTaskLauncher);
   }
 
-  @Override
-  public void cleanup() throws YarnRemoteException {
-    for (BSPTaskLauncher launcher : completionQueue) {
-      launcher.stopAndCleanup();
-    }
-  }
-
   private List<ResourceRequest> createBSPTaskRequest(int numTasks,
       int memoryInMb, int priority) {
 
@@ -247,7 +284,7 @@ public class JobImpl implements Job {
       // whether a particular rack/host is needed
       // useful for applications that are sensitive
       // to data locality
-      rsrcRequest.setHostName("*");
+      rsrcRequest.setResourceName("*");
 
       // set the priority for the request
       Priority pri = Records.newRecord(Priority.class);
@@ -269,6 +306,13 @@ public class JobImpl implements Job {
   }
 
   @Override
+  public void cleanup() throws YarnException, IOException {
+    for (BSPTaskLauncher launcher : completionQueue) {
+      launcher.stopAndCleanup();
+    }
+  }
+
+  @Override
   public JobState getState() {
     return state;
   }

Added: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java?rev=1663907&view=auto
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java 
(added)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java Wed 
Mar  4 09:28:46 2015
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * Contants used in both Client and Application Master
+ */
+public class YARNBSPConstants {
+
+  /**
+   * Environment key name pointing to the hama-yarn's location
+   */
+  public static final String HAMA_YARN_LOCATION = "HAMAYARNJARLOCATION";
+
+  /**
+   * Environment key name denoting the file content length for the hama-yarn 
application.
+   * Used to validate the local resource.
+   */
+  public static final String HAMA_YARN_SIZE = "HAMAYARNJARSIZE";
+
+  /**
+   * Environment key name denoting the file timestamp for the hama-yarn 
application.
+   * Used to validate the local resource.
+   */
+  public static final String HAMA_YARN_TIMESTAMP = "HAMAYARNJARTIMESTAMP";
+
+  /**
+   * Environment key name pointing to the hama release's location
+   */
+  public static final String HAMA_RELEASE_LOCATION = "HAMARELEASELOCATION";
+
+  /**
+   * Environment key name denoting the file content length for the hama 
release.
+   * Used to validate the local resource.
+   */
+  public static final String HAMA_RELEASE_SIZE = "HAMARELEASESIZE";
+
+  /**
+   * Environment key name denoting the file timestamp for the hama release.
+   * Used to validate the local resource.
+   */
+  public static final String HAMA_RELEASE_TIMESTAMP = "HAMARELEASETIMESTAMP";
+
+  /**
+   * Symbolic link name for application master's jar file in container local 
resource
+   */
+  public static final String APP_MASTER_JAR_PATH = "AppMaster.jar";
+
+  /**
+   * Symbolic link name for hama release archive in container local resource
+   */
+  public static final String HAMA_SYMLINK = "hama";
+
+  /**
+   * Hama release file name
+   */
+  public static final String HAMA_RELEASE_FILE = "hama-0.6.4.tar.gz";
+
+  /**
+   * Hama release version
+   */
+  public static final String HAMA_RELEASE_VERSION = "hama-0.6.4";
+
+  /**
+   * Hama release file source location
+   */
+  public static final String HAMA_SRC_PATH = "/home/hadoop";
+}

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java?rev=1663907&r1=1663906&r2=1663907&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java Wed Mar  
4 09:28:46 2015
@@ -22,16 +22,12 @@ import java.net.InetSocketAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hama.HamaConfiguration;
@@ -43,14 +39,12 @@ public class YARNBSPJob extends BSPJob {
   private static volatile int id = 0;
 
   private YARNBSPJobClient submitClient;
-  private BSPClient client;
   private boolean submitted;
   private ApplicationReport report;
-  private ClientRMProtocol applicationsManager;
+  private ApplicationClientProtocol applicationsManager;
   private YarnRPC rpc;
 
   public YARNBSPJob(HamaConfiguration conf) throws IOException {
-    super(conf);
     submitClient = new YARNBSPJobClient(conf);
     YarnConfiguration yarnConf = new YarnConfiguration(conf);
     this.rpc = YarnRPC.create(conf);
@@ -58,15 +52,15 @@ public class YARNBSPJob extends BSPJob {
         YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS));
     LOG.info("Connecting to ResourceManager at " + rmAddress);
 
-    this.applicationsManager = ((ClientRMProtocol) rpc.getProxy(
-        ClientRMProtocol.class, rmAddress, conf));
+    this.applicationsManager = ((ApplicationClientProtocol) rpc.getProxy(
+        ApplicationClientProtocol.class, rmAddress, conf));
   }
 
   public void setMemoryUsedPerTaskInMb(int mem) {
     conf.setInt("bsp.child.mem.in.mb", mem);
   }
 
-  public void kill() throws YarnRemoteException {
+  public void kill() throws YarnException, IOException {
     if (submitClient != null) {
       KillApplicationRequest killRequest = Records
           .newRecord(KillApplicationRequest.class);
@@ -79,6 +73,7 @@ public class YARNBSPJob extends BSPJob {
   public void submit() throws IOException, InterruptedException {
     RunningJob submitJobInternal = submitClient.submitJobInternal(this,
         new BSPJobID("hama_yarn", id++));
+
     if (submitJobInternal != null) {
       submitted = true;
       report = submitClient.getReport();
@@ -95,45 +90,14 @@ public class YARNBSPJob extends BSPJob {
       this.submit();
     }
 
-    client = RPC.waitForProxy(BSPClient.class, BSPClient.versionID,
-        NetUtils.createSocketAddr(report.getHost(), report.getRpcPort()), 
conf);
-    GetApplicationReportRequest reportRequest = Records
-        .newRecord(GetApplicationReportRequest.class);
-    reportRequest.setApplicationId(submitClient.getId());
-
-    GetApplicationReportResponse reportResponse = applicationsManager
-        .getApplicationReport(reportRequest);
-    ApplicationReport localReport = reportResponse.getApplicationReport();
-    long clientSuperStep = -1L;
-    while (localReport.getFinalApplicationStatus() != null
-        && localReport.getFinalApplicationStatus().ordinal() == 0) {
-      LOG.debug("currently in state: "
-          + localReport.getFinalApplicationStatus());
-      if (verbose) {
-        long remoteSuperStep = client.getCurrentSuperStep().get();
-        if (clientSuperStep < remoteSuperStep) {
-          clientSuperStep = remoteSuperStep;
-          LOG.info("Current supersteps number: " + clientSuperStep);
-        }
-      }
-      reportResponse = applicationsManager.getApplicationReport(reportRequest);
-      localReport = reportResponse.getApplicationReport();
-
-      Thread.sleep(3000L);
-    }
-
-    if (localReport.getFinalApplicationStatus() == 
FinalApplicationStatus.SUCCEEDED) {
-      LOG.info("Job succeeded!");
+    if (report != null && report.getApplicationId() == submitClient.getId()) {
       return true;
     } else {
-      LOG.info("Job failed with status: "
-          + localReport.getFinalApplicationStatus().toString() + "!");
       return false;
     }
-
   }
 
-  ClientRMProtocol getApplicationsManager() {
+  ApplicationClientProtocol getApplicationsManager() {
     return applicationsManager;
   }
 

Modified: 
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java?rev=1663907&r1=1663906&r2=1663907&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java 
(original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java Wed 
Mar  4 09:28:46 2015
@@ -17,32 +17,37 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.nio.ByteBuffer;
+import java.util.*;
 
+import org.apache.commons.beanutils.Converter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 
 public class YARNBSPJobClient extends BSPJobClient {
@@ -52,20 +57,100 @@ public class YARNBSPJobClient extends BS
   private ApplicationId id;
   private ApplicationReport report;
 
+  // Configuration
+  private YarnClient yarnClient;
+  private YarnConfiguration yarnConf;
+
+  // Start time for client
+  private final long clientStartTime = System.currentTimeMillis();
+  // Timeout threshold for client. Kill app after time interval expires.
+  private long clientTimeout = 60000;
+
+  class NetworkedJob implements RunningJob {
+    @Override
+    public BSPJobID getID() {
+      return null;
+    }
+
+    @Override
+    public String getJobName() {
+      return null;
+    }
+
+    @Override
+    public long progress() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public boolean isComplete() throws IOException {
+      return false;
+    }
+
+    @Override
+    public boolean isSuccessful() throws IOException {
+      return false;
+    }
+
+    @Override
+    public void waitForCompletion() throws IOException {
+
+    }
+
+    @Override
+    public int getJobState() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public void killJob() throws IOException {
+
+    }
+
+    @Override
+    public void killTask(TaskAttemptID taskId, boolean shouldFail) throws 
IOException {
+
+    }
+
+    @Override
+    public long getSuperstepCount() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public JobStatus getStatus() {
+      return null;
+    }
+
+    @Override
+    public TaskCompletionEvent[] getTaskCompletionEvents(int eventCounter) {
+      return new TaskCompletionEvent[0];
+    }
+
+    @Override
+    public String getJobFile() {
+      return null;
+    }
+  }
+
   public YARNBSPJobClient(HamaConfiguration conf) {
     setConf(conf);
+    yarnConf = new YarnConfiguration(conf);
+    yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(yarnConf);
+    yarnClient.start();
   }
 
   @Override
   protected RunningJob launchJob(BSPJobID jobId, BSPJob normalJob,
       Path submitJobFile, FileSystem pFs) throws IOException {
-
     YARNBSPJob job = (YARNBSPJob) normalJob;
 
     LOG.info("Submitting job...");
     if (getConf().get("bsp.child.mem.in.mb") == null) {
       LOG.warn("BSP Child memory has not been set, YARN will guess your needs 
or use default values.");
     }
+
     FileSystem fs = pFs;
     if (fs == null) {
       fs = FileSystem.get(getConf());
@@ -77,121 +162,193 @@ public class YARNBSPJobClient extends BS
       LOG.debug("Retrieved username: " + s);
     }
 
-    GetNewApplicationRequest request = Records
-        .newRecord(GetNewApplicationRequest.class);
-    GetNewApplicationResponse response = job.getApplicationsManager()
-        .getNewApplication(request);
-    id = response.getApplicationId();
-    LOG.debug("Got new ApplicationId=" + id);
-
-    // Create a new ApplicationSubmissionContext
-    ApplicationSubmissionContext appContext = Records
-        .newRecord(ApplicationSubmissionContext.class);
-    // set the ApplicationId
-    appContext.setApplicationId(this.id);
-    // set the application name
-    appContext.setApplicationName(job.getJobName());
-
-    // Create a new container launch context for the AM's container
-    ContainerLaunchContext amContainer = Records
-        .newRecord(ContainerLaunchContext.class);
-
-    // Define the local resources required
-    Map<String, LocalResource> localResources = new HashMap<String, 
LocalResource>();
-    // Lets assume the jar we need for our ApplicationMaster is available in
-    // HDFS at a certain known path to us and we want to make it available to
-    // the ApplicationMaster in the launched container
-    if (job.getJar() == null) {
-      throw new IllegalArgumentException(
-          "Jar must be set in order to run the application!");
-    }
-    Path jarPath = new Path(job.getWorkingDirectory(), id + "/app.jar");
-    fs.copyFromLocalFile(job.getLocalPath(job.getJar()), jarPath);
-    LOG.debug("Copying app jar to " + jarPath);
-    getConf()
-        .set(
-            "bsp.jar",
-            jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory())
-                .toString());
-    FileStatus jarStatus = fs.getFileStatus(jarPath);
-    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
-    amJarRsrc.setType(LocalResourceType.FILE);
-    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
-    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
-    amJarRsrc.setTimestamp(jarStatus.getModificationTime());
-    amJarRsrc.setSize(jarStatus.getLen());
-    // this creates a symlink in the working directory
-    localResources.put("AppMaster.jar", amJarRsrc);
-    // Set the local resources into the launch context
-    amContainer.setLocalResources(localResources);
-
-    // Set up the environment needed for the launch context
-    Map<String, String> env = new HashMap<String, String>();
-    // Assuming our classes or jars are available as local resources in the
-    // working directory from which the command will be run, we need to append
-    // "." to the path.
-    // By default, all the hadoop specific classpaths will already be available
-    // in $CLASSPATH, so we should be careful not to overwrite it.
-    String classPathEnv = "$CLASSPATH:./*:";
-    env.put("CLASSPATH", classPathEnv);
-    amContainer.setEnvironment(env);
-
-    // Construct the command to be executed on the launched container
-    String command = "${JAVA_HOME}"
-        + "/bin/java -cp "
-        + classPathEnv
-        + " "
-        + BSPApplicationMaster.class.getCanonicalName()
-        + " "
-        + submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
-            .toString() + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
-        + "/stdout" + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
-        + "/stderr";
-
-    LOG.debug("Start command: " + command);
-
-    amContainer.setCommands(Collections.singletonList(command));
-
-    Resource capability = Records.newRecord(Resource.class);
-    // we have at least 3 threads, which comsumes 1mb each, for each bsptask 
and
-    // a base usage of 100mb
-    capability.setMemory(3 * job.getNumBspTask()
-        + getConf().getInt("hama.appmaster.memory.mb", 100));
-    LOG.info("Set memory for the application master to "
-        + capability.getMemory() + "mb!");
-    amContainer.setResource(capability);
-
-    // Set the container launch content into the ApplicationSubmissionContext
-    appContext.setAMContainerSpec(amContainer);
-
-    // Create the request to send to the ApplicationsManager
-    SubmitApplicationRequest appRequest = Records
-        .newRecord(SubmitApplicationRequest.class);
-    appRequest.setApplicationSubmissionContext(appContext);
-    job.getApplicationsManager().submitApplication(appRequest);
-
-    GetApplicationReportRequest reportRequest = Records
-        .newRecord(GetApplicationReportRequest.class);
-    reportRequest.setApplicationId(id);
-    while (report == null || report.getHost().equals("N/A")) {
-      GetApplicationReportResponse reportResponse = job
-          .getApplicationsManager().getApplicationReport(reportRequest);
-      report = reportResponse.getApplicationReport();
-      try {
-        Thread.sleep(1000L);
-      } catch (InterruptedException e) {
-        LOG.error(
-            "Got interrupted while waiting for a response report from AM.", e);
+    try {
+      YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
+      LOG.info("Got Cluster metric info from ASM"
+          + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
+
+      List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
+          NodeState.RUNNING);
+      LOG.info("Got Cluster node info from ASM");
+      for (NodeReport node : clusterNodeReports) {
+        LOG.info("Got node report from ASM for"
+            + ", nodeId=" + node.getNodeId()
+            + ", nodeAddress" + node.getHttpAddress()
+            + ", nodeRackName" + node.getRackName()
+            + ", nodeNumContainers" + node.getNumContainers());
       }
-    }
-    LOG.info("Got report: " + report.getApplicationId() + " "
-        + report.getHost() + ":" + report.getRpcPort());
-    return new NetworkedJob();
-  }
 
-  @Override
-  protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException 
{
-    return Math.max(1, limitTasks);
+      QueueInfo queueInfo = yarnClient.getQueueInfo("default");
+      LOG.info("Queue info"
+          + ", queueName=" + queueInfo.getQueueName()
+          + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
+          + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
+          + ", queueApplicationCount=" + queueInfo.getApplications().size()
+          + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());
+
+      List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
+      for (QueueUserACLInfo aclInfo : listAclInfo) {
+        for (QueueACL userAcl : aclInfo.getUserAcls()) {
+          LOG.info("User ACL Info for Queue"
+              + ", queueName=" + aclInfo.getQueueName()
+              + ", userAcl=" + userAcl.name());
+        }
+      }
+
+      GetNewApplicationRequest request = 
Records.newRecord(GetNewApplicationRequest.class);
+      GetNewApplicationResponse response = 
job.getApplicationsManager().getNewApplication(request);
+      id = response.getApplicationId();
+
+      // Create a new ApplicationSubmissionContext
+      ApplicationSubmissionContext appContext = 
Records.newRecord(ApplicationSubmissionContext.class);
+      // set the ApplicationId
+      appContext.setApplicationId(this.id);
+      // set the application name
+      appContext.setApplicationName(job.getJobName());
+
+      // Create a new container launch context for the AM's container
+      ContainerLaunchContext amContainer = 
Records.newRecord(ContainerLaunchContext.class);
+
+      // Define the local resources required
+      Map<String, LocalResource> localResources = new HashMap<String, 
LocalResource>();
+      // Lets assume the jar we need for our ApplicationMaster is available in
+      // HDFS at a certain known path to us and we want to make it available to
+      // the ApplicationMaster in the launched container
+      if (job.getJar() == null) {
+        throw new IllegalArgumentException("Jar must be set in order to run 
the application!");
+      }
+      
+      Path jarPath = new Path(job.getJar());
+      jarPath = fs.makeQualified(jarPath);
+      getConf().set("bsp.jar", jarPath.makeQualified(fs.getUri(), 
jarPath).toString());
+
+      FileStatus jarStatus = fs.getFileStatus(jarPath);
+      LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
+      amJarRsrc.setType(LocalResourceType.FILE);
+      amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+      amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
+      amJarRsrc.setTimestamp(jarStatus.getModificationTime());
+      amJarRsrc.setSize(jarStatus.getLen());
+
+      // this creates a symlink in the working directory
+      localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, amJarRsrc);
+
+      // Copy from hama-${version}.tar.gz to HDFS
+      Path hamaDstPath = new Path(getSystemDir(), 
YARNBSPConstants.HAMA_RELEASE_FILE);
+      hamaDstPath = fs.makeQualified(hamaDstPath);
+      fs.copyFromLocalFile(false, true,
+          new Path(YARNBSPConstants.HAMA_SRC_PATH, 
YARNBSPConstants.HAMA_RELEASE_FILE),
+          hamaDstPath);
+      FileStatus hamaStatus = fs.getFileStatus(hamaDstPath);
+      URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaDstPath
+          .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
+      LocalResource hamaReleaseRsrc = Records.newRecord(LocalResource.class);
+      hamaReleaseRsrc.setResource(hamaReleaseUrl);
+      hamaReleaseRsrc.setSize(hamaStatus.getLen());
+      hamaReleaseRsrc.setTimestamp(hamaStatus.getModificationTime());
+      hamaReleaseRsrc.setType(LocalResourceType.ARCHIVE);
+      hamaReleaseRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+
+      localResources.put(YARNBSPConstants.HAMA_SYMLINK, hamaReleaseRsrc);
+
+      // Set the local resources into the launch context
+      amContainer.setLocalResources(localResources);
+
+      // Set up the environment needed for the launch context
+      Map<String, String> env = new HashMap<String, String>();
+      // Assuming our classes or jars are available as local resources in the
+      // working directory from which the command will be run, we need to 
append
+      // "." to the path.
+      // By default, all the hadoop specific classpaths will already be 
available
+      // in $CLASSPATH, so we should be careful not to overwrite it.
+      StringBuilder classPathEnv = new StringBuilder(
+          
ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
+          .append("./*");
+      for (String c : yarnConf.getStrings(
+          YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+          YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+        classPathEnv.append(File.pathSeparatorChar);
+        classPathEnv.append(c.trim());
+      }
+      classPathEnv.append(File.pathSeparator);
+      classPathEnv.append("./" + YARNBSPConstants.HAMA_SYMLINK + 
"/hama-0.6.4/*");
+
+      env.put(YARNBSPConstants.HAMA_YARN_LOCATION, jarPath.toUri().toString());
+      env.put(YARNBSPConstants.HAMA_YARN_SIZE, 
Long.toString(jarStatus.getLen()));
+      env.put(YARNBSPConstants.HAMA_YARN_TIMESTAMP, 
Long.toString(jarStatus.getModificationTime()));
+
+      env.put(YARNBSPConstants.HAMA_RELEASE_LOCATION, 
hamaDstPath.toUri().toString());
+      env.put(YARNBSPConstants.HAMA_RELEASE_SIZE, 
Long.toString(hamaStatus.getLen()));
+      env.put(YARNBSPConstants.HAMA_RELEASE_TIMESTAMP, 
Long.toString(hamaStatus.getModificationTime()));
+      env.put("CLASSPATH", classPathEnv.toString());
+      amContainer.setEnvironment(env);
+
+      // Set the necessary command to execute on the allocated container
+      Vector<CharSequence> vargs = new Vector<CharSequence>(5);
+      vargs.add("${JAVA_HOME}/bin/java");
+      vargs.add("-cp " + classPathEnv + "");
+      vargs.add(BSPApplicationMaster.class.getCanonicalName());
+      vargs.add(submitJobFile.makeQualified(fs.getUri(), 
fs.getWorkingDirectory()).toString());
+
+      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + 
"/hama-appmaster.stdout");
+      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + 
"/hama-appmaster.stderr");
+
+      // Get final commmand
+      StringBuilder command = new StringBuilder();
+      for (CharSequence str : vargs) {
+        command.append(str).append(" ");
+      }
+
+      List<String> commands = new ArrayList<String>();
+      commands.add(command.toString());
+      amContainer.setCommands(commands);
+
+      LOG.debug("Start command: " + command);
+
+      Resource capability = Records.newRecord(Resource.class);
+      // we have at least 3 threads, which comsumes 1mb each, for each bsptask 
and
+      // a base usage of 100mb
+      capability.setMemory(3 * job.getNumBspTask() + 
getConf().getInt("hama.appmaster.memory.mb", 100));
+      LOG.info("Set memory for the application master to " + 
capability.getMemory() + "mb!");
+
+      // Set the container launch content into the ApplicationSubmissionContext
+      appContext.setResource(capability);
+
+      // Setup security tokens
+      if (UserGroupInformation.isSecurityEnabled()) {
+        // Note: Credentials class is marked as LimitedPrivate for HDFS and 
MapReduce
+        Credentials credentials = new Credentials();
+        String tokenRenewer = yarnConf.get(YarnConfiguration.RM_PRINCIPAL);
+        if (tokenRenewer == null || tokenRenewer.length() == 0) {
+          throw new IOException(
+              "Can't get Master Kerberos principal for the RM to use as 
renewer");
+        }
+
+        // For now, only getting tokens for the default file-system.
+        final Token<?> tokens[] =
+            fs.addDelegationTokens(tokenRenewer, credentials);
+        if (tokens != null) {
+          for (Token<?> token : tokens) {
+            LOG.info("Got dt for " + fs.getUri() + "; " + token);
+          }
+        }
+        DataOutputBuffer dob = new DataOutputBuffer();
+        credentials.writeTokenStorageToStream(dob);
+        ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, 
dob.getLength());
+        amContainer.setTokens(fsTokens);
+      }
+
+      appContext.setAMContainerSpec(amContainer);
+
+      // Create the request to send to the ApplicationsManager
+      ApplicationId appId = appContext.getApplicationId();
+      yarnClient.submitApplication(appContext);
+
+      return monitorApplication(appId) ? new NetworkedJob() : null;
+    } catch (YarnException e) {
+      e.printStackTrace();
+      return null;
+    }
   }
 
   @Override
@@ -207,4 +364,76 @@ public class YARNBSPJobClient extends BS
     return report;
   }
 
+  private boolean monitorApplication(ApplicationId appId)
+      throws IOException, YarnException {
+    while (true) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        LOG.debug("Thread sleep in monitoring loop interrupted");
+      }
+
+      // Get application report for the appId we are interested in
+      report = yarnClient.getApplicationReport(appId);
+
+      LOG.info("Got application report from ASM for" + ", appId="
+          + appId.getId() + ", clientToAMToken="
+          + report.getClientToAMToken() + ", appDiagnostics="
+          + report.getDiagnostics() + ", appMasterHost="
+          + report.getHost() + ", appQueue=" + report.getQueue()
+          + ", appMasterRpcPort=" + report.getRpcPort()
+          + ", appStartTime=" + report.getStartTime()
+          + ", yarnAppState="
+          + report.getYarnApplicationState().toString()
+          + ", distributedFinalState="
+          + report.getFinalApplicationStatus().toString()
+          + ", appTrackingUrl=" + report.getTrackingUrl()
+          + ", appUser=" + report.getUser());
+
+      YarnApplicationState state = report.getYarnApplicationState();
+      FinalApplicationStatus dsStatus = report
+          .getFinalApplicationStatus();
+      if (YarnApplicationState.FINISHED == state) {
+        if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
+          LOG.info("Application has completed successfully. Breaking 
monitoring loop");
+          return true;
+        } else {
+          LOG.info("Application did finished unsuccessfully."
+              + " YarnState=" + state.toString()
+              + ", DSFinalStatus=" + dsStatus.toString()
+              + ". Breaking monitoring loop");
+          return false;
+        }
+      } else if (YarnApplicationState.KILLED == state
+          || YarnApplicationState.FAILED == state) {
+        LOG.info("Application did not finish." + " YarnState="
+            + state.toString() + ", DSFinalStatus="
+            + dsStatus.toString() + ". Breaking monitoring loop");
+        return false;
+      }
+
+      if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
+        LOG.info("Reached client specified timeout for application. Killing 
application");
+        forceKillApplication(appId);
+        return false;
+      }
+    }
+  }
+
+  /**
+   * Kill a submitted application by sending a call to the ASM
+   * @param appId Application Id to be killed.
+   * @throws YarnException
+   * @throws IOException
+   */
+  private void forceKillApplication(ApplicationId appId)
+      throws YarnException, IOException {
+    // TODO clarify whether multiple jobs with the same app id can be 
submitted and be running at
+    // the same time.
+    // If yes, can we kill a particular attempt only?
+
+    // Response can be ignored as it is non-null on success or
+    // throws an exception in case of failures
+    yarnClient.killApplication(appId);
+  }
 }

Modified: 
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java?rev=1663907&r1=1663906&r2=1663907&view=diff
==============================================================================
--- 
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java 
(original)
+++ 
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java 
Wed Mar  4 09:28:46 2015
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.sync.SyncException;
 
@@ -37,7 +38,7 @@ public class YarnSerializePrinting {
     public void bsp(
         BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, 
NullWritable> bspPeer)
         throws IOException, SyncException, InterruptedException {
-      num = bspPeer.getConfiguration().getInt("bsp.peers.num", 0);
+      num = bspPeer.getConfiguration().getInt("bsp.peers.num", 1);
       LOG.info(bspPeer.getAllPeerNames());
       int i = 0;
       for (String otherPeer : bspPeer.getAllPeerNames()) {
@@ -57,15 +58,19 @@ public class YarnSerializePrinting {
       InterruptedException, ClassNotFoundException {
     HamaConfiguration conf = new HamaConfiguration();
     // TODO some keys that should be within a conf
-    conf.set("yarn.resourcemanager.address", "0.0.0.0:8040");
-    conf.set("bsp.local.dir", "/tmp/bsp-yarn/");
+    conf.set("bsp.user.name", "hama");
+    conf.setInt(Constants.MAX_TASKS, 10);
 
     YARNBSPJob job = new YARNBSPJob(conf);
+    job.setBoolean("hama.yarn.application", true);
     job.setBspClass(HelloBSP.class);
     job.setJarByClass(HelloBSP.class);
     job.setJobName("Serialize Printing");
-    job.setMemoryUsedPerTaskInMb(50);
-    job.setNumBspTask(2);
+    job.setInputFormat(NullInputFormat.class);
+    job.setOutputFormat(NullOutputFormat.class);
+
+    job.setMemoryUsedPerTaskInMb(100);
+    job.setNumBspTask(4);
     job.waitForCompletion(true);
   }
 }


Reply via email to