Author: edwardyoon
Date: Mon Jul 15 15:21:19 2013
New Revision: 1503292
URL: http://svn.apache.org/r1503292
Log:
HAMA-772: When selected KeyValueTextInputFormat, workers get only one value for key
Added:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1503292&r1=1503291&r2=1503292&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Jul 15 15:21:19 2013
@@ -6,6 +6,7 @@ Release 0.6.3 (unreleased changes)
BUG FIXES
+ HAMA-772: When selected KeyValueTextInputFormat, workers get only one value
for key (Ikhtiyor Ahmedov via edwardyoon)
HAMA-776: localQueue is set as Send queue in init function of
AbstractMessageManager (kennethxian)
HAMA-769: Intermediate queue's close method is not called, clean work may
be omitted (kennethxian)
HAMA-771: Determining the count of active vertices (edwardyoon)
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1503292&r1=1503291&r2=1503292&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Mon Jul 15 15:21:19 2013
@@ -19,6 +19,8 @@ package org.apache.hama.bsp;
import java.io.IOException;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -45,7 +47,7 @@ public class PartitioningRunner extends
private FileSystem fs = null;
private Path partitionDir;
private RecordConverter converter;
- private Map<Integer, Map<Writable, Writable>> values = new HashMap<Integer,
Map<Writable, Writable>>();
+ private Map<Integer,LinkedList<KeyValuePair<Writable,Writable>>> values =
new HashMap<Integer, LinkedList<KeyValuePair<Writable,Writable>>>();
@Override
public final void setup(
@@ -101,6 +103,11 @@ public class PartitioningRunner extends
* needed.
*/
public Map<Writable, Writable> newMap();
+
+ /**
+ * @return a list implementation, so order will not be changed in
subclasses
+ */
+ public List<KeyValuePair<Writable, Writable>> newList();
}
/**
@@ -133,6 +140,11 @@ public class PartitioningRunner extends
public Map<Writable, Writable> newMap() {
return new HashMap<Writable, Writable>();
}
+
+ @Override
+ public List<KeyValuePair<Writable, Writable>> newList() {
+ return new LinkedList<KeyValuePair<Writable,Writable>>();
+ }
}
@Override
@@ -160,23 +172,23 @@ public class PartitioningRunner extends
int index = converter.getPartitionId(outputPair, partitioner, conf, peer,
desiredNum);
-
- Map<Writable, Writable> map = values.get(index);
- if (map == null) {
- map = converter.newMap();
- values.put(index, map);
+
+ LinkedList<KeyValuePair<Writable, Writable>> list = values.get(index);
+ if (list == null) {
+ list = (LinkedList<KeyValuePair<Writable, Writable>>)
converter.newList();
+ values.put(index, list);
}
- map.put(pair.getKey(), pair.getValue());
+ list.add(new KeyValuePair<Writable, Writable>(pair.getKey(),
pair.getValue()));
}
// The reason of use of Memory is to reduce file opens
- for (Map.Entry<Integer, Map<Writable, Writable>> e : values.entrySet()) {
+ for (Map.Entry<Integer, LinkedList<KeyValuePair<Writable, Writable>>> e :
values.entrySet()) {
Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
+ peer.getPeerIndex());
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
destFile, keyClass, valueClass, CompressionType.NONE);
- for (Map.Entry<Writable, Writable> v : e.getValue().entrySet()) {
+ for (KeyValuePair<Writable, Writable> v : e.getValue()) {
writer.append(v.getKey(), v.getValue());
}
writer.close();
Added:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java?rev=1503292&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java Mon Jul 15 15:21:19 2013
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestKeyValueTextInputFormat extends TestCase {
+
+ public static class KeyValueHashPartitionedBSP
+ extends
+ BSP<Text, Text, NullWritable, NullWritable, MapWritable> {
+ public static final String TEST_INPUT_VALUES = "test.bsp.max.input";
+ public static final String TEST_UNEXPECTED_KEYS =
"test.bsp.keys.unexpected";
+ public static final String TEST_MAX_VALUE = "test.bsp.keys.max";
+
+ private int numTasks = 0;
+ private int maxValue = 0;
+ private MapWritable expectedKeys = new MapWritable();
+ //private Set<Text> expectedKeys = new HashSet<Text>();
+
+ @Override
+ public void setup(
+ BSPPeer<Text, Text, NullWritable, NullWritable, MapWritable> peer)
+ throws IOException, SyncException, InterruptedException {
+ Configuration conf = peer.getConfiguration();
+ maxValue = conf.getInt(KeyValueHashPartitionedBSP.TEST_MAX_VALUE, 1000);
+ numTasks = peer.getNumPeers();
+ }
+
+ @Override
+ public void bsp(
+ BSPPeer<Text, Text, NullWritable, NullWritable, MapWritable> peer)
+ throws IOException, SyncException, InterruptedException {
+ Text key = null;
+ Text value = null;
+ MapWritable message = new MapWritable();
+ message.put(new Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS),
new BooleanWritable(false));
+ KeyValuePair<Text, Text> tmp = null;
+
+ while ( (tmp = peer.readNext()) != null) {
+ key = tmp.getKey();
+ value = tmp.getValue();
+
+ int expectedPeerId = Math.abs(key.hashCode() % numTasks);
+
+ if (expectedPeerId == peer.getPeerIndex()) {
+ if (expectedKeys.containsKey(key)) {
+ // same key twice, incorrect
+ message.put(new
Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS), new
BooleanWritable(true));
+ break;
+ } else {
+ expectedKeys.put(new Text(key), new Text(value));
+ }
+ } else {
+ message.put(new
Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS), new
BooleanWritable(true));
+ break;
+ } //if (expectedPeerId == peer.getPeerIndex())
+ } //while (peer.readNext(key, value) != false)
+ message.put(new Text(KeyValueHashPartitionedBSP.TEST_INPUT_VALUES),
expectedKeys);
+
+ int master = peer.getNumPeers()/2;
+ String masterName = peer.getPeerName(master);
+ peer.send(masterName, message);
+ peer.sync();
+
+ if(peer.getPeerIndex() == master) {
+ MapWritable msg = null;
+ MapWritable values = null;
+ BooleanWritable blValue = null;
+ HashMap<Integer, Integer> input = new HashMap<Integer, Integer>();
+
+ while ( (msg = peer.getCurrentMessage()) != null ) {
+ blValue = (BooleanWritable) msg.get(new
Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS));
+ assertEquals(false, blValue.get());
+ values = (MapWritable) msg.get(new
Text(KeyValueHashPartitionedBSP.TEST_INPUT_VALUES));
+ for (Map.Entry<Writable,Writable> w : values.entrySet()) {
+ input.put( Integer.valueOf( w.getKey().toString() ),
Integer.valueOf( w.getValue().toString() ));
+ }
+ }
+
+ for (int i=0; i<maxValue; i++) {
+ assertEquals(true, input.containsKey(Integer.valueOf(i)));
+ assertEquals(i*i, input.get(Integer.valueOf(i)).intValue());
+ }
+ }
+ peer.sync();
+ }
+ }
+
+ @Test
+ public void testInput() {
+
+ Configuration fsConf = new Configuration();
+ String strDataPath = "/tmp/test_keyvalueinputformat";
+ Path dataPath = new Path(strDataPath);
+ Path outPath = new Path("/tmp/test_keyvalueinputformat_out");
+
+ int maxValue = 1000;
+
+ try {
+ URI uri = new URI(strDataPath);
+ FileSystem fs = FileSystem.get(uri, fsConf);
+ fs.delete(dataPath, true);
+ FSDataOutputStream fileOut = fs.create(dataPath, true);
+
+ StringBuilder str = new StringBuilder();
+ for (int i = 0; i < maxValue; ++i) {
+ str.append(i);
+ str.append("\t");
+ str.append(i*i);
+ str.append("\n");
+ }
+ fileOut.writeBytes(str.toString());
+ fileOut.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+
+ try {
+ HamaConfiguration conf = new HamaConfiguration();
+ conf.setInt(KeyValueHashPartitionedBSP.TEST_MAX_VALUE, maxValue);
+ BSPJob job = new BSPJob(conf, TestKeyValueTextInputFormat.class);
+ job.setJobName("Test KeyValueTextInputFormat together with
HashPartitioner");
+ job.setBspClass(KeyValueHashPartitionedBSP.class);
+
+ job.setPartitioner(HashPartitioner.class);
+
+ job.setInputPath(dataPath);
+ job.setInputFormat(KeyValueTextInputFormat.class);
+ job.setInputKeyClass(Text.class);
+ job.setInputValueClass(Text.class);
+
+ job.setOutputPath(outPath);
+ job.setOutputFormat(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ BSPJobClient jobClient = new BSPJobClient(conf);
+ ClusterStatus cluster = jobClient.getClusterStatus(true);
+ job.setNumBspTask(cluster.getMaxTasks());
+
+ assertEquals(true, job.waitForCompletion(true));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java?rev=1503292&r1=1503291&r2=1503292&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java Mon Jul 15 15:21:19 2013
@@ -17,6 +17,8 @@
*/
package org.apache.hama.graph;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -103,4 +105,9 @@ public abstract class VertexInputReader<
return new TreeMap<Writable, Writable>();
}
+ @Override
+ public List<KeyValuePair<Writable, Writable>> newList() {
+ return new LinkedList<KeyValuePair<Writable,Writable>>();
+ }
+
}