Author: tjungblut Date: Tue Oct 11 15:54:40 2011 New Revision: 1181854 URL: http://svn.apache.org/viewvc?rev=1181854&view=rev Log: several fixes
Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java?rev=1181854&r1=1181853&r2=1181854&view=diff ============================================================================== --- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (original) +++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java Tue Oct 11 15:54:40 2011 @@ -206,6 +206,9 @@ public class BSPApplicationMaster implem job = new JobImpl(appAttemptId, jobConf, yarnRPC, amrmRPC, jobFile, jobId); finalState = job.startJob(); } finally { + if (this.syncServer != null) { + this.syncServer.stopServer(); + } if (finalState != null) { LOG.info("Job \"" + applicationName + "\"'s state after completion: " + finalState.toString()); Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java?rev=1181854&r1=1181853&r2=1181854&view=diff ============================================================================== --- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java (original) +++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java Tue Oct 11 15:54:40 2011 @@ -149,16 +149,20 @@ public class JobImpl implements Job { AllocateResponse allocateResponse = resourceManager.allocate(req); AMResponse amResponse = allocateResponse.getAMResponse(); - if (amResponse.getResponseId() == 0) { + LOG.info("Got response! ID: " + amResponse.getResponseId() + + " with num of containers: " + + amResponse.getAllocatedContainers().size()); + // somehow the response id is always incremented + if (amResponse.getResponseId() == 1) { this.allocatedContainers = amResponse.getAllocatedContainers(); } else { LOG.error("Response IDs somehow did not match. Got: " - + amResponse.getResponseId() + " where it should be 0 (zero)."); + + amResponse.getResponseId() + " where it should be 1 (one)."); state = JobState.FAILED; return state; } - int launchedBSPTasks = numBSPTasks; + int launchedBSPTasks = 0; int id = 0; for (Container allocatedContainer : allocatedContainers) { @@ -175,6 +179,7 @@ public class JobImpl implements Job { launchers.put(id, runnableLaunchContainer); completionService.submit(runnableLaunchContainer); id++; + launchedBSPTasks++; } state = JobState.RUNNING; Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java?rev=1181854&r1=1181853&r2=1181854&view=diff ============================================================================== --- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (original) +++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java Tue Oct 11 15:54:40 2011 @@ -212,9 +212,12 @@ public class YARNBSPJob extends BSPJob { GetApplicationReportRequest reportRequest = Records .newRecord(GetApplicationReportRequest.class); reportRequest.setApplicationId(id); - GetApplicationReportResponse reportResponse = applicationsManager - .getApplicationReport(reportRequest); - report = reportResponse.getApplicationReport(); + while (report == null || report.getHost().equals("N/A")) { + GetApplicationReportResponse reportResponse = applicationsManager + .getApplicationReport(reportRequest); + report = reportResponse.getApplicationReport(); + Thread.sleep(1000L); + } LOG.info("Got report: " + report.getApplicationId() + " " + report.getHost()); submitted = true; Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java?rev=1181854&r1=1181853&r2=1181854&view=diff ============================================================================== --- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (original) +++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java Tue Oct 11 15:54:40 2011 @@ -60,20 +60,21 @@ public class YarnSerializePrinting { public void setConf(Configuration conf) { this.conf = conf; } + } - public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { - HamaConfiguration conf = new HamaConfiguration(); - // TODO some keys that should be within a conf - conf.set("yarn.resourcemanager.address", "0.0.0.0:8040"); - conf.set("bsp.local.dir","/tmp/bsp-yarn/"); - YARNBSPJob job = new YARNBSPJob(conf); - job.setBspClass(HelloBSP.class); - job.setJarByClass(HelloBSP.class); - job.setJobName("Serialize Printing"); - job.setMemoryUsedPerTaskInMb(50); - job.setNumBspTask(3); - job.waitForCompletion(true); - } - + public static void main(String[] args) throws IOException, + InterruptedException, ClassNotFoundException { + HamaConfiguration conf = new HamaConfiguration(); + // TODO some keys that should be within a conf + conf.set("yarn.resourcemanager.address", "0.0.0.0:8040"); + conf.set("bsp.local.dir", "/tmp/bsp-yarn/"); + YARNBSPJob job = new YARNBSPJob(conf); + job.setBspClass(HelloBSP.class); + job.setJarByClass(HelloBSP.class); + job.setJobName("Serialize Printing"); + job.setMemoryUsedPerTaskInMb(512); + job.setNumBspTask(3); + // TODO true throws exceptions + job.waitForCompletion(false); } }