Author: tjungblut Date: Wed Oct 12 08:51:32 2011 New Revision: 1182265 URL: http://svn.apache.org/viewvc?rev=1182265&view=rev Log: several runtime fixes
Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java?rev=1182265&r1=1182264&r2=1182265&view=diff ============================================================================== --- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (original) +++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java Wed Oct 12 08:51:32 2011 @@ -28,6 +28,8 @@ import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.ipc.ProtocolSignature; @@ -117,6 +119,12 @@ public class BSPApplicationMaster implem this.clientServer = RPC.getServer(this, hostname, clientPort, 10, false, jobConf); + /* + * Make sure that this executes after the start of the sync server, because + * we are readjusting the configuration. + */ + rewriteSubmitConfiguration(jobFile, jobConf); + amrmRPC = getYarnRPCConnection(localConf); registerApplicationMaster(amrmRPC, appAttemptId, hostname, clientPort, null); } @@ -139,7 +147,7 @@ public class BSPApplicationMaster implem LOG.info("Waiting for the Sync Master at " + syncAddress); RPC.waitForProxy(SyncServer.class, SyncServer.versionID, syncAddress, jobConf); - jobConf.set("bsp.sync.server.address", hostname + ":" + syncPort); + jobConf.set("hama.sync.server.address", hostname + ":" + syncPort); } /** @@ -237,6 +245,8 @@ public class BSPApplicationMaster implem case FAILED: finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED); break; + default: + finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED); } this.amrmRPC.finishApplicationMaster(finishReq); } @@ -250,7 +260,6 @@ public class BSPApplicationMaster implem master.start(); } catch (Exception e) { LOG.fatal("Error starting BSPApplicationMaster", e); - System.exit(1); } finally { if (master != null) { master.cleanup(); @@ -273,6 +282,23 @@ public class BSPApplicationMaster implem } /** + * Writes the current configuration to a given path to reflect changes. For + * example the sync server address is put after the file has been written. + * TODO this should upload to HDFS to a given path as well. + * + * @throws IOException + */ + private void rewriteSubmitConfiguration(String path, Configuration conf) + throws IOException { + Path jobSubmitPath = new Path(path); + FileSystem fs = FileSystem.get(conf); + FSDataOutputStream out = fs.create(jobSubmitPath); + conf.writeXml(out); + out.close(); + LOG.info("Written new configuration back to " + path); + } + + /** * Uses Minas AvailablePortFinder to find a port, starting at 14000. * * @return a free port. Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java?rev=1182265&r1=1182264&r2=1182265&view=diff ============================================================================== --- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (original) +++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java Wed Oct 12 08:51:32 2011 @@ -87,7 +87,8 @@ public class BSPTaskLauncher implements LOG.info("Spawned task with id: " + this.id + " for allocated container id: " + this.allocatedContainer.getId().toString()); - final GetContainerStatusRequest statusRequest = setupContainer(allocatedContainer, cm, user, id); + final GetContainerStatusRequest statusRequest = setupContainer( + allocatedContainer, cm, user, id); ContainerStatus lastStatus; while ((lastStatus = cm.getContainerStatus(statusRequest).getStatus()) @@ -111,11 +112,13 @@ public class BSPTaskLauncher implements ctx.setResource(allocatedContainer.getResource()); ctx.setUser(user); + /* + * jar + */ LocalResource packageResource = Records.newRecord(LocalResource.class); FileSystem fs = FileSystem.get(conf); Path packageFile = new Path(conf.get("bsp.jar")); - URL packageUrl = ConverterUtils.getYarnUrlFromPath(new Path(conf - .get("bsp.jar"))); + URL packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile); FileStatus fileStatus = fs.getFileStatus(packageFile); packageResource.setResource(packageUrl); @@ -123,14 +126,33 @@ public class BSPTaskLauncher implements packageResource.setTimestamp(fileStatus.getModificationTime()); packageResource.setType(LocalResourceType.ARCHIVE); packageResource.setVisibility(LocalResourceVisibility.APPLICATION); + LOG.info("Package resource: " + packageResource.getResource()); - ctx.setCommands(Arrays.asList("${JAVA_HOME}" - + "/bin/java -cp './package/*' ", BSPTaskLauncher.class - .getCanonicalName(), jobId.getJtIdentifier(), id + "", this.jobFile - .makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString(), " 1>" - + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", " 2>" - + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")); ctx.setLocalResources(Collections.singletonMap("package", packageResource)); + + /* + * TODO Package classpath seems not to work if you're in pseudo distributed + * mode, because the resource must not be moved, it will never be unpacked. + * So we will check if our jar file has the file:// prefix and put it into + * the CP directly + */ + String cp = "$CLASSPATH:./*:./package/*:./*:"; + if (packageUrl.getScheme() != null && packageUrl.getScheme().equals("file")) { + cp += packageFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()) + .toString() + ":"; + LOG.info("Localized file scheme detected, adjusting CP to: " + cp); + } + String[] cmds = { + "${JAVA_HOME}" + "/bin/java -cp '" + cp + "' " + + BSPRunner.class.getCanonicalName(), + jobId.getJtIdentifier(), + id + "", + this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()) + .toString(), + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" }; + ctx.setCommands(Arrays.asList(cmds)); + LOG.info("Starting command: " + Arrays.toString(cmds)); StartContainerRequest startReq = Records .newRecord(StartContainerRequest.class);