Author: millecker
Date: Thu Jan 16 09:39:26 2014
New Revision: 1558725
URL: http://svn.apache.org/r1558725
Log:
HAMA-852: Add MessageClass property in BSPJob
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/Constants.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java
hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1558725&r1=1558724&r2=1558725&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Jan 16 09:39:26 2014
@@ -23,6 +23,7 @@ Release 0.7.0 (unreleased changes)
IMPROVEMENTS
+ HAMA-852: Add MessageClass property in BSPJob (Martin Illecker)
HAMA-843: Message communication overhead between master aggregation and
vertex computation supersteps (edwardyoon)
HAMA-838: Refactor aggregators (Anastasis Andronidis)
HAMA-783: Improve the InMemory verticesInfo implementations (edwardyoon)
Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1558725&r1=1558724&r2=1558725&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Thu Jan 16 09:39:26 2014
@@ -101,7 +101,7 @@ public interface Constants {
public static final String JOB_PEERS_COUNT = "bsp.peers.num";
public static final String INPUT_FORMAT_CLASS = "bsp.input.format.class";
public static final String OUTPUT_FORMAT_CLASS = "bsp.output.format.class";
- public static final String MESSAGE_CLASS = "bsp.message.type.class";
+ public static final String MESSAGE_CLASS = "bsp.message.class";
// /////////////////////////////////////////////
// Messaging related parameters.
@@ -120,7 +120,7 @@ public interface Constants {
public static final String RUNTIME_PARTITIONING_CLASS =
"bsp.input.partitioner.class";
public static final String RUNTIME_DESIRED_PEERS_COUNT =
"desired.num.of.tasks";
public static final String RUNTIME_PARTITION_RECORDCONVERTER =
"bsp.runtime.partition.recordconverter";
-
+
public static final String PARTITION_SORT_BY_KEY =
"bsp.partition.sort.by.converted.record";
// /////////////////////////////////////
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1558725&r1=1558724&r2=1558725&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Thu Jan 16 09:39:26 2014
@@ -356,6 +356,24 @@ public class BSPJob extends BSPJobContex
}
/**
+ * Get the message class.
+ *
+ * @return the message class.
+ */
+ public Class<?> getMessageClass() {
+ return conf.getClass(Constants.MESSAGE_CLASS, Text.class, Object.class);
+ }
+
+ /**
+ * Set the message class.
+ *
+ * @param theClass the message class.
+ */
+ public void setMessageClass(Class<?> theClass) {
+ conf.setClass(Constants.MESSAGE_CLASS, theClass, Object.class);
+ }
+
+ /**
* Sets the output path for the job.
*
* @param path where the output gets written.
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java?rev=1558725&r1=1558724&r2=1558725&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java Thu Jan 16 09:39:26 2014
@@ -44,6 +44,7 @@ import org.apache.hadoop.io.BytesWritabl
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobClient;
@@ -191,7 +192,7 @@ public class Submitter implements Tool {
setIfUnset(job.getConfiguration(), "bsp.input.value.class", textClassname);
setIfUnset(job.getConfiguration(), "bsp.output.key.class", textClassname);
setIfUnset(job.getConfiguration(), "bsp.output.value.class",
textClassname);
- setIfUnset(job.getConfiguration(), "bsp.message.class",
+ setIfUnset(job.getConfiguration(), Constants.MESSAGE_CLASS,
BytesWritable.class.getName());
setIfUnset(job.getConfiguration(), "bsp.job.name", "Hama Pipes Job");
@@ -205,7 +206,7 @@ public class Submitter implements Tool {
LOG.debug("InputFormat: " + job.getOutputFormat());
LOG.debug("OutputKeyClass: " + job.getOutputKeyClass().getName());
LOG.debug("OutputValueClass: " + job.getOutputValueClass().getName());
- LOG.debug("MessageClass: " + job.get("bsp.message.class"));
+ LOG.debug("MessageClass: " + job.get(Constants.MESSAGE_CLASS));
LOG.debug("bsp.master.address: "
+ job.getConfiguration().get("bsp.master.address"));
Modified:
hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java?rev=1558725&r1=1558724&r2=1558725&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java Thu Jan 16 09:39:26 2014
@@ -354,7 +354,7 @@ public class UplinkReader<KEYIN, VALUEIN
String peerName = Text.readString(this.inStream);
M message = (M) ReflectionUtils.newInstance((Class<? extends M>) conf
- .getClass("bsp.message.class", BytesWritable.class), conf);
+ .getClass(Constants.MESSAGE_CLASS, BytesWritable.class), conf);
LOG.debug("Got MessageType.SEND_MSG peerName: " + peerName
+ " messageClass: " + message.getClass().getName());
Modified: hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java?rev=1558725&r1=1558724&r2=1558725&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java Thu Jan 16 09:39:26 2014
@@ -223,7 +223,7 @@ public class TestPipes extends HamaClust
bsp.setOutputFormat(SequenceFileOutputFormat.class);
bsp.setOutputKeyClass(NullWritable.class);
bsp.setOutputValueClass(DoubleWritable.class);
- bsp.set("bsp.message.class", DoubleWritable.class.getName());
+ bsp.setMessageClass(DoubleWritable.class);
return bsp;
}
@@ -233,7 +233,7 @@ public class TestPipes extends HamaClust
bsp.setOutputFormat(SequenceFileOutputFormat.class);
bsp.setOutputKeyClass(NullWritable.class);
bsp.setOutputValueClass(DoubleWritable.class);
- bsp.set("bsp.message.class", IntWritable.class.getName());
+ bsp.setMessageClass(IntWritable.class);
return bsp;
}
@@ -246,10 +246,9 @@ public class TestPipes extends HamaClust
bsp.setOutputFormat(SequenceFileOutputFormat.class);
bsp.setOutputKeyClass(IntWritable.class);
bsp.setOutputValueClass(PipesVectorWritable.class);
+ bsp.setMessageClass(PipesKeyValueWritable.class);
bsp.set(Constants.RUNTIME_PARTITIONING_DIR, HAMA_TMP_OUTPUT + "/parts");
- bsp.set("bsp.message.class", PipesKeyValueWritable.class.getName());
-
bsp.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
bsp.setPartitioner(PipesPartitioner.class);