Author: edwardyoon
Date: Mon Oct 29 03:59:20 2012
New Revision: 1403164
URL: http://svn.apache.org/viewvc?rev=1403164&view=rev
Log:
Add bsp.max.tasks.per.job property
Modified:
hama/trunk/CHANGES.txt
hama/trunk/conf/hama-default.xml
hama/trunk/core/src/main/java/org/apache/hama/Constants.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1403164&r1=1403163&r2=1403164&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Oct 29 03:59:20 2012
@@ -4,6 +4,7 @@ Release 0.6 (unreleased changes)
NEW FEATURES
+ HAMA-626: If number of bsp tasks are not specified, the bsp job cannot
assume to get all the tasks in the cluster (edwardyoon)
HAMA-651: Add a gradient descent BSP (Tommaso Teofili)
HAMA-601: Hama Streaming (tjungblut)
Modified: hama/trunk/conf/hama-default.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1403164&r1=1403163&r2=1403164&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Mon Oct 29 03:59:20 2012
@@ -116,6 +116,13 @@
by a groom server.</description>
</property>
<property>
+ <name>bsp.max.tasks.per.job</name>
+ <value></value>
+ <description>The maximum number of BSP tasks per job.
+ By default, This limit is switched off.
+ </description>
+ </property>
+ <property>
<name>bsp.ft.enabled</name>
<value>false</value>
<description>Enable Fault Tolerance in BSP Task execution.</description>
Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1403164&r1=1403163&r2=1403164&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Mon Oct 29
03:59:20 2012
@@ -63,6 +63,8 @@ public interface Constants {
public static final String MAX_TASK_ATTEMPTS = "bsp.tasks.max.attempts";
+ public static final String MAX_TASKS_PER_JOB = "bsp.max.tasks.per.job";
+
public static final int DEFAULT_MAX_TASK_ATTEMPTS = 2;
////////////////////////////////////////
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1403164&r1=1403163&r2=1403164&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Mon Oct
29 03:59:20 2012
@@ -48,10 +48,10 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
@@ -59,6 +59,7 @@ import org.apache.hadoop.util.Reflection
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.ipc.JobSubmissionProtocol;
@@ -300,12 +301,19 @@ public class BSPJobClient extends Config
throws IOException {
BSPJob job = pJob;
job.setJobID(jobId);
-
+ int maxTasks = 0;
+ int limitTasks = job.getConf().getInt(Constants.MAX_TASKS_PER_JOB, 0);
+
ClusterStatus clusterStatus = getClusterStatus(true);
- int maxTasks = clusterStatus.getMaxTasks() - clusterStatus.getTasks();
+
+ if(limitTasks > 0) {
+ maxTasks = limitTasks;
+ } else {
+ maxTasks = clusterStatus.getMaxTasks() - clusterStatus.getTasks();
+ }
if (maxTasks < job.getNumBspTask()) {
- throw new IOException("Job failed! No more taks slots available");
+ throw new IOException("Job failed! The number of tasks has exceeded the
maximum allowed.");
}
Path submitJobDir = new Path(getSystemDir(), "submit_"