Author: millecker
Date: Wed Sep 25 18:28:31 2013
New Revision: 1526259
URL: http://svn.apache.org/r1526259
Log:
HAMA-805: Problem initializing pipes in HamaStreaming
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1526259&r1=1526258&r2=1526259&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Sep 25 18:28:31 2013
@@ -10,7 +10,8 @@ Release 0.6.3 (unreleased changes)
HAMA-801: Snappy fails on Mac OS with JDK 1.7 (Anastasis Andronidis via
tommaso)
BUG FIXES
-
+
+ HAMA-805: Problem initializing pipes in HamaStreaming (Martin Illecker)
HAMA-789: BspPeer launched fail because port is bound by others (Suraj
Menon via edwardyoon)
HAMA-791: Fix the problem that MultilayerPerceptron fails to learn a good
hypothesis sometimes. (Yexi Jiang)
HAMA-782: The arguments of DoubleVector.slice(int, int) method will mislead
the user. (Yexi Jiang)
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java?rev=1526259&r1=1526258&r2=1526259&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java Wed Sep 25 18:28:31 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.pipes.Submitter;
+import org.apache.hama.pipes.protocol.UplinkReader;
/**
* This protocol is a binary implementation of the Hama Pipes protocol.
@@ -77,7 +78,8 @@ public class BinaryProtocol<K1 extends W
* messages are public methods on this object.
*
* @param jobConfig The job's configuration
- * @param sock The socket to communicate on.
+ * @param out The output stream to communicate on.
+ * @param in The input stream to communicate on.
* @throws IOException
*/
public BinaryProtocol(Configuration conf, OutputStream out, InputStream in)
@@ -116,12 +118,17 @@ public class BinaryProtocol<K1 extends W
out = new TeeOutputStream("downlink.data", out);
}
stream = new DataOutputStream(new BufferedOutputStream(out, BUFFER_SIZE));
- uplink = new UplinkReader<K1, V1, K2, V2>(this, peer, in);
+ uplink = getUplinkReader(peer, in);
uplink.setName("pipe-uplink-handler");
uplink.start();
}
+ public UplinkReader<K1, V1, K2, V2> getUplinkReader(
+ BSPPeer<K1, V1, K2, V2, BytesWritable> peer, InputStream in) throws IOException {
+ return new UplinkReader<K1, V1, K2, V2>(this, peer, in);
+ }
+
public boolean isHasTask() {
return hasTask;
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java?rev=1526259&r1=1526258&r2=1526259&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java Wed Sep 25 18:28:31 2013
@@ -35,6 +35,8 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.pipes.protocol.UplinkReader;
+import org.apache.hama.pipes.protocol.StreamingProtocol.StreamingUplinkReaderThread;
import org.apache.hama.util.KeyValuePair;
/**
@@ -61,6 +63,13 @@ public class StreamingProtocol<K1 extend
super(peer, out, in);
}
+ @Override
+ public UplinkReader<K1, V1, Text, Text> getUplinkReader(
+ BSPPeer<K1, V1, Text, Text, BytesWritable> peer, InputStream in)
+ throws IOException {
+ return new StreamingUplinkReaderThread(peer, in);
+ }
+
public class StreamingUplinkReaderThread extends
UplinkReader<K1, V1, Text, Text> {
@@ -265,12 +274,6 @@ public class StreamingProtocol<K1 extend
waitOnAck();
}
- /*
- * @Override public UplinkReaderThread getUplinkReader( BSPPeer<K1, V1, Text,
- * Text, BytesWritable> peer, InputStream in) throws IOException { return new
- * StreamingUplinkReaderThread(peer, in); }
- */
-
public void writeLine(int msg) throws IOException {
writeLine("" + msg);
}