Author: apurv
Date: Tue Jan 8 21:45:42 2013
New Revision: 1430555
URL: http://svn.apache.org/viewvc?rev=1430555&view=rev
Log:
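fixed partitioning: runtime input partitioning moved from BSPJob.waitForCompletion() to BSPJobClient.partition(), which runs during job submission; several hard-coded configuration keys replaced with Constants.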
Modified:
hama/trunk/core/src/main/java/org/apache/hama/Constants.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1430555&r1=1430554&r2=1430555&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Tue Jan 8 21:45:42 2013
@@ -60,25 +60,27 @@ public interface Constants {
public static final String UTF8_ENCODING = "UTF-8";
public static final String MAX_TASKS_PER_GROOM = "bsp.tasks.maximum";
-
+
public static final String MAX_TASK_ATTEMPTS = "bsp.tasks.max.attempts";
public static final String MAX_TASKS_PER_JOB = "bsp.max.tasks.per.job";
+ public static final String COMBINER_CLASS = "bsp.combiner.class";
+
public static final int DEFAULT_MAX_TASK_ATTEMPTS = 2;
////////////////////////////////////////
// Task scheduler related constants
// //////////////////////////////////////
-
+
public static final String TASK_ALLOCATOR_CLASS = "bsp.taskalloc.class";
-
+
// //////////////////////////////////////
// Fault tolerance related constants
// //////////////////////////////////////
public static final String FAULT_TOLERANCE_FLAG = "bsp.ft.enabled";
-
+
public static final String FAULT_TOLERANCE_CLASS = "bsp.ft.class";
// //////////////////////////////////////
@@ -92,6 +94,25 @@ public interface Constants {
// By default checkpointing when enabled would checkpoint on every superstep
public static final short DEFAULT_CHECKPOINT_INTERVAL = 1;
+ // /////////////////////////////////////////////
+ // Job configuration related parameters.
+ // /////////////////////////////////////////////
+ public static final String JOB_INPUT_DIR = "bsp.input.dir";
+ public static final String JOB_PEERS_COUNT = "bsp.peers.num";
+ public static final String INPUT_FORMAT_CLASS = "bsp.input.format.class";
+ public static final String OUTPUT_FORMAT_CLASS = "bsp.output.format.class";
+
+
+ // /////////////////////////////////////////////
+ // Constants related to partitioning
+ // /////////////////////////////////////////////
+ public static final String RUNTIME_PARTITIONING_DIR = "bsp.partitioning.dir";
+ public static final String ENABLE_RUNTIME_PARTITIONING = "bsp.input.runtime.partitioning";
+ public static final String RUNTIME_PARTITIONING_CLASS = "bsp.input.partitioner.class";
+ public static final String RUNTIME_DESIRED_PEERS_COUNT = "desired.num.of.tasks";
+ public static final String RUNTIME_PARTITION_RECORDCONVERTER = "bsp.runtime.partition.recordconverter";
+
+
// /////////////////////////////////////
// Constants for ZooKeeper
// /////////////////////////////////////
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1430555&r1=1430554&r2=1430555&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Tue Jan 8 21:45:42 2013
@@ -47,7 +47,6 @@ public class BSPJob extends BSPJobContex
private JobState state = JobState.DEFINE;
private BSPJobClient jobClient;
private RunningJob info;
- private boolean isPartitioned = false; //TODO: If input is already partitioned init to true
public BSPJob() throws IOException {
this(new HamaConfiguration());
@@ -130,13 +129,13 @@ public class BSPJob extends BSPJobContex
public void setCombinerClass(Class<? extends Combiner<? extends Writable>>
cls) {
ensureState(JobState.DEFINE);
- conf.setClass(COMBINER_CLASS_ATTR, cls, Combiner.class);
+ conf.setClass(Constants.COMBINER_CLASS, cls, Combiner.class);
}
@SuppressWarnings("unchecked")
public Class<? extends Combiner<? extends Writable>> getCombinerClass() {
return (Class<? extends Combiner<? extends Writable>>) conf.getClass(
- COMBINER_CLASS_ATTR, Combiner.class);
+ Constants.COMBINER_CLASS, Combiner.class);
}
public void setJar(String jar) {
@@ -223,44 +222,6 @@ public class BSPJob extends BSPJobContex
public boolean waitForCompletion(boolean verbose) throws IOException,
InterruptedException, ClassNotFoundException {
- if (this.getConfiguration().get("bsp.input.partitioner.class") != null
- && !isPartitioned) {
- FileSystem fs = FileSystem.get(conf);
- Path inputDir = new Path(conf.get("bsp.input.dir"));
- if (fs.isFile(inputDir)) {
- inputDir = inputDir.getParent();
- }
- Path partitionDir = new Path(inputDir + "/partitions");
-
- if (fs.exists(partitionDir)) {
- fs.delete(partitionDir, true);
- }
-
- HamaConfiguration conf = new HamaConfiguration();
- conf.setInt("desired.num.of.tasks",
- Integer.parseInt(this.getConfiguration().get("bsp.peers.num")));
- if(this.getConfiguration().get("bsp.partitioning.dir") != null) {
- conf.set("bsp.partitioning.dir", this.getConfiguration().get("bsp.partitioning.dir"));
- }
- BSPJob partitioningJob = new BSPJob(conf);
- partitioningJob.setInputPath(new Path(this.getConfiguration().get(
- "bsp.input.dir")));
- partitioningJob.setInputFormat(this.getInputFormat().getClass());
- partitioningJob.setInputKeyClass(this.getInputKeyClass());
- partitioningJob.setInputValueClass(getInputValueClass());
- partitioningJob.setOutputFormat(NullOutputFormat.class);
- partitioningJob.setBspClass(PartitioningRunner.class);
-
- isPartitioned = partitioningJob.waitForCompletion(true);
- if (isPartitioned) {
- if(conf.get("bsp.partitioning.dir") != null) {
- this.setInputPath(new Path(conf.get("bsp.partitioning.dir")));
- } else {
- this.setInputPath(new Path(inputDir + "/partitions"));
- }
- }
- }
-
if (state == JobState.DEFINE) {
submit();
}
@@ -292,13 +253,13 @@ public class BSPJob extends BSPJobContex
@SuppressWarnings({ "rawtypes" })
public InputFormat getInputFormat() {
- return ReflectionUtils.newInstance(conf.getClass("bsp.input.format.class",
+ return ReflectionUtils.newInstance(conf.getClass(Constants.INPUT_FORMAT_CLASS,
TextInputFormat.class, InputFormat.class), conf);
}
@SuppressWarnings({ "rawtypes" })
public void setInputFormat(Class<? extends InputFormat> cls) {
- conf.setClass("bsp.input.format.class", cls, InputFormat.class);
+ conf.setClass(Constants.INPUT_FORMAT_CLASS, cls, InputFormat.class);
}
/**
@@ -406,7 +367,7 @@ public class BSPJob extends BSPJobContex
*/
@SuppressWarnings("rawtypes")
public void setOutputFormat(Class<? extends OutputFormat> theClass) {
- conf.setClass("bsp.output.format.class", theClass, OutputFormat.class);
+ conf.setClass(Constants.OUTPUT_FORMAT_CLASS, theClass, OutputFormat.class);
}
/**
@@ -414,19 +375,18 @@ public class BSPJob extends BSPJobContex
*/
@SuppressWarnings("rawtypes")
public void setPartitioner(Class<? extends Partitioner> theClass) {
- conf.setClass("bsp.input.partitioner.class", theClass, Partitioner.class);
+ conf.setClass(Constants.RUNTIME_PARTITIONING_CLASS, theClass, Partitioner.class);
}
@SuppressWarnings("rawtypes")
public Partitioner getPartitioner() {
- return ReflectionUtils.newInstance(conf
- .getClass("bsp.input.partitioner.class", HashPartitioner.class,
- Partitioner.class), conf);
+ return ReflectionUtils.newInstance(
+ conf.getClass(Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class,Partitioner.class), conf);
}
@SuppressWarnings("rawtypes")
public OutputFormat getOutputFormat() {
- return ReflectionUtils.newInstance(conf.getClass("bsp.output.format.class",
+ return ReflectionUtils.newInstance(conf.getClass(Constants.OUTPUT_FORMAT_CLASS,
TextOutputFormat.class, OutputFormat.class), conf);
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1430555&r1=1430554&r2=1430555&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Tue Jan 8 21:45:42 2013
@@ -288,7 +288,7 @@ public class BSPJobClient extends Config
* @throws IOException
*/
public RunningJob submitJob(BSPJob job) throws FileNotFoundException,
- IOException {
+ IOException {
return submitJobInternal(job, jobSubmitClient.getNewJobId());
}
@@ -323,6 +323,10 @@ public class BSPJobClient extends Config
if (job.get("bsp.input.dir") != null) {
// Create the splits for the job
LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
+
+ job = partition(job, maxTasks);
+ maxTasks = job.getInt("hama.partition.count", maxTasks);
+
job.setNumBspTask(writeSplits(job, submitSplitFile, maxTasks));
job.set("bsp.job.split.file", submitSplitFile.toString());
}
@@ -364,6 +368,79 @@ public class BSPJobClient extends Config
return launchJob(jobId, job, submitJobFile, fs);
}
+
+ protected BSPJob partition(BSPJob job, int maxTasks) throws IOException {
+
+ if(job.get("bsp.partitioning.runner.job") != null){return job;}//Early exit for the partitioner job.
+
+ InputSplit[] splits = job.getInputFormat().getSplits(job,
+ (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
+ : maxTasks);
+
+ String inputPath = job.getConfiguration().get(Constants.JOB_INPUT_DIR);
+ Path inputDir = new Path(inputPath);
+ if (fs.isFile(inputDir)) {
+ inputDir = inputDir.getParent();
+ }
+
+ if (inputPath != null) {
+ int numSplits = splits.length;
+ int numTasks = job.getConfiguration().getInt("bsp.peers.num", 0);
+
+ if ((numTasks > 0 && numTasks != numSplits)
+ || (job.getConfiguration().getBoolean(
+ Constants.ENABLE_RUNTIME_PARTITIONING, false) && job
+ .getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null)) {
+
+ Path partitionDir = new Path(inputDir + "/partitions");
+
+ if (fs.exists(partitionDir)) {
+ fs.delete(partitionDir, true);
+ }
+
+ HamaConfiguration conf = new HamaConfiguration();
+ conf.setInt(Constants.RUNTIME_DESIRED_PEERS_COUNT,
+ Integer.parseInt(job.getConfiguration().get("bsp.peers.num")));
+ if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) != null) {
+ conf.set(Constants.RUNTIME_PARTITIONING_DIR, job.getConfiguration()
+ .get(Constants.RUNTIME_PARTITIONING_DIR));
+ }
+ conf.set(Constants.RUNTIME_PARTITIONING_CLASS, job.get(Constants.RUNTIME_PARTITIONING_CLASS));
+ BSPJob partitioningJob = new BSPJob(conf);
+ partitioningJob.setInputPath(new Path(job.getConfiguration().get(
+ Constants.JOB_INPUT_DIR)));
+ partitioningJob.setInputFormat(job.getInputFormat().getClass());
+ partitioningJob.setInputKeyClass(job.getInputKeyClass());
+ partitioningJob.setInputValueClass(job.getInputValueClass());
+ partitioningJob.setOutputFormat(NullOutputFormat.class);
+ partitioningJob.setBspClass(PartitioningRunner.class);
+ partitioningJob.set("bsp.partitioning.runner.job", "true");
+
+ boolean isPartitioned = false;
+ try {
+ isPartitioned = partitioningJob.waitForCompletion(true);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted partitioning run-time.", e);
+ } catch (ClassNotFoundException e) {
+ LOG.error("Class not found error partitioning run-time.", e);
+ }
+ if (isPartitioned) {
+ if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) != null) {
+ job.setInputPath(new Path(conf
+ .get(Constants.RUNTIME_PARTITIONING_DIR)));
+ } else {
+ job.setInputPath(new Path(inputDir + "/partitions"));
+ }
+ } else {
+ LOG.error("Error partitioning the input path.");
+ throw new IOException("Runtime partition failed for the job.");
+ }
+ }
+ }
+ return job;
+ }
+
+
protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
int maxTasks;
ClusterStatus clusterStatus = getClusterStatus(true);
@@ -551,9 +628,9 @@ public class BSPJobClient extends Config
if (job.isSuccessful()) {
LOG.info("The total number of supersteps: " + info.getSuperstepCount());
info.getStatus()
- .getCounter()
- .incrCounter(JobInProgress.JobCounter.SUPERSTEPS,
- info.getSuperstepCount());
+ .getCounter()
+ .incrCounter(JobInProgress.JobCounter.SUPERSTEPS,
+ info.getSuperstepCount());
info.getStatus().getCounter().log(LOG);
} else {
LOG.info("Job failed.");
@@ -615,7 +692,7 @@ public class BSPJobClient extends Config
}
public static void runJob(BSPJob job) throws FileNotFoundException,
- IOException {
+ IOException {
BSPJobClient jc = new BSPJobClient(job.getConfiguration());
if (job.getNumBspTask() == 0
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java?rev=1430555&r1=1430554&r2=1430555&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java Tue Jan 8 21:45:42 2013
@@ -33,9 +33,6 @@ public class BSPJobContext {
// Put all of the attribute names in here so that BSPJob and JobContext are
// consistent.
protected static final String WORK_CLASS_ATTR = "bsp.work.class";
- protected static final String COMBINER_CLASS_ATTR = "bsp.combiner.class";
- protected static final String INPUT_FORMAT_CLASS_ATTR = "bsp.inputformat.class";
- protected static final String OUTPUT_FORMAT_CLASS_ATTR = "bsp.outputformat.class";
protected static final String WORKING_DIR = "bsp.working.dir";
protected final Configuration conf;
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1430555&r1=1430554&r2=1430555&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Jan 8 21:45:42 2013
@@ -183,7 +183,7 @@ public final class BSPPeerImpl<K1, V1, K
LOG.debug("Initialized Messaging service.");
}
- final String combinerName = conf.get("bsp.combiner.class");
+ final String combinerName = conf.get(Constants.COMBINER_CLASS);
if (combinerName != null) {
combiner = (Combiner<M>) ReflectionUtils.newInstance(
conf.getClassByName(combinerName), conf);
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1430555&r1=1430554&r2=1430555&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Tue Jan 8 21:45:42 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.Constants;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.util.KeyValuePair;
@@ -139,7 +140,7 @@ public class PartitioningRunner extends
@SuppressWarnings("rawtypes")
public Partitioner getPartitioner() {
return ReflectionUtils.newInstance(conf
- .getClass("bsp.input.partitioner.class", HashPartitioner.class,
+ .getClass(Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class,
Partitioner.class), conf);
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java?rev=1430555&r1=1430554&r2=1430555&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java Tue Jan 8 21:45:42 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.util.KeyValuePair;
@@ -297,8 +298,8 @@ public class BinaryProtocol<K1 extends W
}
public void readKeyValue() throws IOException {
- boolean nullinput = peer.getConfiguration().get("bsp.input.format.class") == null
- || peer.getConfiguration().get("bsp.input.format.class")
+ boolean nullinput = peer.getConfiguration().get(Constants.INPUT_FORMAT_CLASS) == null
+ || peer.getConfiguration().get(Constants.INPUT_FORMAT_CLASS)
.equals("org.apache.hama.bsp.NullInputFormat");
if (!nullinput) {
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java?rev=1430555&r1=1430554&r2=1430555&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java Tue Jan 8 21:45:42 2013
@@ -43,6 +43,7 @@ public class TestPartitioning extends Te
Configuration conf = new Configuration();
conf.set("bsp.local.dir", "/tmp/hama-test/partitioning");
conf.set("bsp.partitioning.dir", "/tmp/hama-test/partitioning/localtest");
+ conf.setBoolean("bsp.input.runtime.partitioning", true);
BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
bsp.setJobName("Test partitioning with input");
bsp.setBspClass(PartionedBSP.class);