Author: edwardyoon
Date: Thu May 14 12:07:03 2015
New Revision: 1679351
URL: http://svn.apache.org/r1679351
Log:
debugging
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1679351&r1=1679350&r2=1679351&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu May
14 12:07:03 2015
@@ -465,6 +465,13 @@ public class BSPJobClient extends Config
LOG.debug("partitioningJob input: "
+ partitioningJob.get(Constants.JOB_INPUT_DIR));
+ partitioningJob.getConfiguration().setClass(
+ MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
+ OutgoingPOJOMessageBundle.class, OutgoingMessageManager.class);
+ partitioningJob.getConfiguration().setClass(
+ MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
+ MessageQueue.class);
+
partitioningJob.setBoolean(Constants.FORCE_SET_BSP_TASKS, true);
partitioningJob.setInputFormat(job.getInputFormat().getClass());
partitioningJob.setInputKeyClass(job.getInputKeyClass());
Modified:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java?rev=1679351&r1=1679350&r2=1679351&view=diff
==============================================================================
---
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
(original)
+++
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
Thu May 14 12:07:03 2015
@@ -41,8 +41,7 @@ import org.junit.Test;
public class TestKeyValueTextInputFormat extends TestCase {
- public static class KeyValueHashPartitionedBSP
- extends
+ public static class KeyValueHashPartitionedBSP extends
BSP<Text, Text, NullWritable, NullWritable, MapWritable> {
public static final String TEST_INPUT_VALUES = "test.bsp.max.input";
public static final String TEST_UNEXPECTED_KEYS =
"test.bsp.keys.unexpected";
@@ -51,7 +50,8 @@ public class TestKeyValueTextInputFormat
private int numTasks = 0;
private int maxValue = 0;
private MapWritable expectedKeys = new MapWritable();
- //private Set<Text> expectedKeys = new HashSet<Text>();
+
+ // private Set<Text> expectedKeys = new HashSet<Text>();
@Override
public void setup(
@@ -69,10 +69,11 @@ public class TestKeyValueTextInputFormat
Text key = null;
Text value = null;
MapWritable message = new MapWritable();
- message.put(new Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS),
new BooleanWritable(false));
+ message.put(new Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS),
+ new BooleanWritable(false));
KeyValuePair<Text, Text> tmp = null;
- while ( (tmp = peer.readNext()) != null) {
+ while ((tmp = peer.readNext()) != null) {
key = tmp.getKey();
value = tmp.getValue();
@@ -80,60 +81,69 @@ public class TestKeyValueTextInputFormat
if (expectedPeerId == peer.getPeerIndex()) {
if (expectedKeys.containsKey(key)) {
+ System.out.println("duplicate: " + value + ", " +
expectedKeys.get(key));
// same key twice, incorrect
- message.put(new
Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS), new
BooleanWritable(true));
+ message.put(new Text(
+ KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS),
+ new BooleanWritable(true));
break;
} else {
expectedKeys.put(new Text(key), new Text(value));
}
} else {
- message.put(new
Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS), new
BooleanWritable(true));
+ System.out.println("duplicate: " + value + ", " +
expectedKeys.get(key));
+ message.put(
+ new Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS),
+ new BooleanWritable(true));
break;
- } //if (expectedPeerId == peer.getPeerIndex())
- } //while (peer.readNext(key, value) != false)
- message.put(new Text(KeyValueHashPartitionedBSP.TEST_INPUT_VALUES),
expectedKeys);
-
+ } // if (expectedPeerId == peer.getPeerIndex())
+ } // while (peer.readNext(key, value) != false)
+ message.put(new Text(KeyValueHashPartitionedBSP.TEST_INPUT_VALUES),
+ expectedKeys);
+
int master = peer.getNumPeers() / 2;
- String masterName = peer.getPeerName(master);
- peer.send(masterName, message);
+ peer.send(peer.getPeerName(master), message);
peer.sync();
- if(peer.getPeerIndex() == master) {
+ if (peer.getPeerIndex() == master) {
MapWritable msg = null;
MapWritable values = null;
BooleanWritable blValue = null;
HashMap<Integer, Integer> input = new HashMap<Integer, Integer>();
- while ( (msg = peer.getCurrentMessage()) != null ) {
- blValue = (BooleanWritable) msg.get(new
Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS));
- System.out.println(">>>>> " + peer.getPeerName() + ", "+
blValue.get());
+ while ((msg = peer.getCurrentMessage()) != null) {
+ blValue = (BooleanWritable) msg.get(new Text(
+ KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS));
+ System.out.println(">>>>> " + peer.getPeerName() + ", " +
blValue.get());
assertEquals(false, blValue.get());
- values = (MapWritable) msg.get(new
Text(KeyValueHashPartitionedBSP.TEST_INPUT_VALUES));
- for (Map.Entry<Writable,Writable> w : values.entrySet()) {
- input.put( Integer.valueOf( w.getKey().toString() ),
Integer.valueOf( w.getValue().toString() ));
+ values = (MapWritable) msg.get(new Text(
+ KeyValueHashPartitionedBSP.TEST_INPUT_VALUES));
+ for (Map.Entry<Writable, Writable> w : values.entrySet()) {
+ input.put(Integer.valueOf(w.getKey().toString()),
+ Integer.valueOf(w.getValue().toString()));
}
}
-
- for (int i=0; i<maxValue; i++) {
+
+ for (int i = 0; i < maxValue; i++) {
assertEquals(true, input.containsKey(Integer.valueOf(i)));
- assertEquals(i*i, input.get(Integer.valueOf(i)).intValue());
+ assertEquals(i * i, input.get(Integer.valueOf(i)).intValue());
}
}
peer.sync();
}
}
-
+
@Test
public void testInput() throws IOException {
-
+
Configuration fsConf = new Configuration();
String strDataPath = "/tmp/test_keyvalueinputformat";
Path dataPath = new Path(strDataPath);
Path outPath = new Path("/tmp/test_keyvalueinputformat_out");
-
+
int maxValue = 1000;
FileSystem fs = null;
-
+
try {
URI uri = new URI(strDataPath);
fs = FileSystem.get(uri, fsConf);
@@ -147,7 +157,7 @@ public class TestKeyValueTextInputFormat
for (int i = 0; i < maxValue; ++i) {
str.append(i);
str.append("\t");
- str.append(i*i);
+ str.append(i * i);
str.append("\n");
}
fileOut.writeBytes(str.toString());
@@ -156,18 +166,17 @@ public class TestKeyValueTextInputFormat
} catch (Exception e) {
e.printStackTrace();
}
-
-
+
try {
HamaConfiguration conf = new HamaConfiguration();
conf.setInt(KeyValueHashPartitionedBSP.TEST_MAX_VALUE, maxValue);
BSPJob job = new BSPJob(conf, TestKeyValueTextInputFormat.class);
job.setJobName("Test KeyValueTextInputFormat together with
HashPartitioner");
job.setBspClass(KeyValueHashPartitionedBSP.class);
-
+
job.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
job.setPartitioner(HashPartitioner.class);
-
+
job.setNumBspTask(2);
job.setInputPath(dataPath);
job.setInputFormat(KeyValueTextInputFormat.class);