Author: edwardyoon
Date: Mon Mar 23 03:46:03 2015
New Revision: 1668514
URL: http://svn.apache.org/r1668514
Log:
HAMA-931: Make the HAMA base path configurable
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/pom.xml
hama/trunk/yarn/pom.xml
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Mar 23 03:46:03 2015
@@ -11,6 +11,7 @@ Release 0.7.0 (unreleased changes)
BUG FIXES
+ HAMA-931: Make the HAMA base path configurable (Minho Kim via edwardyoon)
HAMA-930: Add hama-yarn to binary distribution (Minho Kim via edwardyoon)
HAMA-848: Refactor YARN module for hadoop 2.x stable version (Minho Kim via
edwardyoon)
HAMA-906: Automatic activation of halted vertices without received messages
(edwardyoon)
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Mon Mar
23 03:46:03 2015
@@ -303,21 +303,16 @@ public class BSPJobClient extends Config
BSPJob job = pJob;
job.setJobID(jobId);
- int maxTasks;
- if (job.getConfiguration().getBoolean("hama.yarn.application", false)) {
- int maxMem =
job.getConfiguration().getInt("yarn.nodemanager.resource.memory-mb", 0);
- int minAllocationMem =
job.getConfiguration().getInt("yarn.scheduler.minimum-allocation-mb", 1024);
- maxTasks = maxMem / minAllocationMem;
- } else {
- ClusterStatus clusterStatus = getClusterStatus(true);
- maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
- clusterStatus.getMaxTasks() - clusterStatus.getTasks());
-
- if (maxTasks < job.getNumBspTask()) {
- LOG.warn("The configured number of tasks has exceeded the maximum
allowed. Job will run with "
- + maxTasks + " tasks.");
- job.setNumBspTask(maxTasks);
- }
+ int maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
+ job.getNumBspTask());
+
+ ClusterStatus clusterStatus = getClusterStatus(true);
+ // Re-adjust the maxTasks based on cluster status.
+ if (clusterStatus != null
+ && maxTasks > (clusterStatus.getMaxTasks() -
clusterStatus.getTasks())) {
+ LOG.warn("The configured number of tasks has exceeded the maximum
allowed. Job will run with "
+ + (clusterStatus.getMaxTasks() - clusterStatus.getTasks()) + "
tasks.");
+ job.setNumBspTask(clusterStatus.getMaxTasks() -
clusterStatus.getTasks());
}
Path submitJobDir = new Path(getSystemDir(), "submit_"
@@ -794,7 +789,8 @@ public class BSPJobClient extends Config
* @throws IOException
*/
public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
- return jobSubmitClient.getClusterStatus(detailed);
+ return (jobSubmitClient != null) ? jobSubmitClient
+ .getClusterStatus(detailed) : null;
}
// for the testcase
Modified: hama/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Mon Mar 23 03:46:03 2015
@@ -212,6 +212,11 @@
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
</dependencies>
</profile>
</profiles>
Modified: hama/trunk/yarn/pom.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/yarn/pom.xml?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/yarn/pom.xml (original)
+++ hama/trunk/yarn/pom.xml Mon Mar 23 03:46:03 2015
@@ -70,7 +70,6 @@
<artifactId>hadoop-yarn-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
-
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
@@ -80,11 +79,6 @@
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-client</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
</dependencies>
<build>
Modified:
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
URL:
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
(original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
Mon Mar 23 03:46:03 2015
@@ -59,7 +59,7 @@ import org.apache.hadoop.yarn.util.Recor
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.Job.JobState;
-import org.apache.hama.bsp.sync.SyncServerRunner;
+import org.apache.hama.bsp.sync.SyncServer;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.ipc.RPC;
@@ -101,7 +101,8 @@ public class BSPApplicationMaster implem
private Server taskServer;
private volatile long superstep;
- private SyncServerRunner syncServer;
+ //private SyncServerRunner syncServer;
+ private SyncServer syncServer;
private Counters globalCounter = new Counters();
@@ -114,8 +115,11 @@ public class BSPApplicationMaster implem
}
this.jobFile = args[0];
- this.localConf = new YarnConfiguration();
+
this.jobConf = getSubmitConfiguration(jobFile);
+
+ this.localConf = new YarnConfiguration();
+ localConf.addResource(localConf);
fs = FileSystem.get(jobConf);
this.applicationName = jobConf.get("bsp.job.name",
@@ -192,9 +196,28 @@ public class BSPApplicationMaster implem
* @throws IOException
*/
private void startSyncServer() throws Exception {
- syncServer = SyncServiceFactory.getSyncServerRunner(jobConf);
- jobConf = syncServer.init(jobConf);
- threadPool.submit(syncServer);
+ syncServer = SyncServiceFactory.getSyncServer(jobConf);
+ syncServer.init(jobConf);
+
+ ZKServerThread serverThread = new ZKServerThread(syncServer);
+ threadPool.submit(serverThread);
+ }
+
+ private static class ZKServerThread implements Runnable {
+ SyncServer server;
+
+ ZKServerThread(SyncServer s) {
+ server = s;
+ }
+
+ @Override
+ public void run() {
+ try {
+ server.start();
+ } catch (Exception e) {
+ LOG.error("Error running SyncServer.", e);
+ }
+ }
}
/**
@@ -282,11 +305,14 @@ public class BSPApplicationMaster implem
.getApplicationAttemptId();
}
- private void start() throws Exception {
+ private void start() throws IOException, YarnException /*throws Exception*/ {
JobState finalState = null;
try {
job = new JobImpl(appAttemptId, jobConf, yarnRPC, amrmRPC, jobFile,
jobId);
finalState = job.startJob();
+ } catch (Exception e) {
+ LOG.error("error was occured. cleaning up");
+ e.printStackTrace();
} finally {
if (finalState != null) {
LOG.info("Job \"" + applicationName + "\"'s state after completion: "
@@ -294,12 +320,14 @@ public class BSPApplicationMaster implem
LOG.info("Job took " + ((clock.getTime() - startTime) / 1000L)
+ "s to finish!");
}
+ LOG.info("job is cleaning up");
job.cleanup();
}
}
private void cleanup() throws YarnException, IOException {
- syncServer.stop();
+ //syncServer.stop();
+ syncServer.stopServer();
if (threadPool != null && !threadPool.isShutdown()) {
threadPool.shutdownNow();
Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
URL:
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
(original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java Mon
Mar 23 03:46:03 2015
@@ -26,7 +26,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.*;
@@ -101,11 +103,15 @@ public class BSPTaskLauncher {
ContainerStatus lastStatus = null;
GetContainerStatusesResponse getContainerStatusesResponse =
cm.getContainerStatuses(statusRequest);
List<ContainerStatus> containerStatuses =
getContainerStatusesResponse.getContainerStatuses();
+ if (containerStatuses.size() <= 0) {
+ LOG.info("container Statuses size is zero");
+ return null;
+ }
+
for (ContainerStatus containerStatus : containerStatuses) {
- LOG.info("Got container status for containerID="
- + containerStatus.getContainerId() + ", state="
- + containerStatus.getState() + ", exitStatus="
- + containerStatus.getExitStatus() + ", diagnostics="
+ LOG.info("Got container status for containerID=" + containerStatus
+ .getContainerId() + ", state=" + containerStatus.getState()
+ + ", exitStatus=" + containerStatus.getExitStatus() + ",
diagnostics="
+ containerStatus.getDiagnostics());
if (containerStatus.getContainerId().equals(allocatedContainer.getId()))
{
@@ -113,12 +119,14 @@ public class BSPTaskLauncher {
break;
}
}
+
if (lastStatus.getState() != ContainerState.COMPLETE) {
+ LOG.info("Not completed...");
return null;
}
- LOG.info(this.id + " Last report comes with exitstatus of "
- + lastStatus.getExitStatus() + " and diagnose string of "
- + lastStatus.getDiagnostics());
+ LOG.info(this.id + " Last report comes with exitstatus of " + lastStatus
+ .getExitStatus() + " and diagnose string of " + lastStatus
+ .getDiagnostics());
return new BSPTaskStatus(id, lastStatus.getExitStatus());
}
@@ -154,19 +162,22 @@ public class BSPTaskLauncher {
localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource);
- Path hamaReleaseFile = new
Path(System.getenv(YARNBSPConstants.HAMA_RELEASE_LOCATION));
+ Path hamaReleaseFile = new
Path(System.getenv(YARNBSPConstants.HAMA_LOCATION));
URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile
.makeQualified(fs.getUri(), fs.getWorkingDirectory()));
LOG.info("Hama release URL has been composed to " +
hamaReleaseUrl.toString());
- LocalResource hamaReleaseRsrc = Records.newRecord(LocalResource.class);
- hamaReleaseRsrc.setResource(hamaReleaseUrl);
-
hamaReleaseRsrc.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_RELEASE_SIZE)));
-
hamaReleaseRsrc.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_RELEASE_TIMESTAMP)));
- hamaReleaseRsrc.setType(LocalResourceType.ARCHIVE);
- hamaReleaseRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(
+ hamaReleaseFile, true);
- localResources.put(YARNBSPConstants.HAMA_SYMLINK, hamaReleaseRsrc);
+ while(fileStatusListIterator.hasNext()) {
+ LocatedFileStatus lfs = fileStatusListIterator.next();
+ LocalResource localRsrc = LocalResource.newInstance(
+ ConverterUtils.getYarnUrlFromPath(lfs.getPath()),
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+ lfs.getLen(), lfs.getModificationTime());
+ localResources.put(lfs.getPath().getName(), localRsrc);
+ }
ctx.setLocalResources(localResources);
@@ -187,13 +198,6 @@ public class BSPTaskLauncher {
classPathEnv.append(c.trim());
}
- classPathEnv.append(File.pathSeparator);
- classPathEnv.append("./" + YARNBSPConstants.HAMA_SYMLINK +
- "/" + YARNBSPConstants.HAMA_RELEASE_VERSION + "/*");
- classPathEnv.append(File.pathSeparator);
- classPathEnv.append("./" + YARNBSPConstants.HAMA_SYMLINK +
- "/" + YARNBSPConstants.HAMA_RELEASE_VERSION + "/lib/*");
-
Vector<CharSequence> vargs = new Vector<CharSequence>();
vargs.add("${JAVA_HOME}/bin/java");
vargs.add("-cp " + classPathEnv + "");
Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java Mon Mar 23
03:46:03 2015
@@ -196,34 +196,27 @@ public class JobImpl implements Job {
state = JobState.RUNNING;
int completed = 0;
- List<Integer> cleanupTasks = new ArrayList<Integer>();
while (completed != numBSPTasks) {
for (BSPTaskLauncher task : completionQueue) {
BSPTaskStatus returnedTask = task.poll();
- // if our task returned with a finished state
- if (returnedTask != null) {
- if (returnedTask.getExitStatus() != 0) {
- LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!");
- cleanupTask(returnedTask.getId());
- state = JobState.FAILED;
- return state;
- } else {
- LOG.info("Task \"" + returnedTask.getId()
- + "\" sucessfully finished!");
- completed++;
- LOG.info("Waiting for " + (numBSPTasks - completed)
- + " tasks to finish!");
- }
- cleanupTasks.add(returnedTask.getId());
+ if(returnedTask != null && returnedTask.getExitStatus() == 0) {
+ LOG.info("Task \"" + returnedTask.getId()
+ + "\" sucessfully finished!");
+ completed++;
+ LOG.info("Waiting for " + (numBSPTasks - completed)
+ + " tasks to finish!");
+ }
+
+ if(returnedTask != null && returnedTask.getExitStatus() != 0) {
+ LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!");
+ completionQueue.add(task);
+ state = JobState.FAILED;
+ return state;
}
}
Thread.sleep(1000L);
}
- for (Integer stopId : cleanupTasks) {
- cleanupTask(stopId);
- }
-
state = JobState.SUCCESS;
return state;
}
@@ -308,6 +301,7 @@ public class JobImpl implements Job {
@Override
public void cleanup() throws YarnException, IOException {
for (BSPTaskLauncher launcher : completionQueue) {
+ LOG.info("cleanup tasks !!!");
launcher.stopAndCleanup();
}
}
Modified:
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java
URL:
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java
(original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java Mon
Mar 23 03:46:03 2015
@@ -42,7 +42,7 @@ public class YARNBSPConstants {
/**
* Environment key name pointing to the hama release's location
*/
- public static final String HAMA_RELEASE_LOCATION = "HAMARELEASELOCATION";
+ public static final String HAMA_LOCATION = "HAMALOCATION";
/**
* Environment key name denoting the file content length for the hama
release.
@@ -61,23 +61,4 @@ public class YARNBSPConstants {
*/
public static final String APP_MASTER_JAR_PATH = "AppMaster.jar";
- /**
- * Symbolic link name for hama release archive in container local resource
- */
- public static final String HAMA_SYMLINK = "hama";
-
- /**
- * Hama release file name
- */
- public static final String HAMA_RELEASE_FILE = "hama-0.6.4.tar.gz";
-
- /**
- * Hama release version
- */
- public static final String HAMA_RELEASE_VERSION = "hama-0.6.4";
-
- /**
- * Hama release file source location
- */
- public static final String HAMA_SRC_PATH = "/home/hadoop";
}
Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java Mon Mar
23 03:46:03 2015
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
public class YARNBSPJob extends BSPJob {
@@ -71,6 +72,10 @@ public class YARNBSPJob extends BSPJob {
@Override
public void submit() throws IOException, InterruptedException {
+ // If Constants.MAX_TASKS_PER_JOB is null, calculates the max tasks based
on resource status.
+ this.getConfiguration().setInt(Constants.MAX_TASKS_PER_JOB, getMaxTasks());
+ LOG.debug("MaxTasks: " +
this.getConfiguration().get(Constants.MAX_TASKS_PER_JOB));
+
RunningJob submitJobInternal = submitClient.submitJobInternal(this,
new BSPJobID("hama_yarn", id++));
@@ -80,6 +85,14 @@ public class YARNBSPJob extends BSPJob {
}
}
+ private int getMaxTasks() {
+ int maxMem = this.getConfiguration().getInt(
+ "yarn.nodemanager.resource.memory-mb", 0);
+ int minAllocationMem = this.getConfiguration().getInt(
+ "yarn.scheduler.minimum-allocation-mb", 1024);
+ return maxMem / minAllocationMem;
+ }
+
@Override
public boolean waitForCompletion(boolean verbose) throws IOException,
InterruptedException, ClassNotFoundException {
Modified:
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
URL:
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
(original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java Mon
Mar 23 03:46:03 2015
@@ -17,23 +17,18 @@
*/
package org.apache.hama.bsp;
-import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.commons.beanutils.Converter;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -44,10 +39,8 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
-import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
public class YARNBSPJobClient extends BSPJobClient {
@@ -233,23 +226,13 @@ public class YARNBSPJobClient extends BS
// this creates a symlink in the working directory
localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, amJarRsrc);
- // Copy from hama-${version}.tar.gz to HDFS
- Path hamaDstPath = new Path(getSystemDir(),
YARNBSPConstants.HAMA_RELEASE_FILE);
- hamaDstPath = fs.makeQualified(hamaDstPath);
- fs.copyFromLocalFile(false, true,
- new Path(YARNBSPConstants.HAMA_SRC_PATH,
YARNBSPConstants.HAMA_RELEASE_FILE),
- hamaDstPath);
- FileStatus hamaStatus = fs.getFileStatus(hamaDstPath);
- URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaDstPath
- .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
- LocalResource hamaReleaseRsrc = Records.newRecord(LocalResource.class);
- hamaReleaseRsrc.setResource(hamaReleaseUrl);
- hamaReleaseRsrc.setSize(hamaStatus.getLen());
- hamaReleaseRsrc.setTimestamp(hamaStatus.getModificationTime());
- hamaReleaseRsrc.setType(LocalResourceType.ARCHIVE);
- hamaReleaseRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
-
- localResources.put(YARNBSPConstants.HAMA_SYMLINK, hamaReleaseRsrc);
+ // add hama related jar files to localresources for container
+ List<File> hamaJars =
localJarfromPath(System.getProperty("hama.home.dir"));
+ String hamaPath = getSystemDir() + "/hama";
+ for (File fileEntry : hamaJars) {
+ addToLocalResources(fs, fileEntry.getCanonicalPath(),
+ hamaPath, fileEntry.getName(), localResources);
+ }
// Set the local resources into the launch context
amContainer.setLocalResources(localResources);
@@ -270,16 +253,12 @@ public class YARNBSPJobClient extends BS
classPathEnv.append(File.pathSeparatorChar);
classPathEnv.append(c.trim());
}
- classPathEnv.append(File.pathSeparator);
- classPathEnv.append("./" + YARNBSPConstants.HAMA_SYMLINK +
"/hama-0.6.4/*");
env.put(YARNBSPConstants.HAMA_YARN_LOCATION, jarPath.toUri().toString());
env.put(YARNBSPConstants.HAMA_YARN_SIZE,
Long.toString(jarStatus.getLen()));
env.put(YARNBSPConstants.HAMA_YARN_TIMESTAMP,
Long.toString(jarStatus.getModificationTime()));
- env.put(YARNBSPConstants.HAMA_RELEASE_LOCATION,
hamaDstPath.toUri().toString());
- env.put(YARNBSPConstants.HAMA_RELEASE_SIZE,
Long.toString(hamaStatus.getLen()));
- env.put(YARNBSPConstants.HAMA_RELEASE_TIMESTAMP,
Long.toString(hamaStatus.getModificationTime()));
+ env.put(YARNBSPConstants.HAMA_LOCATION, hamaPath);
env.put("CLASSPATH", classPathEnv.toString());
amContainer.setEnvironment(env);
@@ -436,4 +415,28 @@ public class YARNBSPJobClient extends BS
// throws an exception in case of failures
yarnClient.killApplication(appId);
}
+
+ private List<File> localJarfromPath(String path) throws IOException {
+ File hamaHome = new File(path);
+ String[] extensions = new String[]{"jar"};
+ List<File> files = (List<File>)FileUtils.listFiles(hamaHome, extensions,
true);
+
+ return files;
+ }
+
+
+ private void addToLocalResources(FileSystem fs, String fileSrcPath,
+ String fileDstPath, String fileName, Map<String, LocalResource>
localResources)
+ throws IOException {
+ Path dstPath = new Path(fileDstPath, fileName);
+ dstPath = fs.makeQualified(dstPath);
+ fs.copyFromLocalFile(false, true, new Path(fileSrcPath), dstPath);
+ FileStatus fileStatus = fs.getFileStatus(dstPath);
+ LocalResource localRsrc =
+ LocalResource.newInstance(
+ ConverterUtils.getYarnUrlFromURI(dstPath.toUri()),
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+ fileStatus.getLen(), fileStatus.getModificationTime());
+ localResources.put(fileName, localRsrc);
+ }
}
Modified:
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
URL:
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
---
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
(original)
+++
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
Mon Mar 23 03:46:03 2015
@@ -57,12 +57,9 @@ public class YarnSerializePrinting {
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
HamaConfiguration conf = new HamaConfiguration();
- // TODO some keys that should be within a conf
- conf.set("bsp.user.name", "hama");
- conf.setInt(Constants.MAX_TASKS, 10);
YARNBSPJob job = new YARNBSPJob(conf);
- job.setBoolean("hama.yarn.application", true);
+ System.out.println(conf.get("bsp.user.name"));
job.setBspClass(HelloBSP.class);
job.setJarByClass(HelloBSP.class);
job.setJobName("Serialize Printing");