Author: tommaso
Date: Fri Oct 26 06:03:26 2012
New Revision: 1402402
URL: http://svn.apache.org/viewvc?rev=1402402&view=rev
Log:
[HAMA-651] - fixed indent
Modified:
hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java
hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/VectorDoubleFileInputFormat.java
hama/trunk/ml/src/test/java/org/apache/hama/ml/regression/VectorDoubleFileInputFormatTest.java
Modified: hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java?rev=1402402&r1=1402401&r2=1402402&view=diff
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java (original)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java Fri Oct 26 06:03:26 2012
@@ -99,7 +99,7 @@ public class GradientDescentBSP extends
peer.sync();
// second superstep : aggregate cost calculation
- double totalCost = localCost;
+ double totalCost = localCost;
VectorWritable costResult;
while ((costResult = peer.getCurrentMessage()) != null) {
totalCost += costResult.getVector().get(0);
@@ -111,14 +111,14 @@ public class GradientDescentBSP extends
// cost check
if (cost - totalCost < 0) {
throw new RuntimeException(new StringBuilder("gradient descent failed
to converge with alpha ").
- append(alpha).toString());
+ append(alpha).toString());
} else if (totalCost == 0 || totalCost < costThreshold || iterations >=
iterationsThreshold) {
cost = totalCost;
break;
} else {
cost = totalCost;
if (log.isDebugEnabled()) {
- log.debug(peer.getPeerName()+": cost is " + cost);
+ log.debug(peer.getPeerName() + ": cost is " + cost);
}
}
@@ -161,7 +161,7 @@ public class GradientDescentBSP extends
if (log.isDebugEnabled()) {
log.debug(new StringBuilder(peer.getPeerName()).append(": new theta
for cost ").
- append(cost).append(" is
").append(theta.toString()).toString());
+ append(cost).append(" is ").append(theta.toString()).toString());
}
// master writes down the output
if (master) {
@@ -179,34 +179,34 @@ public class GradientDescentBSP extends
public void cleanup(BSPPeer<VectorWritable, DoubleWritable, VectorWritable,
DoubleWritable, VectorWritable> peer) throws IOException {
// master writes down the final output
if (master) {
- peer.write(new VectorWritable(theta), new DoubleWritable(cost));
- if (log.isInfoEnabled()) {
- log.info(new
StringBuilder(peer.getPeerName()).append(":computation finished with cost ").
- append(cost).append(" for theta
").append(theta).toString());
- }
+ peer.write(new VectorWritable(theta), new DoubleWritable(cost));
+ if (log.isInfoEnabled()) {
+ log.info(new StringBuilder(peer.getPeerName()).append(":computation
finished with cost ").
+ append(cost).append(" for theta ").append(theta).toString());
+ }
}
}
public void getTheta(BSPPeer<VectorWritable, DoubleWritable, VectorWritable,
DoubleWritable, VectorWritable> peer) throws IOException, SyncException,
InterruptedException {
if (theta == null) {
- if (master) {
- int size = getXSize(peer);
- theta = new DenseDoubleVector(size,
peer.getConfiguration().getInt(INITIAL_THETA_VALUES, 10));
- for (String peerName : peer.getAllPeerNames()) {
- peer.send(peerName, new VectorWritable(theta));
- }
- if (log.isDebugEnabled()) {
- log.debug(new StringBuilder(peer.getPeerName()).append(":
sending theta").toString());
- }
- peer.sync();
- } else {
- if (log.isDebugEnabled()) {
- log.debug(new StringBuilder(peer.getPeerName()).append(":
getting theta").toString());
- }
- peer.sync();
- VectorWritable vectorWritable = peer.getCurrentMessage();
- theta = vectorWritable.getVector();
+ if (master) {
+ int size = getXSize(peer);
+ theta = new DenseDoubleVector(size,
peer.getConfiguration().getInt(INITIAL_THETA_VALUES, 10));
+ for (String peerName : peer.getAllPeerNames()) {
+ peer.send(peerName, new VectorWritable(theta));
+ }
+ if (log.isDebugEnabled()) {
+ log.debug(new StringBuilder(peer.getPeerName()).append(": sending
theta").toString());
}
+ peer.sync();
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(new StringBuilder(peer.getPeerName()).append(": getting
theta").toString());
+ }
+ peer.sync();
+ VectorWritable vectorWritable = peer.getCurrentMessage();
+ theta = vectorWritable.getVector();
+ }
}
}
Modified: hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/VectorDoubleFileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/VectorDoubleFileInputFormat.java?rev=1402402&r1=1402401&r2=1402402&view=diff
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/VectorDoubleFileInputFormat.java (original)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/VectorDoubleFileInputFormat.java Fri Oct 26 06:03:26 2012
@@ -17,9 +17,6 @@
*/
package org.apache.hama.ml.regression;
-import java.io.IOException;
-import java.io.InputStream;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -30,185 +27,184 @@ import org.apache.hadoop.io.DoubleWritab
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.bsp.FileInputFormat;
-import org.apache.hama.bsp.FileSplit;
-import org.apache.hama.bsp.InputSplit;
-import org.apache.hama.bsp.RecordReader;
+import org.apache.hama.bsp.*;
import org.apache.hama.ml.math.DenseDoubleVector;
import org.apache.hama.ml.math.DoubleVector;
import org.apache.hama.ml.writable.VectorWritable;
+import java.io.IOException;
+import java.io.InputStream;
+
/**
* A {@link FileInputFormat} for files containing one vector and one double
per line
*/
public class VectorDoubleFileInputFormat extends
FileInputFormat<VectorWritable, DoubleWritable> {
- @Override
- public RecordReader<VectorWritable, DoubleWritable>
getRecordReader(InputSplit split, BSPJob job) throws IOException {
- return new VectorDoubleRecorderReader(job.getConf(), (FileSplit)
split);
+ @Override
+ public RecordReader<VectorWritable, DoubleWritable>
getRecordReader(InputSplit split, BSPJob job) throws IOException {
+ return new VectorDoubleRecorderReader(job.getConf(), (FileSplit) split);
+ }
+
+ static class VectorDoubleRecorderReader implements
RecordReader<VectorWritable, DoubleWritable> {
+
+ private static final Log LOG =
LogFactory.getLog(VectorDoubleRecorderReader.class
+ .getName());
+
+ private CompressionCodecFactory compressionCodecs = null;
+ private long start;
+ private long pos;
+ private long end;
+ private LineReader in;
+ int maxLineLength;
+
+ /**
+ * A class that provides a line reader from an input stream.
+ */
+ public static class LineReader extends org.apache.hadoop.util.LineReader {
+ LineReader(InputStream in) {
+ super(in);
+ }
+
+ LineReader(InputStream in, int bufferSize) {
+ super(in, bufferSize);
+ }
+
+ public LineReader(InputStream in, Configuration conf) throws IOException
{
+ super(in, conf);
+ }
+ }
+
+ public VectorDoubleRecorderReader(Configuration job, FileSplit split)
+ throws IOException {
+ this.maxLineLength = job.getInt("bsp.linerecordreader.maxlength",
+ Integer.MAX_VALUE);
+ start = split.getStart();
+ end = start + split.getLength();
+ final Path file = split.getPath();
+ compressionCodecs = new CompressionCodecFactory(job);
+ final CompressionCodec codec = compressionCodecs.getCodec(file);
+
+ // open the file and seek to the start of the split
+ FileSystem fs = file.getFileSystem(job);
+ FSDataInputStream fileIn = fs.open(split.getPath());
+ boolean skipFirstLine = false;
+ if (codec != null) {
+ in = new LineReader(codec.createInputStream(fileIn), job);
+ end = Long.MAX_VALUE;
+ } else {
+ if (start != 0) {
+ skipFirstLine = true;
+ --start;
+ fileIn.seek(start);
+ }
+ in = new LineReader(fileIn, job);
+ }
+ if (skipFirstLine) { // skip first line and re-establish "start".
+ start += in.readLine(new Text(), 0,
+ (int) Math.min(Integer.MAX_VALUE, end - start));
+ }
+ this.pos = start;
}
- static class VectorDoubleRecorderReader implements
RecordReader<VectorWritable, DoubleWritable> {
+ public VectorDoubleRecorderReader(InputStream in, long offset, long
endOffset,
+ int maxLineLength) {
+ this.maxLineLength = maxLineLength;
+ this.in = new LineReader(in);
+ this.start = offset;
+ this.pos = offset;
+ this.end = endOffset;
+ }
- private static final Log LOG =
LogFactory.getLog(VectorDoubleRecorderReader.class
- .getName());
+ public VectorDoubleRecorderReader(InputStream in, long offset, long
endOffset,
+ Configuration job) throws IOException {
+ this.maxLineLength = job.getInt("bsp.linerecordreader.maxlength",
+ Integer.MAX_VALUE);
+ this.in = new LineReader(in, job);
+ this.start = offset;
+ this.pos = offset;
+ this.end = endOffset;
+ }
- private CompressionCodecFactory compressionCodecs = null;
- private long start;
- private long pos;
- private long end;
- private LineReader in;
- int maxLineLength;
-
- /**
- * A class that provides a line reader from an input stream.
- */
- public static class LineReader extends
org.apache.hadoop.util.LineReader {
- LineReader(InputStream in) {
- super(in);
- }
-
- LineReader(InputStream in, int bufferSize) {
- super(in, bufferSize);
- }
-
- public LineReader(InputStream in, Configuration conf) throws
IOException {
- super(in, conf);
- }
- }
+ @Override
+ public VectorWritable createKey() {
+ return new VectorWritable();
+ }
- public VectorDoubleRecorderReader(Configuration job, FileSplit split)
- throws IOException {
- this.maxLineLength = job.getInt("bsp.linerecordreader.maxlength",
- Integer.MAX_VALUE);
- start = split.getStart();
- end = start + split.getLength();
- final Path file = split.getPath();
- compressionCodecs = new CompressionCodecFactory(job);
- final CompressionCodec codec = compressionCodecs.getCodec(file);
-
- // open the file and seek to the start of the split
- FileSystem fs = file.getFileSystem(job);
- FSDataInputStream fileIn = fs.open(split.getPath());
- boolean skipFirstLine = false;
- if (codec != null) {
- in = new LineReader(codec.createInputStream(fileIn), job);
- end = Long.MAX_VALUE;
- } else {
- if (start != 0) {
- skipFirstLine = true;
- --start;
- fileIn.seek(start);
- }
- in = new LineReader(fileIn, job);
- }
- if (skipFirstLine) { // skip first line and re-establish "start".
- start += in.readLine(new Text(), 0,
- (int) Math.min(Integer.MAX_VALUE, end - start));
- }
- this.pos = start;
- }
+ @Override
+ public DoubleWritable createValue() {
+ return new DoubleWritable();
+ }
+
+ /**
+ * Read a line.
+ */
+ @Override
+ public synchronized boolean next(VectorWritable key, DoubleWritable value)
+ throws IOException {
+
+ while (pos < end) {
- public VectorDoubleRecorderReader(InputStream in, long offset, long
endOffset,
- int maxLineLength) {
- this.maxLineLength = maxLineLength;
- this.in = new LineReader(in);
- this.start = offset;
- this.pos = offset;
- this.end = endOffset;
+ Text textVal = new Text();
+ int newSize = in.readLine(textVal, maxLineLength, Math.max(
+ (int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
+ if (newSize == 0) {
+ return false;
}
- public VectorDoubleRecorderReader(InputStream in, long offset, long
endOffset,
- Configuration job) throws
IOException {
- this.maxLineLength = job.getInt("bsp.linerecordreader.maxlength",
- Integer.MAX_VALUE);
- this.in = new LineReader(in, job);
- this.start = offset;
- this.pos = offset;
- this.end = endOffset;
+ String[] kv = new String(textVal.getBytes()).split("\\>");
+ if (kv.length != 2) {
+ throw new IOException("a line was not parsed correctly");
}
+ value.set(Double.valueOf(kv[0]));
+ key.set(toDoubleVector(kv[1]));
- @Override
- public VectorWritable createKey() {
- return new VectorWritable();
+ if (LOG.isDebugEnabled()) {
+ LOG.info("reading " + kv[1] + ":" + kv[0]);
}
- @Override
- public DoubleWritable createValue() {
- return new DoubleWritable();
+ pos += newSize;
+ if (newSize < maxLineLength) {
+ return true;
}
- /**
- * Read a line.
- */
- @Override
- public synchronized boolean next(VectorWritable key, DoubleWritable
value)
- throws IOException {
-
- while (pos < end) {
-
- Text textVal = new Text();
- int newSize = in.readLine(textVal, maxLineLength, Math.max(
- (int) Math.min(Integer.MAX_VALUE, end - pos),
maxLineLength));
- if (newSize == 0) {
- return false;
- }
-
- String[] kv = new String(textVal.getBytes()).split("\\>");
- if (kv.length != 2) {
- throw new IOException("a line was not parsed correctly");
- }
- value.set(Double.valueOf(kv[0]));
- key.set(toDoubleVector(kv[1]));
-
- if (LOG.isDebugEnabled()) {
- LOG.info("reading "+kv[1]+":"+kv[0]);
- }
-
- pos += newSize;
- if (newSize < maxLineLength) {
- return true;
- }
-
- // line too long. try again
- LOG.info("Skipped line of size " + newSize + " at pos " + (pos
- newSize));
- }
+ // line too long. try again
+ LOG.info("Skipped line of size " + newSize + " at pos " + (pos -
newSize));
+ }
- return false;
- }
+ return false;
+ }
- private DoubleVector toDoubleVector(String s) {
- String[] split = s.split(" ");
- double[] dar = new double[split.length];
- for (int i = 0; i < split.length; i++) {
- dar[i] = Double.valueOf(split[i]);
- }
- return new DenseDoubleVector(dar);
- }
+ private DoubleVector toDoubleVector(String s) {
+ String[] split = s.split(" ");
+ double[] dar = new double[split.length];
+ for (int i = 0; i < split.length; i++) {
+ dar[i] = Double.valueOf(split[i]);
+ }
+ return new DenseDoubleVector(dar);
+ }
- /**
- * Get the progress within the split
- */
- @Override
- public float getProgress() {
- if (start == end) {
- return 0.0f;
- } else {
- return Math.min(1.0f, (pos - start) / (float) (end - start));
- }
- }
+ /**
+ * Get the progress within the split
+ */
+ @Override
+ public float getProgress() {
+ if (start == end) {
+ return 0.0f;
+ } else {
+ return Math.min(1.0f, (pos - start) / (float) (end - start));
+ }
+ }
- @Override
- public synchronized long getPos() throws IOException {
- return pos;
- }
+ @Override
+ public synchronized long getPos() throws IOException {
+ return pos;
+ }
- @Override
- public synchronized void close() throws IOException {
- if (in != null) {
- in.close();
- }
- }
+ @Override
+ public synchronized void close() throws IOException {
+ if (in != null) {
+ in.close();
+ }
}
+ }
}
Modified: hama/trunk/ml/src/test/java/org/apache/hama/ml/regression/VectorDoubleFileInputFormatTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/test/java/org/apache/hama/ml/regression/VectorDoubleFileInputFormatTest.java?rev=1402402&r1=1402401&r2=1402402&view=diff
==============================================================================
--- hama/trunk/ml/src/test/java/org/apache/hama/ml/regression/VectorDoubleFileInputFormatTest.java (original)
+++ hama/trunk/ml/src/test/java/org/apache/hama/ml/regression/VectorDoubleFileInputFormatTest.java Fri Oct 26 06:03:26 2012
@@ -27,9 +27,7 @@ import org.apache.hama.ml.math.DenseDoub
import org.apache.hama.ml.writable.VectorWritable;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
/**
* Testcase for {@link VectorDoubleFileInputFormat}
@@ -42,7 +40,7 @@ public class VectorDoubleFileInputFormat
Path file = new Path("src/test/resources/vd_file_sample.txt");
InputSplit split = new FileSplit(file, 0, 1000, new String[]{"localhost"});
BSPJob job = new BSPJob();
- RecordReader<VectorWritable,DoubleWritable> recordReader =
inputFormat.getRecordReader(split, job);
+ RecordReader<VectorWritable, DoubleWritable> recordReader =
inputFormat.getRecordReader(split, job);
assertNotNull(recordReader);
VectorWritable key = recordReader.createKey();
assertNotNull(key);