Author: cutting Date: Mon Nov 20 16:25:44 2006 New Revision: 477433 URL: http://svn.apache.org/viewvc?view=rev&rev=477433 Log: HADOOP-677. In IPC, permit a version header to be transmitted when connections are established. Contributed by Owen.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=477433&r1=477432&r2=477433 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Mon Nov 20 16:25:44 2006 @@ -106,6 +106,11 @@ 32. HADOOP-709. Fix contrib/streaming to work with commands that contain control characters. (Dhruba Borthakur via cutting) +33. HADOOP-677. In IPC, permit a version header to be transmitted + when connections are established. This will permit us to change + the format of IPC requests back-compatibly in subsequent releases. + (omalley via cutting) + Release 0.8.0 - 2006-11-03 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?view=diff&rev=477433&r1=477432&r2=477433 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Mon Nov 20 16:25:44 2006 @@ -31,6 +31,7 @@ import java.io.BufferedOutputStream; import java.io.FilterInputStream; import java.io.FilterOutputStream; +import java.io.OutputStream; import java.util.Hashtable; import java.util.Iterator; @@ -43,6 +44,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.util.StringUtils; /** A client for an IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. 
A service runs on @@ -52,6 +54,10 @@ * @see Server */ public class Client { + /** Should the client send the header on the connection? */ + private static final boolean SEND_HEADER = false; + private static final byte CURRENT_VERSION = 0; + public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.Client"); private Hashtable connections = new Hashtable(); @@ -155,7 +161,6 @@ } } } - socket.setSoTimeout(timeout); this.in = new DataInputStream (new BufferedInputStream @@ -178,6 +183,10 @@ } } })); + if (SEND_HEADER) { + out.write(Server.HEADER.array()); + out.write(CURRENT_VERSION); + } notify(); } @@ -269,7 +278,7 @@ } catch (EOFException eof) { // This is what happens when the remote side goes down } catch (Exception e) { - LOG.info(getName() + " caught: " + e, e); + LOG.info(StringUtils.stringifyException(e)); } finally { //If there was no exception thrown in this method, then the only //way we reached here is by breaking out of the while loop (after @@ -480,7 +489,8 @@ Connection connection = getConnection(addresses[i]); connection.sendParam(call); // send each parameter } catch (IOException e) { - LOG.info("Calling "+addresses[i]+" caught: " + e); // log errors + LOG.info("Calling "+addresses[i]+" caught: " + + StringUtils.stringifyException(e)); // log errors results.size--; // wait for one fewer result } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?view=diff&rev=477433&r1=477432&r2=477433 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Mon Nov 20 16:25:44 2006 @@ -59,6 +59,12 @@ * @see Client */ public abstract class Server { + + /** + * The first four bytes of Hadoop RPC connections + */ + public static final ByteBuffer HEADER = 
ByteBuffer.wrap("hrpc".getBytes()); + /** * How much time should be allocated for actually running the handler? * Calls that are older than ipc.timeout * MAX_CALL_QUEUE_TIME @@ -346,6 +352,7 @@ /** Reads calls from a connection and queues them for handling. */ private class Connection { + private boolean firstData = true; private SocketChannel channel; private SelectionKey key; private ByteBuffer data; @@ -415,6 +422,23 @@ if ( count < 0 || dataLengthBuffer.remaining() > 0 ) return count; dataLengthBuffer.flip(); + // Is this a new style header? + if (firstData && HEADER.equals(dataLengthBuffer)) { + // If so, read the version + ByteBuffer versionBuffer = ByteBuffer.allocate(1); + count = channel.read(versionBuffer); + if (count < 0) { + return count; + } + // read the first length + dataLengthBuffer.clear(); + count = channel.read(dataLengthBuffer); + if (count < 0 || dataLengthBuffer.remaining() > 0) { + return count; + } + dataLengthBuffer.flip(); + firstData = false; + } dataLength = dataLengthBuffer.getInt(); data = ByteBuffer.allocate(dataLength); }