Author: edwardyoon
Date: Thu May 14 13:01:18 2015
New Revision: 1679360
URL: http://svn.apache.org/r1679360
Log: (empty)
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1679360&r1=1679359&r2=1679360&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu May 14 13:01:18 2015
@@ -471,7 +471,7 @@ public class BSPJobClient extends Config
partitioningJob.getConfiguration().setClass(
MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
MessageQueue.class);
-
+
partitioningJob.setBoolean(Constants.FORCE_SET_BSP_TASKS, true);
partitioningJob.setInputFormat(job.getInputFormat().getClass());
partitioningJob.setInputKeyClass(job.getInputKeyClass());
@@ -578,6 +578,15 @@ public class BSPJobClient extends Config
DataOutputBuffer buffer = new DataOutputBuffer();
RawSplit rawSplit = new RawSplit();
for (InputSplit split : splits) {
+
+ // set partitionID to rawSplit
+ if (split.getClass().getName().equals(FileSplit.class.getName())
+ && job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null
+ && job.get("bsp.partitioning.runner.job") == null) {
+ String[] extractPartitionID = ((FileSplit) split).getPath().getName().split("[-]");
+ rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1]));
+ }
+
rawSplit.setClassName(split.getClass().getName());
buffer.reset();
split.write(buffer);
@@ -629,7 +638,10 @@ public class BSPJobClient extends Config
for (int i = 0; i < len; ++i) {
RawSplit split = new RawSplit();
split.readFields(in);
- result[i] = split;
+ if (split.getPartitionID() != Integer.MIN_VALUE)
+ result[split.getPartitionID()] = split;
+ else
+ result[i] = split;
}
return result;
}
@@ -1075,12 +1087,21 @@ public class BSPJobClient extends Config
private String splitClass;
private BytesWritable bytes = new BytesWritable();
private String[] locations;
+ private int partitionID = Integer.MIN_VALUE;
long dataLength;
public void setBytes(byte[] data, int offset, int length) {
bytes.set(data, offset, length);
}
+ public void setPartitionID(int id) {
+ this.partitionID = id;
+ }
+
+ public int getPartitionID() {
+ return partitionID;
+ }
+
public void setClassName(String className) {
splitClass = className;
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1679360&r1=1679359&r2=1679360&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Thu May 14 13:01:18 2015
@@ -144,7 +144,6 @@ public class PartitioningRunner extends
raw = new MapWritable();
raw.put(rawRecord.getKey(), rawRecord.getValue());
- System.out.println(peer.getPeerName(index) + ", " + rawRecord.getKey() + ", " + rawRecord.getValue());
peer.send(peer.getPeerName(index), raw);
}
Modified:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java?rev=1679360&r1=1679359&r2=1679360&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java Thu May 14 13:01:18 2015
@@ -78,8 +78,6 @@ public class TestKeyValueTextInputFormat
int expectedPeerId = Math.abs(key.hashCode() % numTasks);
- System.out.println(peer.getPeerName() + ": " + key + ", " + value + ", " + expectedPeerId);
- /*
if (expectedPeerId == peer.getPeerIndex()) {
expectedKeys.put(new Text(key), new Text(value));
} else {
@@ -88,7 +86,6 @@ public class TestKeyValueTextInputFormat
new BooleanWritable(true));
break;
}
- */
}
message.put(new Text(KeyValueHashPartitionedBSP.TEST_INPUT_VALUES),
expectedKeys);
@@ -106,7 +103,6 @@ public class TestKeyValueTextInputFormat
while ((msg = peer.getCurrentMessage()) != null) {
blValue = (BooleanWritable) msg.get(new Text(
KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS));
- System.out.println(">>>>> " + peer.getPeerName() + ", " + blValue.get());
assertEquals(false, blValue.get());
values = (MapWritable) msg.get(new Text(
KeyValueHashPartitionedBSP.TEST_INPUT_VALUES));