Author: tjungblut
Date: Tue Oct 11 15:54:40 2011
New Revision: 1181854

URL: http://svn.apache.org/viewvc?rev=1181854&view=rev
Log:
several fixes

Modified:
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java?rev=1181854&r1=1181853&r2=1181854&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java Tue Oct 11 15:54:40 2011
@@ -206,6 +206,9 @@ public class BSPApplicationMaster implem
       job = new JobImpl(appAttemptId, jobConf, yarnRPC, amrmRPC, jobFile, jobId);
       finalState = job.startJob();
     } finally {
+      if (this.syncServer != null) {
+        this.syncServer.stopServer();
+      }
       if (finalState != null) {
         LOG.info("Job \"" + applicationName + "\"'s state after completion: "
             + finalState.toString());

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java?rev=1181854&r1=1181853&r2=1181854&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java Tue Oct 11 15:54:40 2011
@@ -149,16 +149,20 @@ public class JobImpl implements Job {
 
     AllocateResponse allocateResponse = resourceManager.allocate(req);
     AMResponse amResponse = allocateResponse.getAMResponse();
-    if (amResponse.getResponseId() == 0) {
+    LOG.info("Got response! ID: " + amResponse.getResponseId()
+        + " with num of containers: "
+        + amResponse.getAllocatedContainers().size());
+    // somehow the response id is always incremented
+    if (amResponse.getResponseId() == 1) {
       this.allocatedContainers = amResponse.getAllocatedContainers();
     } else {
       LOG.error("Response IDs somehow did not match. Got: "
-          + amResponse.getResponseId() + " where it should be 0 (zero).");
+          + amResponse.getResponseId() + " where it should be 1 (one).");
       state = JobState.FAILED;
       return state;
     }
 
-    int launchedBSPTasks = numBSPTasks;
+    int launchedBSPTasks = 0;
 
     int id = 0;
     for (Container allocatedContainer : allocatedContainers) {
@@ -175,6 +179,7 @@ public class JobImpl implements Job {
       launchers.put(id, runnableLaunchContainer);
       completionService.submit(runnableLaunchContainer);
       id++;
+      launchedBSPTasks++;
     }
     state = JobState.RUNNING;
 

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java?rev=1181854&r1=1181853&r2=1181854&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java Tue Oct 11 15:54:40 2011
@@ -212,9 +212,12 @@ public class YARNBSPJob extends BSPJob {
     GetApplicationReportRequest reportRequest = Records
         .newRecord(GetApplicationReportRequest.class);
     reportRequest.setApplicationId(id);
-    GetApplicationReportResponse reportResponse = applicationsManager
-        .getApplicationReport(reportRequest);
-    report = reportResponse.getApplicationReport();
+    while (report == null || report.getHost().equals("N/A")) {
+      GetApplicationReportResponse reportResponse = applicationsManager
+          .getApplicationReport(reportRequest);
+      report = reportResponse.getApplicationReport();
+      Thread.sleep(1000L);
+    }
     LOG.info("Got report: " + report.getApplicationId() + " "
         + report.getHost());
     submitted = true;

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java?rev=1181854&r1=1181853&r2=1181854&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java Tue Oct 11 15:54:40 2011
@@ -60,20 +60,21 @@ public class YarnSerializePrinting {
     public void setConf(Configuration conf) {
       this.conf = conf;
     }
+  }
 
-    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
-      HamaConfiguration conf = new HamaConfiguration();
-      // TODO some keys that should be within a conf 
-      conf.set("yarn.resourcemanager.address", "0.0.0.0:8040");
-      conf.set("bsp.local.dir","/tmp/bsp-yarn/");
-      YARNBSPJob job = new YARNBSPJob(conf);
-      job.setBspClass(HelloBSP.class);
-      job.setJarByClass(HelloBSP.class);
-      job.setJobName("Serialize Printing");
-      job.setMemoryUsedPerTaskInMb(50);
-      job.setNumBspTask(3);
-      job.waitForCompletion(true);
-    }
-
+  public static void main(String[] args) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    HamaConfiguration conf = new HamaConfiguration();
+    // TODO some keys that should be within a conf
+    conf.set("yarn.resourcemanager.address", "0.0.0.0:8040");
+    conf.set("bsp.local.dir", "/tmp/bsp-yarn/");
+    YARNBSPJob job = new YARNBSPJob(conf);
+    job.setBspClass(HelloBSP.class);
+    job.setJarByClass(HelloBSP.class);
+    job.setJobName("Serialize Printing");
+    job.setMemoryUsedPerTaskInMb(512);
+    job.setNumBspTask(3);
+    // TODO true throws exceptions
+    job.waitForCompletion(false);
   }
 }


Reply via email to