Author: edwardyoon
Date: Thu May 14 05:07:44 2015
New Revision: 1679312
URL: http://svn.apache.org/r1679312
Log:
HAMA-956: improve the runtime partitioner
Modified:
hama/trunk/conf/hama-default.xml
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
Modified: hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1679312&r1=1679311&r2=1679312&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Thu May 14 05:07:44 2015
@@ -204,23 +204,11 @@
<property>
<name>bsp.input.runtime.partitioning</name>
<value>true</value>
-  <description>Basically, we provides a data partitioning program based on BSP job,
+  <description>Basically, we provides a input data partitioning program based on BSP job,
   which you can use without any extra program. Set this property to false if you
   want to use the custom partition program.
</description>
</property>
- <property>
- <name>bsp.input.runtime.partitioning.sort.mb</name>
- <value>50</value>
- <description>The total amount of buffer memory in MB.
- </description>
- </property>
- <property>
- <name>bsp.input.runtime.partitioning.sort.factor</name>
- <value>10</value>
-  <description>The maximum number of streams to merge at once; the default is 10.
- </description>
- </property>
<property>
<name>io.serializations</name>
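
With the two sort-tuning properties removed above, runtime partitioning is
controlled by the single bsp.input.runtime.partitioning flag (default true).
A minimal sketch of overriding it per job, assuming a configured BSPJob named
"job" and that Constants.ENABLE_RUNTIME_PARTITIONING maps to this property:

    // Disable the built-in runtime partitioning for a job whose input
    // is already partitioned by a custom program.
    job.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, false);
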
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1679312&r1=1679311&r2=1679312&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu May 14 05:07:44 2015
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
@@ -346,25 +347,26 @@ public class BSPJobClient extends Config
InputSplit[] splits = job.getInputFormat().getSplits(job,
(maxTasks > configured) ? configured : maxTasks);
+ if (job.getConfiguration().getBoolean(
+ Constants.ENABLE_RUNTIME_PARTITIONING, false)) {
+ job = partition(job, splits, maxTasks);
+ maxTasks = job.getInt("hama.partition.count", maxTasks);
+ }
+
+ if (job.getBoolean("input.has.partitioned", false)) {
+ splits = job.getInputFormat().getSplits(job, maxTasks);
+ }
+
if (maxTasks < splits.length) {
throw new IOException(
"Job failed! The number of splits has exceeded the number of max
tasks. The number of splits: "
+ splits.length + ", The number of max tasks: " + maxTasks);
}
- /*
- FIXME now graph job doesn't use this runtime input partitioning
- Should we support this feature at BSP framework level?
-
-
-    if(job.getConfiguration().getBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, false)) {
- job = partition(job, splits, maxTasks);
- maxTasks = job.getInt("hama.partition.count", maxTasks);
- }
- */
-
int numOfSplits = writeSplits(job, splits, submitSplitFile, maxTasks);
if (numOfSplits > configured
-        || !job.getConfiguration().getBoolean(Constants.FORCE_SET_BSP_TASKS, false)) {
+ || !job.getConfiguration().getBoolean(Constants.FORCE_SET_BSP_TASKS,
+ false)) {
job.setNumBspTask(numOfSplits);
}
@@ -408,6 +410,99 @@ public class BSPJobClient extends Config
return launchJob(jobId, job, submitJobFile, fs);
}
+ protected BSPJob partition(BSPJob job, InputSplit[] splits, int maxTasks)
+ throws IOException {
+ String inputPath = job.getConfiguration().get(Constants.JOB_INPUT_DIR);
+ Path partitionDir = new Path("/tmp/hama-parts/" + job.getJobID() + "/");
+ if (fs.exists(partitionDir)) {
+ fs.delete(partitionDir, true);
+ }
+
+    // Early exit for the partitioner job.
+    if (job.get("bsp.partitioning.runner.job") != null) {
+      return job;
+    }
+
+ if (inputPath != null) {
+ int numSplits = splits.length;
+ int numTasks = job.getConfiguration().getInt("bsp.peers.num", 0);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" numTasks = "
+ + numTasks
+ + " numSplits = "
+ + numSplits
+ + " enable = "
+ + (job.getConfiguration().getBoolean(
+ Constants.ENABLE_RUNTIME_PARTITIONING, false)
+ + " class = " + job.getConfiguration().get(
+ Constants.RUNTIME_PARTITIONING_CLASS)));
+ }
+
+ if (numTasks == 0) {
+ numTasks = numSplits;
+ }
+
+ if (job.getConfiguration().getBoolean(
+ Constants.ENABLE_RUNTIME_PARTITIONING, false)
+          && job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null) {
+
+ HamaConfiguration conf = new HamaConfiguration(job.getConfiguration());
+
+        if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) != null) {
+ conf.set(Constants.RUNTIME_PARTITIONING_DIR, job.getConfiguration()
+ .get(Constants.RUNTIME_PARTITIONING_DIR));
+ }
+
+ conf.set(Constants.RUNTIME_PARTITIONING_CLASS,
+ job.get(Constants.RUNTIME_PARTITIONING_CLASS));
+ BSPJob partitioningJob = new BSPJob(conf);
+ partitioningJob.setJobName("Runtime partitioning job for "
+ + partitioningJob.getJobName());
+ LOG.debug("partitioningJob input: "
+ + partitioningJob.get(Constants.JOB_INPUT_DIR));
+
+ partitioningJob.setBoolean(Constants.FORCE_SET_BSP_TASKS, true);
+ partitioningJob.setInputFormat(job.getInputFormat().getClass());
+ partitioningJob.setInputKeyClass(job.getInputKeyClass());
+ partitioningJob.setInputValueClass(job.getInputValueClass());
+
+ partitioningJob.setOutputFormat(SequenceFileOutputFormat.class);
+ partitioningJob.setOutputKeyClass(job.getInputKeyClass());
+ partitioningJob.setOutputValueClass(job.getInputValueClass());
+
+ partitioningJob.setBspClass(PartitioningRunner.class);
+ partitioningJob.setMessageClass(MapWritable.class);
+ partitioningJob.set("bsp.partitioning.runner.job", "true");
+ partitioningJob.getConfiguration().setBoolean(
+ Constants.ENABLE_RUNTIME_PARTITIONING, false);
+ partitioningJob.setOutputPath(partitionDir);
+
+ boolean isPartitioned = false;
+ try {
+ isPartitioned = partitioningJob.waitForCompletion(true);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted partitioning run-time.", e);
+ } catch (ClassNotFoundException e) {
+ LOG.error("Class not found error partitioning run-time.", e);
+ }
+
+ if (isPartitioned) {
+          if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) != null) {
+ job.setInputPath(new Path(conf
+ .get(Constants.RUNTIME_PARTITIONING_DIR)));
+ } else {
+ job.setInputPath(partitionDir);
+ }
+ job.setBoolean("input.has.partitioned", true);
+ job.setInputFormat(NonSplitSequenceFileInputFormat.class);
+ } else {
+ LOG.error("Error partitioning the input path.");
+ throw new IOException("Runtime partition failed for the job.");
+ }
+ }
+ }
+ return job;
+ }
+
protected RunningJob launchJob(BSPJobID jobId, BSPJob job,
Path submitJobFile, FileSystem fs) throws IOException {
//
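
Taken together, the new partition() method changes submitJob() as follows:
when runtime partitioning is enabled and a partitioner class is configured, a
helper BSP job (PartitioningRunner) rewrites the input under
/tmp/hama-parts/<jobID>/, and the original job is re-pointed at that directory
with NonSplitSequenceFileInputFormat before splits are recomputed. A minimal
sketch of a job that exercises this path (MyBSP and the input path are
hypothetical; all setters appear elsewhere in this commit):

    HamaConfiguration conf = new HamaConfiguration();
    BSPJob job = new BSPJob(conf);
    job.setBspClass(MyBSP.class);                    // hypothetical BSP class
    job.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
    job.setPartitioner(HashPartitioner.class);
    job.setInputFormat(KeyValueTextInputFormat.class);
    job.setInputKeyClass(Text.class);
    job.setInputValueClass(Text.class);
    job.setInputPath(new Path("/user/hama/input"));  // illustrative path
    job.waitForCompletion(true);                     // partitioning job runs first
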
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1679312&r1=1679311&r2=1679312&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Thu May 14 05:07:44 2015
@@ -25,15 +25,9 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.sync.SyncException;
@@ -41,36 +35,24 @@ import org.apache.hama.commons.util.KeyV
import org.apache.hama.pipes.PipesPartitioner;
public class PartitioningRunner extends
- BSP<Writable, Writable, Writable, Writable, NullWritable> {
+ BSP<Writable, Writable, Writable, Writable, MapWritable> {
public static final Log LOG = LogFactory.getLog(PartitioningRunner.class);
private Configuration conf;
- private int desiredNum;
- private FileSystem fs = null;
- private Path partitionDir;
private RecordConverter converter;
private PipesPartitioner<?, ?> pipesPartitioner = null;
@Override
public final void setup(
- BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
+ BSPPeer<Writable, Writable, Writable, Writable, MapWritable> peer)
throws IOException, SyncException, InterruptedException {
this.conf = peer.getConfiguration();
- this.desiredNum = conf.getInt(Constants.RUNTIME_DESIRED_PEERS_COUNT, 1);
-
- this.fs = FileSystem.get(conf);
converter = ReflectionUtils.newInstance(conf.getClass(
Constants.RUNTIME_PARTITION_RECORDCONVERTER,
DefaultRecordConverter.class, RecordConverter.class), conf);
converter.setup(conf);
-
- if (conf.get(Constants.RUNTIME_PARTITIONING_DIR) == null) {
- this.partitionDir = new Path(conf.get("bsp.output.dir"));
- } else {
-      this.partitionDir = new Path(conf.get(Constants.RUNTIME_PARTITIONING_DIR));
- }
}
/**
@@ -97,10 +79,9 @@ public class PartitioningRunner extends
throws IOException;
public int getPartitionId(KeyValuePair<Writable, Writable> inputRecord,
- @SuppressWarnings("rawtypes")
- Partitioner partitioner, Configuration conf,
- @SuppressWarnings("rawtypes")
- BSPPeer peer, int numTasks);
+ @SuppressWarnings("rawtypes") Partitioner partitioner,
+ Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
+ int numTasks);
}
/**
@@ -117,10 +98,9 @@ public class PartitioningRunner extends
@SuppressWarnings("unchecked")
@Override
public int getPartitionId(KeyValuePair<Writable, Writable> outputRecord,
- @SuppressWarnings("rawtypes")
- Partitioner partitioner, Configuration conf,
- @SuppressWarnings("rawtypes")
- BSPPeer peer, int numTasks) {
+ @SuppressWarnings("rawtypes") Partitioner partitioner,
+ Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
+ int numTasks) {
return Math.abs(partitioner.getPartition(outputRecord.getKey(),
outputRecord.getValue(), numTasks));
}
@@ -136,15 +116,13 @@ public class PartitioningRunner extends
@Override
@SuppressWarnings({ "rawtypes" })
public void bsp(
- BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
+ BSPPeer<Writable, Writable, Writable, Writable, MapWritable> peer)
throws IOException, SyncException, InterruptedException {
- int peerNum = peer.getNumPeers();
Partitioner partitioner = getPartitioner();
KeyValuePair<Writable, Writable> rawRecord = null;
KeyValuePair<Writable, Writable> convertedRecord = null;
- Class convertedKeyClass = null;
Class rawKeyClass = null;
Class rawValueClass = null;
MapWritable raw = null;
@@ -160,175 +138,35 @@ public class PartitioningRunner extends
throw new IOException("The converted record can't be null.");
}
- Writable convertedKey = convertedRecord.getKey();
- convertedKeyClass = convertedKey.getClass();
-
int index = converter.getPartitionId(convertedRecord, partitioner, conf,
- peer, desiredNum);
-
- if (!writerCache.containsKey(index)) {
- Path destFile = new Path(partitionDir + "/part-" + index + "/file-"
- + peer.getPeerIndex());
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
- destFile, convertedKeyClass, MapWritable.class,
- CompressionType.NONE);
- writerCache.put(index, writer);
- }
+ peer, peer.getNumPeers());
raw = new MapWritable();
raw.put(rawRecord.getKey(), rawRecord.getValue());
-
- writerCache.get(index).append(convertedKey, raw);
- }
-
- for (SequenceFile.Writer w : writerCache.values()) {
- w.close();
+ peer.send(peer.getPeerName(index), raw);
}
peer.sync();
- FileStatus[] status = fs.listStatus(partitionDir);
- // Call sync() one more time to avoid concurrent access
- peer.sync();
-
- for (FileStatus stat : status) {
- int partitionID = Integer
- .parseInt(stat.getPath().getName().split("[-]")[1]);
-
- if (getMergeProcessorID(partitionID, peerNum) == peer.getPeerIndex()) {
- Path destinationFilePath = new Path(partitionDir + "/"
- + getPartitionName(partitionID));
-
- FileStatus[] files = fs.listStatus(stat.getPath());
- if (convertedRecord.getKey() instanceof WritableComparable
- && conf.getBoolean(Constants.PARTITION_SORT_BY_KEY, false)) {
- mergeSortedFiles(files, destinationFilePath, convertedKeyClass,
- rawKeyClass, rawValueClass);
- } else {
- mergeFiles(files, destinationFilePath, convertedKeyClass,
- rawKeyClass, rawValueClass);
- }
- fs.delete(stat.getPath(), true);
- }
- }
- }
-
- @SuppressWarnings("rawtypes")
-  public Map<Integer, KeyValuePair<WritableComparable, MapWritable>> candidates = new HashMap<Integer, KeyValuePair<WritableComparable, MapWritable>>();
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private void mergeSortedFiles(FileStatus[] status, Path destinationFilePath,
- Class convertedKeyClass, Class rawKeyClass, Class rawValueClass)
- throws IOException {
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
- destinationFilePath, rawKeyClass, rawValueClass, CompressionType.NONE);
- WritableComparable convertedKey;
- MapWritable value;
-
-    Map<Integer, SequenceFile.Reader> readers = new HashMap<Integer, SequenceFile.Reader>();
- for (int i = 0; i < status.length; i++) {
- SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
- convertedKeyClass, MapWritable.class, conf);
- sorter.setMemory(conf
- .getInt("bsp.input.runtime.partitioning.sort.mb", 50) * 1024 * 1024);
- sorter.setFactor(conf.getInt(
- "bsp.input.runtime.partitioning.sort.factor", 10));
- sorter.sort(status[i].getPath(), status[i].getPath().suffix(".sorted"));
-
- readers.put(i,
- new SequenceFile.Reader(fs, status[i].getPath().suffix(".sorted"),
- conf));
- }
-
- for (int i = 0; i < readers.size(); i++) {
- convertedKey = (WritableComparable) ReflectionUtils.newInstance(
- convertedKeyClass, conf);
- value = new MapWritable();
-
- readers.get(i).next(convertedKey, value);
- candidates.put(i, new KeyValuePair(convertedKey, value));
- }
-
- while (readers.size() > 0) {
- convertedKey = (WritableComparable) ReflectionUtils.newInstance(
- convertedKeyClass, conf);
- value = new MapWritable();
-
- int readerIndex = 0;
- WritableComparable firstKey = null;
- MapWritable rawRecord = null;
-
-      for (Map.Entry<Integer, KeyValuePair<WritableComparable, MapWritable>> keys : candidates
- .entrySet()) {
- if (firstKey == null) {
- readerIndex = keys.getKey();
- firstKey = keys.getValue().getKey();
- rawRecord = (MapWritable) keys.getValue().getValue();
- } else {
- WritableComparable currentKey = keys.getValue().getKey();
- if (firstKey.compareTo(currentKey) > 0) {
- readerIndex = keys.getKey();
- firstKey = currentKey;
- rawRecord = (MapWritable) keys.getValue().getValue();
- }
- }
- }
-
- for (Map.Entry<Writable, Writable> e : rawRecord.entrySet()) {
- writer.append(e.getKey(), e.getValue());
- }
- candidates.remove(readerIndex);
+ MapWritable record;
- if (readers.get(readerIndex).next(convertedKey, value)) {
- candidates.put(readerIndex, new KeyValuePair(convertedKey, value));
- } else {
- readers.get(readerIndex).close();
- readers.remove(readerIndex);
+ while ((record = peer.getCurrentMessage()) != null) {
+ for (Map.Entry<Writable, Writable> e : record.entrySet()) {
+ peer.write(e.getKey(), e.getValue());
}
}
- candidates.clear();
- writer.close();
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private void mergeFiles(FileStatus[] status, Path destinationFilePath,
- Class convertedKeyClass, Class rawKeyClass, Class rawValueClass)
- throws IOException {
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
- destinationFilePath, rawKeyClass, rawValueClass, CompressionType.NONE);
- Writable key;
- MapWritable rawRecord;
-
- for (int i = 0; i < status.length; i++) {
- SequenceFile.Reader reader = new SequenceFile.Reader(fs,
- status[i].getPath(), conf);
- key = (Writable) ReflectionUtils.newInstance(convertedKeyClass, conf);
- rawRecord = new MapWritable();
-
- while (reader.next(key, rawRecord)) {
- for (Map.Entry<Writable, Writable> e : rawRecord.entrySet()) {
- writer.append(e.getKey(), e.getValue());
- }
- }
- reader.close();
- }
- writer.close();
}
@Override
public void cleanup(
- BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
+ BSPPeer<Writable, Writable, Writable, Writable, MapWritable> peer)
throws IOException {
if (this.pipesPartitioner != null) {
this.pipesPartitioner.cleanup();
}
}
- public static int getMergeProcessorID(int partitionID, int peerNum) {
- return partitionID % peerNum;
- }
-
@SuppressWarnings("rawtypes")
public Partitioner getPartitioner() {
Class<? extends Partitioner> partitionerClass = conf.getClass(
@@ -355,8 +193,4 @@ public class PartitioningRunner extends
return partitioner;
}
- private static String getPartitionName(int i) {
- return "part-" + String.valueOf(100000 + i).substring(1, 6);
- }
-
}
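
Net effect of the PartitioningRunner rewrite: the per-partition SequenceFile
writers, the merge and merge-sort passes, and the getMergeProcessorID() /
getPartitionName() helpers are all gone. Each peer now routes records in a
single message-passing superstep; condensed from the added lines above:

    // Superstep 1: wrap each raw record and send it to the peer that
    // owns its partition.
    int index = converter.getPartitionId(convertedRecord, partitioner, conf,
        peer, peer.getNumPeers());
    MapWritable raw = new MapWritable();
    raw.put(rawRecord.getKey(), rawRecord.getValue());
    peer.send(peer.getPeerName(index), raw);

    peer.sync(); // single barrier; all sent records become messages

    // Superstep 2: every peer writes the records it received to its own
    // output partition.
    MapWritable record;
    while ((record = peer.getCurrentMessage()) != null) {
      for (Map.Entry<Writable, Writable> e : record.entrySet()) {
        peer.write(e.getKey(), e.getValue());
      }
    }

Sorted output (Constants.PARTITION_SORT_BY_KEY) is dropped along with the
sort.mb and sort.factor knobs removed from hama-default.xml.
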
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java?rev=1679312&r1=1679311&r2=1679312&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java Thu May 14 05:07:44 2015
@@ -162,9 +162,8 @@ public class TestKeyValueTextInputFormat
job.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
job.setPartitioner(HashPartitioner.class);
-
- // FIXME see 362 line at BSPJobClient.java
- job.setNumBspTask(1);
+
+ job.setNumBspTask(2);
job.setInputPath(dataPath);
job.setInputFormat(KeyValueTextInputFormat.class);
job.setInputKeyClass(Text.class);
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java?rev=1679312&r1=1679311&r2=1679312&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java Thu May 14 05:07:44 2015
@@ -88,15 +88,6 @@ public class TestPartitioning extends Ha
FileSystem fs = FileSystem.get(conf);
fs.delete(OUTPUT_PATH, true);
-
- getMergeProcessorID();
- }
-
- public void getMergeProcessorID() {
- int peerNum = 6;
- for (int partitionID = 0; partitionID < 8; partitionID++) {
-      assertTrue(PartitioningRunner.getMergeProcessorID(partitionID, peerNum) < peerNum);
- }
}
public static class PartionedBSP extends
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1679312&r1=1679311&r2=1679312&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu May 14 05:07:44 2015
@@ -132,7 +132,6 @@ public class GraphJob extends BSPJob {
ensureState(JobState.DEFINE);
conf.setClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER, cls,
RecordConverter.class);
- conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
}
/**
@@ -156,7 +155,6 @@ public class GraphJob extends BSPJob {
public void setPartitioner(
@SuppressWarnings("rawtypes") Class<? extends Partitioner> theClass) {
super.setPartitioner(theClass);
- conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
}
@Override
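
Note the two removed conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING,
true) calls: GraphJob setters no longer force the flag on. Since BSPJobClient
now handles runtime partitioning at the framework level and the property
defaults to true in hama-default.xml, graph jobs keep the behavior by default;
a job that wants a different setting states it explicitly. A one-line sketch,
assuming a configured GraphJob named "job":

    // Explicit per-job opt-in; the setters above no longer flip this flag.
    job.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);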