Author: tommaso
Date: Fri Oct 5 20:39:26 2012
New Revision: 1394791
URL: http://svn.apache.org/viewvc?rev=1394791&view=rev
Log:
[HAMA-651] - fixed formatting and added an input reset (peer.reopenInput()) in #getXSize
Modified:
hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java
Modified:
hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java
URL:
http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java?rev=1394791&r1=1394790&r2=1394791&view=diff
==============================================================================
---
hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java
(original)
+++
hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java
Fri Oct 5 20:39:26 2012
@@ -17,8 +17,6 @@
*/
package org.apache.hama.ml.regression;
-import java.io.IOException;
-
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hama.bsp.BSP;
@@ -31,155 +29,158 @@ import org.apache.hama.util.KeyValuePair
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
/**
* A gradient descent (see
<code>http://en.wikipedia.org/wiki/Gradient_descent</code>) BSP based abstract
implementation.
* Each extending class should implement the #hypothesis(DoubleVector theta,
DoubleVector x) method for a specific
*/
public abstract class GradientDescentBSP extends BSP<VectorWritable,
DoubleWritable, NullWritable, NullWritable, VectorWritable> {
- private static final Logger log =
LoggerFactory.getLogger(GradientDescentBSP.class);
- static final String INITIAL_THETA_VALUES = "initial.theta.values";
- static final String ALPHA = "alpha";
-
- private boolean master;
- private DoubleVector theta;
-
- @Override
- public void setup(BSPPeer<VectorWritable, DoubleWritable, NullWritable,
NullWritable, VectorWritable> peer) throws IOException, SyncException,
InterruptedException {
- master = peer.getPeerIndex() == peer.getNumPeers() / 2;
- }
+ private static final Logger log =
LoggerFactory.getLogger(GradientDescentBSP.class);
+ static final String INITIAL_THETA_VALUES = "initial.theta.values";
+ static final String ALPHA = "alpha";
+
+ private boolean master;
+ private DoubleVector theta;
- @Override
- public void bsp(BSPPeer<VectorWritable, DoubleWritable, NullWritable,
NullWritable, VectorWritable> peer) throws IOException, SyncException,
InterruptedException {
+ @Override
+ public void setup(BSPPeer<VectorWritable, DoubleWritable, NullWritable,
NullWritable, VectorWritable> peer) throws IOException, SyncException,
InterruptedException {
+ master = peer.getPeerIndex() == peer.getNumPeers() / 2;
+ }
- while (true) {
+ @Override
+ public void bsp(BSPPeer<VectorWritable, DoubleWritable, NullWritable,
NullWritable, VectorWritable> peer) throws IOException, SyncException,
InterruptedException {
- getTheta(peer);
+ while (true) {
- // first superstep : calculate cost function in parallel
+ getTheta(peer);
- double localCost = 0d;
+ // first superstep : calculate cost function in parallel
- int numRead = 0;
+ double localCost = 0d;
- // read an input
- KeyValuePair<VectorWritable, DoubleWritable> kvp;
- while ((kvp = peer.readNext()) != null) {
+ int numRead = 0;
- // calculate cost for given input
- double y = kvp.getValue().get();
- DoubleVector x = kvp.getKey().getVector();
- double costForX = y * Math.log(hypothesis(theta, x)) + (1 - y)
* Math.log(1 - hypothesis(theta, x));
+ // read an input
+ KeyValuePair<VectorWritable, DoubleWritable> kvp;
+ while ((kvp = peer.readNext()) != null) {
- // adds to local cost
- localCost += costForX;
- numRead++;
- }
+ // calculate cost for given input
+ double y = kvp.getValue().get();
+ DoubleVector x = kvp.getKey().getVector();
+ double costForX = y * Math.log(hypothesis(theta, x)) + (1 - y) *
Math.log(1 - hypothesis(theta, x));
- // cost is sent and aggregated by each
- double totalCost = localCost;
+ // adds to local cost
+ localCost += costForX;
+ numRead++;
+ }
- for (String peerName : peer.getAllPeerNames()) {
- peer.send(peerName, new VectorWritable(new
DenseDoubleVector(new double[]{localCost, numRead})));
- }
- peer.sync();
+ // cost is sent and aggregated by each
+ double totalCost = localCost;
- // second superstep : cost calculation
+ for (String peerName : peer.getAllPeerNames()) {
+ peer.send(peerName, new VectorWritable(new DenseDoubleVector(new
double[]{localCost, numRead})));
+ }
+ peer.sync();
- VectorWritable costResult;
- while ((costResult = peer.getCurrentMessage()) != null) {
- totalCost += costResult.getVector().get(0);
- numRead += costResult.getVector().get(1);
- }
+ // second superstep : cost calculation
- totalCost = totalCost * (-1 / numRead);
- if (log.isInfoEnabled()) {
- log.info("cost is " + totalCost);
- }
+ VectorWritable costResult;
+ while ((costResult = peer.getCurrentMessage()) != null) {
+ totalCost += costResult.getVector().get(0);
+ numRead += costResult.getVector().get(1);
+ }
- peer.sync();
+ totalCost = totalCost * (-1 / numRead);
+ if (log.isInfoEnabled()) {
+ log.info("cost is " + totalCost);
+ }
- peer.reopenInput();
+ peer.sync();
- double[] thetaDelta = new double[theta.getLength()];
+ peer.reopenInput();
- // second superstep : calculate partial derivatives in parallel
- while ((kvp = peer.readNext()) != null) {
- DoubleVector x = kvp.getKey().getVector();
- double y = kvp.getValue().get();
- double difference = hypothesis(theta, x) - y;
- for (int j = 0; j < theta.getLength(); j++) {
- thetaDelta[j] += difference * x.get(j);
- }
- }
+ double[] thetaDelta = new double[theta.getLength()];
- // send thetaDelta to the each peer
- for (String peerName : peer.getAllPeerNames()) {
- peer.send(peerName, new VectorWritable(new
DenseDoubleVector(thetaDelta)));
- }
+ // second superstep : calculate partial derivatives in parallel
+ while ((kvp = peer.readNext()) != null) {
+ DoubleVector x = kvp.getKey().getVector();
+ double y = kvp.getValue().get();
+ double difference = hypothesis(theta, x) - y;
+ for (int j = 0; j < theta.getLength(); j++) {
+ thetaDelta[j] += difference * x.get(j);
+ }
+ }
- peer.sync();
+ // send thetaDelta to the each peer
+ for (String peerName : peer.getAllPeerNames()) {
+ peer.send(peerName, new VectorWritable(new
DenseDoubleVector(thetaDelta)));
+ }
- VectorWritable thetaDeltaSlice;
- while ((thetaDeltaSlice = peer.getCurrentMessage()) != null) {
- double[] newTheta = new double[theta.getLength()];
+ peer.sync();
- for (int j = 0; j < theta.getLength(); j++) {
- newTheta[j] += thetaDeltaSlice.getVector().get(j);
- }
+ VectorWritable thetaDeltaSlice;
+ while ((thetaDeltaSlice = peer.getCurrentMessage()) != null) {
+ double[] newTheta = new double[theta.getLength()];
- for (int j = 0; j < theta.getLength(); j++) {
- newTheta[j] = theta.get(j) - newTheta[j] *
peer.getConfiguration().getFloat(ALPHA, 0.3f);
- }
+ for (int j = 0; j < theta.getLength(); j++) {
+ newTheta[j] += thetaDeltaSlice.getVector().get(j);
+ }
- theta = new DenseDoubleVector(newTheta);
+ for (int j = 0; j < theta.getLength(); j++) {
+ newTheta[j] = theta.get(j) - newTheta[j] *
peer.getConfiguration().getFloat(ALPHA, 0.3f);
+ }
- if (log.isInfoEnabled()) {
- log.info("new theta for cost " + totalCost + " is " +
theta.toArray().toString());
- }
- }
- peer.sync();
+ theta = new DenseDoubleVector(newTheta);
- // eventually break execution !?
- if (totalCost == 0) {
- // TODO change this as just 0 is too strict
- break;
- }
+ if (log.isInfoEnabled()) {
+ log.info("new theta for cost " + totalCost + " is " +
theta.toArray().toString());
}
+ }
+ peer.sync();
+ // eventually break execution !?
+ if (totalCost == 0) {
+ // TODO change this as just 0 is too strict
+ break;
+ }
}
- /**
- * Applies the hypothesis given a set of parameters theta to a given input
x
- *
- * @param theta the parameters vector
- * @param x the input
- * @return a <code>double</code> number
- */
- public abstract double hypothesis(DoubleVector theta, DoubleVector x);
-
-
- public void getTheta(BSPPeer<VectorWritable, DoubleWritable, NullWritable,
NullWritable, VectorWritable> peer) throws IOException, SyncException,
InterruptedException {
- if (master && theta == null) {
- int size = getXSize(peer);
- theta = new DenseDoubleVector(size,
peer.getConfiguration().getInt(INITIAL_THETA_VALUES, 10));
- for (String peerName : peer.getAllPeerNames()) {
- peer.send(peerName, new VectorWritable(theta));
- }
- peer.sync();
- } else {
- peer.sync();
- VectorWritable vectorWritable = peer.getCurrentMessage();
- theta = vectorWritable.getVector();
- }
+ }
+
+ /**
+ * Applies the hypothesis given a set of parameters theta to a given input x
+ *
+ * @param theta the parameters vector
+ * @param x the input
+ * @return a <code>double</code> number
+ */
+ public abstract double hypothesis(DoubleVector theta, DoubleVector x);
+
+
+ public void getTheta(BSPPeer<VectorWritable, DoubleWritable, NullWritable,
NullWritable, VectorWritable> peer) throws IOException, SyncException,
InterruptedException {
+ if (master && theta == null) {
+ int size = getXSize(peer);
+ theta = new DenseDoubleVector(size,
peer.getConfiguration().getInt(INITIAL_THETA_VALUES, 10));
+ for (String peerName : peer.getAllPeerNames()) {
+ peer.send(peerName, new VectorWritable(theta));
+ }
+ peer.sync();
+ } else {
+ peer.sync();
+ VectorWritable vectorWritable = peer.getCurrentMessage();
+ theta = vectorWritable.getVector();
}
+ }
- private int getXSize(BSPPeer<VectorWritable, DoubleWritable, NullWritable,
NullWritable, VectorWritable> peer) throws IOException {
- VectorWritable key = null;
- peer.readNext(key, null);
- if (key == null) {
- throw new IOException("cannot read input vector size");
- }
- return key.getVector().getLength();
+ private int getXSize(BSPPeer<VectorWritable, DoubleWritable, NullWritable,
NullWritable, VectorWritable> peer) throws IOException {
+ VectorWritable key = null;
+ peer.readNext(key, null);
+ peer.reopenInput(); // reset input to start
+ if (key == null) {
+ throw new IOException("cannot read input vector size");
}
+ return key.getVector().getLength();
+ }
}