Author: edwardyoon
Date: Thu Jul 31 19:43:10 2008
New Revision: 681591
URL: http://svn.apache.org/viewvc?rev=681591&view=rev
Log: (empty)
Added:
incubator/hama/trunk/src/java/org/apache/hama/algebra/
incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java
incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java
Modified:
incubator/hama/trunk/src/java/org/apache/hama/Constants.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java
Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=681591&r1=681590&r2=681591&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Thu Jul 31
19:43:10 2008
@@ -40,7 +40,7 @@
public final static String MINUS = "-";
/** Default columnfamily name */
- public final static Text COLUMN = new Text("column:");
+ public final static String COLUMN = "column:";
/** The numerator version of the fraction matrix */
public final static Text NUMERATOR = new Text("numerator:");
/** The denominator version of the fration matrix */
Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java?rev=681591&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java
(added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java Thu
Jul 31 19:43:10 2008
@@ -0,0 +1,22 @@
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Vector;
+import org.apache.hama.io.VectorDatum;
+import org.apache.hama.mapred.MatrixMap;
+
+public class AdditionMap extends MatrixMap<ImmutableBytesWritable,
VectorDatum> {
+
+ public void map(ImmutableBytesWritable key, VectorDatum value,
+ OutputCollector<ImmutableBytesWritable, VectorDatum> output,
+ Reporter reporter) throws IOException {
+
+ Vector v1 = new Vector(B.getRowResult(key.get()));
+ output.collect(key, v1.addition(key.get(), value.getVector()));
+ }
+
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java?rev=681591&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java
(added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java
Thu Jul 31 19:43:10 2008
@@ -0,0 +1,32 @@
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.VectorDatum;
+import org.apache.hama.mapred.MatrixReduce;
+
+public class AdditionReduce extends
+ MatrixReduce<ImmutableBytesWritable, VectorDatum> {
+
+ @Override
+ public void reduce(ImmutableBytesWritable key, Iterator<VectorDatum> values,
+ OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
+ Reporter reporter) throws IOException {
+
+ BatchUpdate b = new BatchUpdate(key.get());
+ VectorDatum vector = values.next();
+ for (Map.Entry<byte[], Cell> f : vector.entrySet()) {
+ b.put(f.getKey(), f.getValue().getValue());
+ }
+
+ output.collect(key, b);
+ }
+
+}
Modified:
incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java?rev=681591&r1=681590&r2=681591&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java
(original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java
Thu Jul 31 19:43:10 2008
@@ -13,46 +13,46 @@
import org.apache.hadoop.mapred.JobConfigurable;
public class MatrixInputFormat extends MatrixInputFormatBase implements
-JobConfigurable {
-private final Log LOG = LogFactory.getLog(MatrixInputFormat.class);
+ JobConfigurable {
+ private final Log LOG = LogFactory.getLog(MatrixInputFormat.class);
-/**
-* space delimited list of columns
-*
-* @see org.apache.hadoop.hbase.regionserver.HAbstractScanner for column name
-* wildcards
-*/
-public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
+ /**
+ * space delimited list of columns
+ *
+ * @see org.apache.hadoop.hbase.regionserver.HAbstractScanner for column name
+ * wildcards
+ */
+ public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
-/** [EMAIL PROTECTED] */
-public void configure(JobConf job) {
-Path[] tableNames = FileInputFormat.getInputPaths(job);
-String colArg = job.get(COLUMN_LIST);
-String[] colNames = colArg.split(" ");
-byte [][] m_cols = new byte[colNames.length][];
-for (int i = 0; i < m_cols.length; i++) {
- m_cols[i] = Bytes.toBytes(colNames[i]);
-}
-setInputColums(m_cols);
-try {
- setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
-} catch (Exception e) {
- LOG.error(e);
-}
-}
+ /** [EMAIL PROTECTED] */
+ public void configure(JobConf job) {
+ Path[] tableNames = FileInputFormat.getInputPaths(job);
+ String colArg = job.get(COLUMN_LIST);
+ String[] colNames = colArg.split(" ");
+ byte[][] m_cols = new byte[colNames.length][];
+ for (int i = 0; i < m_cols.length; i++) {
+ m_cols[i] = Bytes.toBytes(colNames[i]);
+ }
+ setInputColums(m_cols);
+ try {
+ setHTable(new HTable(new HBaseConfiguration(job),
tableNames[0].getName()));
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ }
-/** [EMAIL PROTECTED] */
-public void validateInput(JobConf job) throws IOException {
-// expecting exactly one path
-Path [] tableNames = FileInputFormat.getInputPaths(job);
-if (tableNames == null || tableNames.length > 1) {
- throw new IOException("expecting one table name");
-}
+ /** [EMAIL PROTECTED] */
+ public void validateInput(JobConf job) throws IOException {
+ // expecting exactly one path
+ Path[] tableNames = FileInputFormat.getInputPaths(job);
+ if (tableNames == null || tableNames.length > 1) {
+ throw new IOException("expecting one table name");
+ }
-// expecting at least one column
-String colArg = job.get(COLUMN_LIST);
-if (colArg == null || colArg.length() == 0) {
- throw new IOException("expecting at least one column");
-}
-}
+ // expecting at least one column
+ String colArg = job.get(COLUMN_LIST);
+ if (colArg == null || colArg.length() == 0) {
+ throw new IOException("expecting at least one column");
+ }
+ }
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java?rev=681591&r1=681590&r2=681591&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java
(original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java Thu Jul
31 19:43:10 2008
@@ -2,8 +2,9 @@
import java.io.IOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapred.TableInputFormat;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -12,43 +13,31 @@
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.Matrix;
import org.apache.hama.io.VectorDatum;
@SuppressWarnings("unchecked")
public abstract class MatrixMap<K extends WritableComparable, V extends
Writable>
- extends MapReduceBase implements Mapper<ImmutableBytesWritable,
VectorDatum, K, V> {
- /**
- * Use this before submitting a TableMap job. It will
- * appropriately set up the JobConf.
- *
- * @param table table name
- * @param columns columns to scan
- * @param mapper mapper class
- * @param job job configuration
- */
- public static void initJob(String table, String columns,
- Class<? extends MatrixMap> mapper,
- Class<? extends WritableComparable> outputKeyClass,
- Class<? extends Writable> outputValueClass, JobConf job) {
-
+ extends MapReduceBase implements
+ Mapper<ImmutableBytesWritable, VectorDatum, K, V> {
+ protected static Matrix B;
+
+ public static void initJob(String matrixA, String matrixB,
+ Class<? extends MatrixMap> mapper,
+ Class<? extends WritableComparable> outputKeyClass,
+ Class<? extends Writable> outputValueClass, JobConf job) {
+
job.setInputFormat(MatrixInputFormat.class);
job.setMapOutputValueClass(outputValueClass);
job.setMapOutputKeyClass(outputKeyClass);
job.setMapperClass(mapper);
- FileInputFormat.addInputPaths(job, table);
- job.set(TableInputFormat.COLUMN_LIST, columns);
+ FileInputFormat.addInputPaths(job, matrixA);
+
+ B = new Matrix(new HBaseConfiguration(), new Text(matrixB));
+ job.set(MatrixInputFormat.COLUMN_LIST, Constants.COLUMN);
}
- /**
- * Call a user defined function on a single HBase record, represented
- * by a key and its associated record value.
- *
- * @param key
- * @param value
- * @param output
- * @param reporter
- * @throws IOException
- */
public abstract void map(ImmutableBytesWritable key, VectorDatum value,
OutputCollector<K, V> output, Reporter reporter) throws IOException;
}