Author: edwardyoon Date: Thu Nov 6 00:00:14 2008 New Revision: 711787 URL: http://svn.apache.org/viewvc?rev=711787&view=rev Log: I also re-name the map/red classes.
Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionReduce.java incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyMap.java incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyReduce.java Removed: incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutMap.java incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutReduce.java incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutMap.java incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutReduce.java Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=711787&r1=711786&r2=711787&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Thu Nov 6 00:00:14 2008 @@ -30,10 +30,10 @@ import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapred.JobConf; -import org.apache.hama.algebra.Add1DLayoutMap; -import org.apache.hama.algebra.Add1DLayoutReduce; -import org.apache.hama.algebra.Mult1DLayoutMap; -import org.apache.hama.algebra.Mult1DLayoutReduce; +import org.apache.hama.algebra.RowCyclicAdditionMap; +import org.apache.hama.algebra.RowCyclicAdditionReduce; +import org.apache.hama.algebra.SIMDMultiplyMap; +import org.apache.hama.algebra.SIMDMultiplyReduce; import org.apache.hama.io.VectorEntry; import org.apache.hama.io.VectorMapWritable; import org.apache.hama.io.VectorUpdate; @@ -253,9 +253,9 @@ jobConf.setNumReduceTasks(Integer.parseInt(config .get("mapred.reduce.tasks"))); - Add1DLayoutMap.initJob(this.getPath(), B.getPath(), Add1DLayoutMap.class, + RowCyclicAdditionMap.initJob(this.getPath(), B.getPath(), RowCyclicAdditionMap.class, IntWritable.class, VectorWritable.class, jobConf); - RowCyclicReduce.initJob(result.getPath(), Add1DLayoutReduce.class, jobConf); + RowCyclicReduce.initJob(result.getPath(), RowCyclicAdditionReduce.class, jobConf); JobManager.execute(jobConf, result); return result; @@ -303,9 +303,9 @@ jobConf.setNumReduceTasks(Integer.parseInt(config .get("mapred.reduce.tasks"))); - Mult1DLayoutMap.initJob(this.getPath(), B.getPath(), Mult1DLayoutMap.class, + SIMDMultiplyMap.initJob(this.getPath(), B.getPath(), SIMDMultiplyMap.class, IntWritable.class, VectorWritable.class, jobConf); - RowCyclicReduce.initJob(result.getPath(), Mult1DLayoutReduce.class, jobConf); + RowCyclicReduce.initJob(result.getPath(), SIMDMultiplyReduce.class, jobConf); JobManager.execute(jobConf, result); return result; } Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java?rev=711787&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java Thu Nov 6 00:00:14 2008 @@ -0,0 +1,73 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.algebra; + +import java.io.IOException; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hama.DenseMatrix; +import org.apache.hama.DenseVector; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.Matrix; +import org.apache.hama.Vector; +import org.apache.hama.io.VectorWritable; +import org.apache.hama.mapred.RowCyclicMap; +import org.apache.log4j.Logger; + +public class RowCyclicAdditionMap extends RowCyclicMap<IntWritable, VectorWritable> { + static final Logger LOG = Logger.getLogger(RowCyclicAdditionMap.class); + protected Matrix matrix_b; + public static final String MATRIX_B = "hama.addition.matrix.b"; + + public void configure(JobConf job) { + try { + matrix_b = new DenseMatrix(new HamaConfiguration(), job.get(MATRIX_B, "")); + } catch (IOException e) { + LOG.warn("Load matrix_b failed : " + e.getMessage()); + } + } + + public static void initJob(String matrix_a, String matrix_b, + Class<RowCyclicAdditionMap> map, Class<IntWritable> outputKeyClass, + Class<VectorWritable> outputValueClass, JobConf jobConf) { + + jobConf.setMapOutputValueClass(outputValueClass); + jobConf.setMapOutputKeyClass(outputKeyClass); + jobConf.setMapperClass(map); + jobConf.set(MATRIX_B, matrix_b); + + initJob(matrix_a, map, jobConf); + } + + @Override + public void map(IntWritable key, VectorWritable value, + OutputCollector<IntWritable, VectorWritable> output, Reporter reporter) + throws IOException { + + Vector v1 = matrix_b.getRow(key.get()); + output.collect(key, new VectorWritable(key.get(), (DenseVector) v1 + .add(value.getDenseVector()))); + + } + +} Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionReduce.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionReduce.java?rev=711787&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionReduce.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionReduce.java Thu Nov 6 00:00:14 2008 @@ -0,0 +1,51 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.algebra; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hama.io.VectorEntry; +import org.apache.hama.io.VectorUpdate; +import org.apache.hama.io.VectorWritable; +import org.apache.hama.mapred.RowCyclicReduce; + +public class RowCyclicAdditionReduce extends RowCyclicReduce<IntWritable, VectorWritable> { + + @Override + public void reduce(IntWritable key, Iterator<VectorWritable> values, + OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter) + throws IOException { + + VectorUpdate update = new VectorUpdate(key.get()); + VectorWritable vector = values.next(); + + for (Map.Entry<Integer, VectorEntry> f : vector.entrySet()) { + update.put(f.getKey(), f.getValue().getValue()); + } + + output.collect(key, update); + } + +} Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyMap.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyMap.java?rev=711787&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyMap.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyMap.java Thu Nov 6 00:00:14 2008 @@ -0,0 +1,78 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.algebra; + +import java.io.IOException; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hama.DenseMatrix; +import org.apache.hama.DenseVector; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.Matrix; +import org.apache.hama.Vector; +import org.apache.hama.io.VectorWritable; +import org.apache.hama.mapred.RowCyclicMap; +import org.apache.log4j.Logger; + +/** + * SIMD version + */ +public class SIMDMultiplyMap extends RowCyclicMap<IntWritable, VectorWritable> { + static final Logger LOG = Logger.getLogger(SIMDMultiplyMap.class); + protected Matrix matrix_b; + public static final String MATRIX_B = "hama.multiplication.matrix.b"; + + public void configure(JobConf job) { + try { + matrix_b = new DenseMatrix(new HamaConfiguration(), job.get(MATRIX_B, "")); + } catch (IOException e) { + LOG.warn("Load matrix_b failed : " + e.getMessage()); + } + } + + public static void initJob(String matrix_a, String matrix_b, + Class<SIMDMultiplyMap> map, Class<IntWritable> outputKeyClass, + Class<VectorWritable> outputValueClass, JobConf jobConf) { + + jobConf.setMapOutputValueClass(outputValueClass); + jobConf.setMapOutputKeyClass(outputKeyClass); + jobConf.setMapperClass(map); + jobConf.set(MATRIX_B, matrix_b); + + initJob(matrix_a, map, jobConf); + } + + @Override + public void map(IntWritable key, VectorWritable value, + OutputCollector<IntWritable, VectorWritable> output, Reporter reporter) + throws IOException { + + DenseVector v1 = value.getDenseVector(); + + for(int i = 0; i < v1.size(); i++) { + Vector v2 = matrix_b.getRow(i); + DenseVector sum = (DenseVector) v2.scale(v1.get(i)); + output.collect(key, new VectorWritable(key.get(), sum)); + } + } +} Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyReduce.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyReduce.java?rev=711787&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyReduce.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyReduce.java Thu Nov 6 00:00:14 2008 @@ -0,0 +1,65 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.algebra; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hama.io.VectorUpdate; +import org.apache.hama.io.VectorWritable; +import org.apache.hama.mapred.RowCyclicReduce; +import org.apache.log4j.Logger; + +public class SIMDMultiplyReduce extends + RowCyclicReduce<IntWritable, VectorWritable> { + static final Logger LOG = Logger.getLogger(SIMDMultiplyReduce.class); + public static final Map<Integer, Double> buffer = new HashMap<Integer, Double>(); + + @Override + public void reduce(IntWritable key, Iterator<VectorWritable> values, + OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter) + throws IOException { + + VectorUpdate update = new VectorUpdate(key.get()); + VectorWritable sum; + + // Summation + while (values.hasNext()) { + sum = values.next(); + for (int i = 0; i < sum.size(); i++) { + if (buffer.containsKey(i)) { + buffer.put(i, sum.get(i).getValue() + buffer.get(i)); + } else { + buffer.put(i, sum.get(i).getValue()); + } + } + } + + update.putAll(buffer); + buffer.clear(); + output.collect(key, update); + } + +} Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java?rev=711787&r1=711786&r2=711787&view=diff ============================================================================== --- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java (original) +++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java Thu Nov 6 00:00:14 2008 @@ -27,8 +27,8 @@ import org.apache.hama.DenseMatrix; import org.apache.hama.HCluster; import org.apache.hama.Matrix; -import org.apache.hama.algebra.Add1DLayoutMap; -import org.apache.hama.algebra.Add1DLayoutReduce; +import org.apache.hama.algebra.RowCyclicAdditionMap; +import org.apache.hama.algebra.RowCyclicAdditionReduce; import org.apache.hama.io.VectorWritable; import org.apache.log4j.Logger; @@ -69,9 +69,9 @@ JobConf jobConf = new JobConf(conf, TestMatrixMapReduce.class); jobConf.setJobName("test MR job"); - Add1DLayoutMap.initJob(pathA, pathB, Add1DLayoutMap.class, IntWritable.class, + RowCyclicAdditionMap.initJob(pathA, pathB, RowCyclicAdditionMap.class, IntWritable.class, VectorWritable.class, jobConf); - RowCyclicReduce.initJob(output, Add1DLayoutReduce.class, jobConf); + RowCyclicReduce.initJob(output, RowCyclicAdditionReduce.class, jobConf); jobConf.setNumMapTasks(2); jobConf.setNumReduceTasks(2);