Sorry for the spam commits. The build fails, but I can't reproduce the failure on my laptop. :/
On Thu, May 14, 2015 at 10:14 PM, Tommaso Teofili <[email protected]> wrote: > Hi Edward, > > I see your latest commits miss the commit message, it'd be good if you > could always add that as that helps others understand such commits without > having to look too deeply into the code and to be able to browse SVN > history more easily. > > Thanks, > Tommaso > > 2015-05-14 15:01 GMT+02:00 <[email protected]>: > >> Author: edwardyoon >> Date: Thu May 14 13:01:18 2015 >> New Revision: 1679360 >> >> URL: http://svn.apache.org/r1679360 >> Log: (empty) >> >> Modified: >> hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java >> >> hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java >> >> hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java >> >> Modified: >> hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java >> URL: >> http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1679360&r1=1679359&r2=1679360&view=diff >> >> ============================================================================== >> --- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java >> (original) >> +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java >> Thu May 14 13:01:18 2015 >> @@ -471,7 +471,7 @@ public class BSPJobClient extends Config >> partitioningJob.getConfiguration().setClass( >> MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class, >> MessageQueue.class); >> - >> + >> partitioningJob.setBoolean(Constants.FORCE_SET_BSP_TASKS, true); >> partitioningJob.setInputFormat(job.getInputFormat().getClass()); >> partitioningJob.setInputKeyClass(job.getInputKeyClass()); >> @@ -578,6 +578,15 @@ public class BSPJobClient extends Config >> DataOutputBuffer buffer = new DataOutputBuffer(); >> RawSplit rawSplit = new RawSplit(); >> for (InputSplit split : splits) { >> + >> + // set partitionID to rawSplit >> + if 
(split.getClass().getName().equals(FileSplit.class.getName()) >> + && >> job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null >> + && job.get("bsp.partitioning.runner.job") == null) { >> + String[] extractPartitionID = ((FileSplit) >> split).getPath().getName().split("[-]"); >> + >> rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1])); >> + } >> + >> rawSplit.setClassName(split.getClass().getName()); >> buffer.reset(); >> split.write(buffer); >> @@ -629,7 +638,10 @@ public class BSPJobClient extends Config >> for (int i = 0; i < len; ++i) { >> RawSplit split = new RawSplit(); >> split.readFields(in); >> - result[i] = split; >> + if (split.getPartitionID() != Integer.MIN_VALUE) >> + result[split.getPartitionID()] = split; >> + else >> + result[i] = split; >> } >> return result; >> } >> @@ -1075,12 +1087,21 @@ public class BSPJobClient extends Config >> private String splitClass; >> private BytesWritable bytes = new BytesWritable(); >> private String[] locations; >> + private int partitionID = Integer.MIN_VALUE; >> long dataLength; >> >> public void setBytes(byte[] data, int offset, int length) { >> bytes.set(data, offset, length); >> } >> >> + public void setPartitionID(int id) { >> + this.partitionID = id; >> + } >> + >> + public int getPartitionID() { >> + return partitionID; >> + } >> + >> public void setClassName(String className) { >> splitClass = className; >> } >> >> Modified: >> hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java >> URL: >> http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1679360&r1=1679359&r2=1679360&view=diff >> >> ============================================================================== >> --- >> hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java >> (original) >> +++ >> hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java >> Thu May 14 13:01:18 2015 >> @@ -144,7 +144,6 @@ public 
class PartitioningRunner extends >> raw = new MapWritable(); >> raw.put(rawRecord.getKey(), rawRecord.getValue()); >> >> - System.out.println(peer.getPeerName(index) + ", " + >> rawRecord.getKey() + ", " + rawRecord.getValue()); >> peer.send(peer.getPeerName(index), raw); >> } >> >> >> Modified: >> hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java >> URL: >> http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java?rev=1679360&r1=1679359&r2=1679360&view=diff >> >> ============================================================================== >> --- >> hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java >> (original) >> +++ >> hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java >> Thu May 14 13:01:18 2015 >> @@ -78,8 +78,6 @@ public class TestKeyValueTextInputFormat >> >> int expectedPeerId = Math.abs(key.hashCode() % numTasks); >> >> - System.out.println(peer.getPeerName() + ": " + key + ", " + value >> + ", " + expectedPeerId); >> - /* >> if (expectedPeerId == peer.getPeerIndex()) { >> expectedKeys.put(new Text(key), new Text(value)); >> } else { >> @@ -88,7 +86,6 @@ public class TestKeyValueTextInputFormat >> new BooleanWritable(true)); >> break; >> } >> - */ >> } >> message.put(new Text(KeyValueHashPartitionedBSP.TEST_INPUT_VALUES), >> expectedKeys); >> @@ -106,7 +103,6 @@ public class TestKeyValueTextInputFormat >> while ((msg = peer.getCurrentMessage()) != null) { >> blValue = (BooleanWritable) msg.get(new Text( >> KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS)); >> - System.out.println(">>>>> " + peer.getPeerName() + ", " + >> blValue.get()); >> assertEquals(false, blValue.get()); >> values = (MapWritable) msg.get(new Text( >> KeyValueHashPartitionedBSP.TEST_INPUT_VALUES)); >> >> >> -- Best Regards, Edward J. Yoon
