Author: edwardyoon
Date: Mon Sep 8 22:01:40 2008
New Revision: 693363
URL: http://svn.apache.org/viewvc?rev=693363&view=rev
Log: (empty)
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java
incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=693363&r1=693362&r2=693363&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Sep 8 22:01:40 2008
@@ -48,6 +48,7 @@
BUG FIXES
+ HAMA-54: Split doesn't split by map task num (edwardyoon)
HAMA-53: NullPointerException on distributed cluster (edwardyoon)
HAMA-26: hama-formatter.xml should be removed (edwardyoon)
HAMA-25: Vector.get() returns double (edwardyoon)
Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java?rev=693363&r1=693362&r2=693363&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java Mon Sep 8 22:01:40 2008
@@ -92,13 +92,13 @@
public int getRows() throws IOException {
Cell rows = null;
rows = table.get(Constants.METADATA, Constants.METADATA_ROWS);
- return Bytes.toInt(rows.getValue());
+ return Numeric.bytesToInt(rows.getValue());
}
/** {@inheritDoc} */
public int getColumns() throws IOException {
Cell columns = table.get(Constants.METADATA, Constants.METADATA_COLUMNS);
- return Bytes.toInt(columns.getValue());
+ return Numeric.bytesToInt(columns.getValue());
}
/** {@inheritDoc} */
@@ -117,8 +117,8 @@
/** {@inheritDoc} */
public void setDimension(int rows, int columns) throws IOException {
BatchUpdate b = new BatchUpdate(Constants.METADATA);
- b.put(Constants.METADATA_ROWS, Bytes.toBytes(rows));
- b.put(Constants.METADATA_COLUMNS, Bytes.toBytes(columns));
+ b.put(Constants.METADATA_ROWS, Numeric.intToBytes(rows));
+ b.put(Constants.METADATA_COLUMNS, Numeric.intToBytes(columns));
table.commit(b);
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=693363&r1=693362&r2=693363&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Mon Sep 8 22:01:40 2008
@@ -137,6 +137,7 @@
JobConf jobConf = new JobConf(config);
jobConf.setJobName("addition MR job");
+ jobConf.setNumMapTasks(2);
AdditionMap.initJob(this.getName(), B.getName(), AdditionMap.class,
IntWritable.class, DenseVector.class, jobConf);
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java?rev=693363&r1=693362&r2=693363&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java Mon Sep 8 22:01:40 2008
@@ -23,14 +23,13 @@
import java.util.HashSet;
import java.util.Set;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scanner;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.filter.RowFilterSet;
import org.apache.hadoop.hbase.filter.StopRowFilter;
+import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.mapred.TableSplit;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -41,12 +40,12 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
import org.apache.hama.DenseVector;
import org.apache.hama.util.Numeric;
public abstract class MatrixInputFormatBase implements
InputFormat<IntWritable, DenseVector> {
- private final Log LOG = LogFactory.getLog(MatrixInputFormatBase.class);
private byte[][] inputColumns;
private HTable table;
private TableRecordReader tableRecordReader;
@@ -167,13 +166,12 @@
* @param key HStoreKey as input key.
* @param value MapWritable as input value
*
- * Converts Scanner.next() to Text, DenseVector
+ * Converts Scanner.next() to Text, DenseVector
*
* @return true if there was more data
* @throws IOException
*/
- public boolean next(IntWritable key, DenseVector value)
- throws IOException {
+ public boolean next(IntWritable key, DenseVector value) throws IOException {
RowResult result = this.scanner.next();
boolean hasMore = result != null && result.size() > 0;
if (hasMore) {
@@ -208,57 +206,27 @@
return trr;
}
- /**
- * Calculates the splits that will serve as input for the map tasks.
- * <ul>
- * Splits are created in number equal to the smallest between numSplits and
- * the number of {@link HRegion}s in the table. If the number of splits is
- * smaller than the number of {@link HRegion}s then splits are spanned across
- * multiple {@link HRegion}s and are grouped the most evenly possible. In the
- * case splits are uneven the bigger splits are placed first in the
- * {@link InputSplit} array.
- *
- * @param job the map task {@link JobConf}
- * @param numSplits a hint to calculate the number of splits
- *
- * @return the input splits
- *
- * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf,
- * int)
- */
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- byte[][] startKeys = null;
- try {
- startKeys = this.table.getStartKeys();
- } catch (NullPointerException ne) {}
-
- if (startKeys == null || startKeys.length == 0) {
- throw new IOException("Expecting at least one region");
- }
- if (this.table == null) {
- throw new IOException("No table was provided");
- }
- if (this.inputColumns == null || this.inputColumns.length == 0) {
- throw new IOException("Expecting at least one column");
- }
- int realNumSplits = numSplits > startKeys.length ? startKeys.length
- : numSplits;
- InputSplit[] splits = new InputSplit[realNumSplits];
- int middle = startKeys.length / realNumSplits;
- int startPos = 0;
- for (int i = 0; i < realNumSplits; i++) {
- int lastPos = startPos + middle;
- lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
- splits[i] = new TableSplit(this.table.getTableName(),
- startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]
- : HConstants.EMPTY_START_ROW);
- if (LOG.isDebugEnabled()) {
- LOG.debug("split: " + i + "->" + splits[i]);
- }
- startPos = lastPos;
+ Cell meta = this.table.get(Constants.METADATA, Constants.METADATA_ROWS);
+
+ if (Numeric.bytesToInt(meta.getValue()) < numSplits) {
+ numSplits = Numeric.bytesToInt(meta.getValue());
+ }
+
+ int[] startKeys = new int[numSplits];
+ int interval = Numeric.bytesToInt(meta.getValue()) / numSplits;
+
+ for (int i = 0; i < numSplits; i++) {
+ startKeys[i] = (i * interval);
}
- return splits;
+ InputSplit[] splits = new InputSplit[startKeys.length];
+ for (int i = 0; i < startKeys.length; i++) {
+ splits[i] = new TableSplit(this.table.getTableName(),
+ Numeric.intToBytes(startKeys[i]), ((i + 1) < startKeys.length) ?
+ Numeric.intToBytes(startKeys[i + 1]) : HConstants.EMPTY_START_ROW);
+ }
+ return splits;
}
/**
@@ -281,7 +249,7 @@
* Allows subclasses to set the [EMAIL PROTECTED] TableRecordReader}.
*
* @param tableRecordReader to provide other {@link TableRecordReader}
- * implementations.
+ * implementations.
*/
protected void setTableRecordReader(TableRecordReader tableRecordReader) {
this.tableRecordReader = tableRecordReader;
Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java?rev=693363&r1=693362&r2=693363&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java Mon Sep 8 22:01:40 2008
@@ -50,11 +50,13 @@
Matrix matrixA = new DenseMatrix(conf, A);
matrixA.set(0, 0, 1);
matrixA.set(0, 1, 0);
+ matrixA.setDimension(1, 2);
Matrix matrixB = new DenseMatrix(conf, B);
matrixB.set(0, 0, 1);
matrixB.set(0, 1, 1);
-
+ matrixB.setDimension(1, 2);
+
miniMRJob();
}