Author: cutting
Date: Tue Jun  2 19:02:36 2009
New Revision: 781125

URL: http://svn.apache.org/viewvc?rev=781125&view=rev
Log:
AVRO-2.  Optimized Java RPC handshake protocol.

Added:
    hadoop/avro/trunk/src/schemata/
    hadoop/avro/trunk/src/schemata/org/
    hadoop/avro/trunk/src/schemata/org/apache/
    hadoop/avro/trunk/src/schemata/org/apache/avro/
    hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/
    hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeRequest.m4
    hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeResponse.m4
    hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/MD5.js
    hadoop/avro/trunk/src/test/schemata/FSData.avpr   (props changed)
      - copied unchanged from r779218, hadoop/avro/trunk/src/test/schemata/fs-data.js
    hadoop/avro/trunk/src/test/schemata/interop.avsc   (props changed)
      - copied unchanged from r779218, hadoop/avro/trunk/src/test/schemata/interop.js
    hadoop/avro/trunk/src/test/schemata/simple.avpr
      - copied unchanged from r779218, hadoop/avro/trunk/src/test/schemata/simple.js
Removed:
    hadoop/avro/trunk/src/test/schemata/fs-data.js
    hadoop/avro/trunk/src/test/schemata/interop.js
    hadoop/avro/trunk/src/test/schemata/simple.js
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/build.xml
    hadoop/avro/trunk/src/doc/content/xdocs/spec.xml
    hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java
    hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/Transceiver.java
    hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java
    hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestFsData.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflect.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java
    hadoop/avro/trunk/src/test/py/interoptests.py
    hadoop/avro/trunk/src/test/py/testipc.py

Modified: hadoop/avro/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Tue Jun  2 19:02:36 2009
@@ -9,6 +9,8 @@
 
     AVRO-9. Restrict map keys to strings.  (cutting & sharad)
 
+    AVRO-2. Optimized RPC handshake protocol for Java.  (cutting)
+
   NEW FEATURES
 
     AVRO-6. Permit easier implementation of alternate generic data

Modified: hadoop/avro/trunk/build.xml
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/build.xml?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/build.xml (original)
+++ hadoop/avro/trunk/build.xml Tue Jun  2 19:02:36 2009
@@ -99,24 +99,44 @@
 
   <target name="compile" depends="compile-java,compile-c"/>
 
-  <target name="compile-java" depends="init">
-    <copy todir="${build.classes}">
-      <fileset dir="${java.src.dir}" includes="**/*.js"/>
-    </copy>
+  <target name="compile-java" depends="init,schemata">
+    <java-compiler excludes="**/ipc/** **/*Requestor.java **/*Responder.java"/>
+    <java-avro-compiler/>
+    <java-compiler/>
+  </target>
+
+  <macrodef name="java-compiler">
+    <attribute name="src" default="${java.src.dir}"/>
+    <attribute name="dest" default="${build.classes}"/>
+    <attribute name="includes" default="**/*.java"/>
+    <attribute name="excludes" default=""/>
+    <attribute name="classpath" default="java.classpath"/>
+    <sequential>
+      <javac 
+        srcdir="@{src}"
+        destdir="@{dest}"
+        includes="@{includes}"
+        excludes="@{excludes}"
+        encoding="${javac.encoding}" 
+        debug="${javac.debug}"
+        optimize="${javac.optimize}"
+        target="${javac.version}"
+        source="${javac.version}"
+        deprecation="${javac.deprecation}">
+       <compilerarg line="${javac.args} ${javac.args.warnings}" />
+       <classpath refid="@{classpath}"/>
+      </javac>
+    </sequential>
+  </macrodef>
 
-    <javac 
-     encoding="${javac.encoding}" 
-     srcdir="${java.src.dir}"
-     includes="**/*.java"
-     destdir="${build.classes}"
-     debug="${javac.debug}"
-     optimize="${javac.optimize}"
-     target="${javac.version}"
-     source="${javac.version}"
-     deprecation="${javac.deprecation}">
-      <compilerarg line="${javac.args} ${javac.args.warnings}" />
-      <classpath refid="java.classpath"/>
-    </javac>
+  <target name="schemata" depends="init">
+    <mkdir dir="${build.dir}/src/org/apache/avro/ipc"/>
+    <mapper id="m4toavsc" type="glob" from="*.m4" to="${build.dir}/src/*.avsc"/>
+    <apply executable="m4" dir="${src.dir}/schemata">
+      <fileset dir="${src.dir}/schemata" includes="**/*.m4"/>
+      <mapper refid="m4toavsc"/>
+      <redirector><outputmapper refid="m4toavsc"/></redirector>
+    </apply>
   </target>
 
   <target name="jar" depends="compile-java" description="Build jar file.">
@@ -132,62 +152,57 @@
     </jar>
   </target>
   
-  <target name="compile-test-java" depends="compile-java,compile-test-schemata">
-    <javac 
-     encoding="${javac.encoding}" 
-     srcdir="${test.java.src.dir}"
-     includes="org/${org}/${name}/**/*.java"
-     destdir="${test.java.classes}"
-     debug="${javac.debug}"
-     optimize="${javac.optimize}"
-     target="${javac.version}"
-     source="${javac.version}"
-     deprecation="${javac.deprecation}">
-      <compilerarg line="${javac.args} ${javac.args.warnings}" />
-      <classpath refid="test.java.classpath"/>
-    </javac> 
+  <target name="compile-test-java" depends="compile-java">
+    <java-avro-compiler src="${test.schemata.dir}"
+                       generated="${test.java.generated.dir}"
+                       dest="${test.java.classes}"
+                       classpath="test.java.classpath"/>
+    <java-compiler src="${test.java.src.dir}"
+                  dest="${test.java.classes}"
+                  classpath="test.java.classpath"/>
   </target>
 
-  <target name="test" depends="test-java,test-py,test-c,test-interop"/>
+  <macrodef name="java-avro-compiler">
+    <attribute name="src" default="${build.dir}/src"/>
+    <attribute name="generated" default="${build.dir}/src"/>
+    <attribute name="dest" default="${build.classes}"/>
+    <attribute name="classpath" default="java.classpath"/>
 
-  <target name="compile-test-schemata" depends="compile-java">
-    <taskdef name="protocol" classname="org.apache.avro.specific.ProtocolTask">
-      <classpath refid="java.classpath" />
-    </taskdef>
-    <taskdef name="schema" classname="org.apache.avro.specific.SchemaTask">
-      <classpath refid="java.classpath" />
-    </taskdef>
-    <taskdef name="paranamer" 
-            classname="com.thoughtworks.paranamer.ant.ParanamerGeneratorTask">
-      <classpath refid="java.classpath" />
-    </taskdef>
-    <mkdir dir="${test.java.generated.dir}"/>
-    <protocol destdir="${test.java.generated.dir}">
-      <fileset dir="${test.schemata.dir}">
-       <include name="simple.js" />
-      </fileset>
-    </protocol>
-    <schema destdir="${test.java.generated.dir}">
-      <fileset dir="${test.schemata.dir}">
-       <include name="interop.js" />
-      </fileset>
-    </schema>
-    <javac 
-       encoding="${javac.encoding}" 
-       srcdir="${test.java.generated.dir}"
-       includes="**/*.java"
-       destdir="${test.java.classes}"
-       debug="${javac.debug}"
-       optimize="${javac.optimize}"
-       target="${javac.version}"
-       source="${javac.version}"
-       deprecation="${javac.deprecation}">
-      <compilerarg line="${javac.args} ${javac.args.warnings}" />
-      <classpath refid="test.java.classpath"/>
-    </javac>
-    <paranamer sourceDirectory="${test.java.generated.dir}"
-              outputDirectory="${test.java.classes}"/>
-  </target>
+    <sequential>
+      <taskdef name="protocol"
+              classname="org.apache.avro.specific.ProtocolTask">
+       <classpath refid="java.classpath" />
+      </taskdef>
+      <taskdef name="schema" classname="org.apache.avro.specific.SchemaTask">
+       <classpath refid="java.classpath" />
+      </taskdef>
+      <taskdef
+        name="paranamer" 
+        classname="com.thoughtworks.paranamer.ant.ParanamerGeneratorTask">
+       <classpath refid="java.classpath" />
+      </taskdef>
+
+      <mkdir dir="@{generated}"/>
+      
+      <protocol destdir="@{generated}">
+       <fileset dir="@{src}">
+         <include name="**/*.avpr" />
+       </fileset>
+      </protocol>
+      
+      <schema destdir="@{generated}">
+       <fileset dir="@{src}">
+         <include name="**/*.avsc" />
+       </fileset>
+      </schema>
+
+      <java-compiler src="@{generated}" dest="@{dest}"
+                    classpath="@{classpath}"/>
+      <paranamer sourceDirectory="@{generated}" outputDirectory="@{dest}"/>
+    </sequential>
+  </macrodef>
+
+  <target name="test" depends="test-java,test-py,test-c,test-interop"/>
 
   <!-- Define TestNG task for all targets that might need it-->
   <taskdef resource="testngtasks" classpathref="java.classpath"/>
@@ -222,7 +237,7 @@
     <mkdir dir="${test.java.build.dir}/data-files"/>
     <java classname="org.apache.avro.RandomData"
       classpathref="test.java.classpath">
-      <arg value="${basedir}/src/test/schemata/interop.js"/>
+      <arg value="${basedir}/src/test/schemata/interop.avsc"/>
       <arg value="${test.java.build.dir}/data-files/test.java.avro"/>
       <arg value="${test.count}"/>
     </java>
@@ -232,7 +247,7 @@
     </taskdef>
     <py-run script="${basedir}/src/test/py/testio.py" python="python"
       pythonpathref="test.py.path">
-      <arg value="${basedir}/src/test/schemata/interop.js"/>
+      <arg value="${basedir}/src/test/schemata/interop.avsc"/>
       <arg value="${test.java.build.dir}/data-files/test.py.avro"/>
       <arg value="100"/>
     </py-run>

Modified: hadoop/avro/trunk/src/doc/content/xdocs/spec.xml
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/doc/content/xdocs/spec.xml?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/doc/content/xdocs/spec.xml (original)
+++ hadoop/avro/trunk/src/doc/content/xdocs/spec.xml Tue Jun  2 19:02:36 2009
@@ -633,43 +633,106 @@
       <section>
        <title>Handshake</title>
 
-       <p>RPC sessions are initiated by a handshake.  No requests may
-       be processed in a session until a successful handshake has
-       completed.  (As with all messages, handshakes are framed as
-       described above.)</p>
-
-       <p>The purpose of the handshake is to exchange protocols, so
-       that the client can deserialize responses, and the
-       server can deserialize requests.</p>
-
-       <p>The handshake is versioned, to permit future enhancements.
-       The handshake documented here is version zero.  All Avro
-       implementations must support version zero.</p>
+       <p>RPC sessions are initiated by a handshake.  The purpose of
+       the handshake is to ensure that the client and the server have
+       each other's protocol definition, so that the client can
+       correctly deserialize responses, and the server can correctly
+       deserialize requests.  Both clients and servers should
+       maintain a cache of recently seen protocols, so that, in most
+       cases, a handshake will be completed without extra round-trip
+       network exchanges or the transmission of full protocol text.</p>
+
+       <p>The handshake process uses the following record schemas:</p>
+
+       <source>
+{
+  "type": "record",
+  "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
+  "fields": [
+    {"name": "clientHash",
+     "type": {"type": "fixed", "name": "MD5", "size": 16}},
+    {"name": "clientProtocol", "type": ["null", "string"]},
+    {"name": "serverHash", "type": "MD5"},
+    {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
+  ]
+}
+{
+  "type": "record",
+  "name": "HandshakeResponse", "namespace": "org.apache.avro.ipc",
+  "fields": [
+    {"name": "match",
+     "type": {"type": "enum", "name": "HandshakeMatch",
+             "symbols": ["BOTH", "CLIENT", "NONE"]}},
+    {"name": "serverProtocol",
+     "type": ["null", "string"]},
+    {"name": "serverHash",
+     "type": ["null", {"type": "fixed", "name": "MD5", "size": 16}]},
+    {"name": "meta",
+     "type": ["null", {"type": "map", "values": "bytes"}]}
+  ]
+}
+       </source>
+
+        <ul>
+         <li>In a new session, a client first sends
+         a <code>HandshakeRequest</code> containing just the hash of
+         its protocol and of the server's protocol
+         (<code>clientHash!=null, clientProtocol=null,
+         serverHash!=null</code>), where the hashes are 128-bit MD5
+         hashes of the JSON protocol text. If a client has never
+         connected to a given server, it sends its hash as a guess of
+         the server's hash; otherwise it sends the hash that it
+         previously obtained from this server.</li>
 
-       <p>The version zero handshake consists of a request and
-       response message.</p>
-
-       <p>The format of a version zero handshake request is:</p>
-       <ul>
-         <li>a four-byte, big-endian <em>version</em> of zero,
-         followed by</li>
-         <li>the JSON <em>client's protocol</em> for the session,
-         serialized as an Avro string.</li>
-       </ul>
-
-       <p>The format of a version zero handshake response is:</p>
-       <ul>
-         <li>a one-byte <em>error flag</em> boolean, followed by either:
+         <li>The server responds with
+         a <code>HandshakeResponse</code> containing one of:
            <ul>
-             <li>if the error flag is false, the JSON <em>server's
-               protocol</em> for the session, serialized as an Avro
-               string.</li>
-             <li>if the error flag is true, an error message, as an
-             Avro string.</li>
+             <li><code>match=BOTH, serverProtocol=null,
+             serverHash=null</code> if the client sent the valid hash
+             of the server's protocol and the server knows what
+             protocol corresponds to the client's hash. In this case,
+             the request is complete and the session is
+             established.</li>
+
+             <li><code>match=CLIENT, serverProtocol!=null,
+             serverHash!=null</code> if the server has previously
+             seen the client's protocol, but the client sent an
+             incorrect hash of the server's protocol. The client must
+             then re-send the request with the correct server
+             hash.</li>
+
+             <li><code>match=NONE, serverProtocol!=null,
+             serverHash!=null</code> if the server has not previously
+             seen the client's protocol and the client sent an
+             incorrect hash of the server's protocol.
+
+             <p>In this case the client must re-submit its request
+             with its protocol text (<code>clientHash!=null,
+             clientProtocol!=null, serverHash!=null</code>) and the
+             server should respond with a successful match
+             (<code>match=BOTH, serverProtocol=null,
+             serverHash=null</code>) as above.</p>
+             </li>
            </ul>
          </li>
        </ul>
 
+       <p>Until a connection is established, call request data sent
+       by the client must be preceded by
+       a <code>HandshakeRequest</code> and call response data
+       returned by the server must be preceded by a
+       <code>HandshakeResponse</code>. A connection is not
+       established until a <code>HandshakeResponse</code> with
+       <code>match=BOTH</code> or <code>match=CLIENT</code> is
+       returned.  In these cases, the call response data immediately
+       follows the <code>HandshakeResponse</code>.
+       When <code>match=NONE</code>, no call response data is sent
+       and the call request data is ignored.</p>
+
+       <p>The <code>meta</code> field is reserved for future
+       handshake enhancements.</p>
+
       </section>
 
       <section>
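
The handshake rules added to spec.xml above reduce, on the client side, to a
small retry loop.  The following sketch is illustrative only and is not part
of this patch: the enum mirrors HandshakeMatch, while the cache map, method
names and the exchange() transport stub are hypothetical stand-ins for what
Requestor actually does (see its diff below).

    import java.util.HashMap;
    import java.util.Map;

    public class HandshakeClientLoop {
      enum HandshakeMatch { BOTH, CLIENT, NONE }

      // Simplified mirror of the HandshakeResponse record.
      static class HandshakeResponse {
        HandshakeMatch match;
        String serverProtocol;   // JSON protocol text, or null
        byte[] serverHash;       // MD5 of that text, or null
      }

      // Hashes of server protocols previously seen, keyed by remote name.
      private final Map<String, byte[]> serverHashes = new HashMap<String, byte[]>();

      /** Repeat the handshake until the server reports BOTH or CLIENT. */
      void connect(String remoteName, byte[] clientHash, String clientProtocol) {
        boolean sendLocalText = false;
        boolean established = false;
        while (!established) {
          // If we have never talked to this server, guess that its hash is ours.
          byte[] serverHash = serverHashes.containsKey(remoteName)
              ? serverHashes.get(remoteName) : clientHash;
          HandshakeResponse r =
              exchange(clientHash, sendLocalText ? clientProtocol : null, serverHash);
          switch (r.match) {
          case BOTH:                             // both sides knew both protocols
            established = true; break;
          case CLIENT:                           // server knew us; learn its protocol
            serverHashes.put(remoteName, r.serverHash);
            established = true; break;
          case NONE:                             // server needs our protocol text
            serverHashes.put(remoteName, r.serverHash);
            sendLocalText = true; break;         // retry, now sending the full text
          }
        }
      }

      /** Hypothetical transport stub: send a HandshakeRequest, read the response. */
      HandshakeResponse exchange(byte[] clientHash, String clientProtocol,
                                 byte[] serverHash) {
        throw new UnsupportedOperationException("transport not shown");
      }
    }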

Modified: hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java Tue Jun  2 
19:02:36 2009
@@ -27,6 +27,7 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.security.MessageDigest;
 
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.JsonParser;
@@ -125,6 +126,7 @@
 
   private Schema.Names types = new Schema.Names();
   private Map<String,Message> messages = new LinkedHashMap<String,Message>();
+  private byte[] md5;
 
   /** An error that can be thrown by any message. */
   public static final Schema SYSTEM_ERROR = Schema.create(Schema.Type.STRING);
@@ -198,6 +200,18 @@
     return buffer.toString();
   }
 
+  /** Return the MD5 hash of the text of this protocol. */
+  public byte[] getMD5() {
+    if (md5 == null)
+      try {
+        md5 = MessageDigest.getInstance("MD5")
+          .digest(this.toString().getBytes("UTF-8"));
+      } catch (Exception e) {
+        throw new AvroRuntimeException(e);
+      }
+    return md5;
+  }
+
   /** Read a protocol from a Json file. */
   public static Protocol parse(File file) throws IOException {
     return parse(Schema.FACTORY.createJsonParser(file));
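
Protocol.getMD5() above memoizes a 16-byte digest of the protocol's JSON text;
that digest, rather than the text itself, is what the handshake normally
carries.  Below is a standalone sketch of the same computation using the
standard JDK API; the sample JSON string is invented for illustration and is
not a complete Avro protocol.

    import java.security.MessageDigest;
    import java.util.Arrays;

    public class ProtocolHashDemo {
      public static void main(String[] args) throws Exception {
        String protocolJson =
          "{\"protocol\": \"Simple\", \"namespace\": \"org.apache.avro.test\"}";
        byte[] hash = MessageDigest.getInstance("MD5")
          .digest(protocolJson.getBytes("UTF-8"));
        System.out.println("hash length = " + hash.length);   // always 16

        // Hash equality, not text equality, is what decides whether protocol
        // text must be re-sent during the handshake.
        byte[] again = MessageDigest.getInstance("MD5")
          .digest(protocolJson.getBytes("UTF-8"));
        System.out.println("equal = " + Arrays.equals(hash, again)); // true
      }
    }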

Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java 
(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java Tue Jun 
 2 19:02:36 2009
@@ -104,7 +104,7 @@
     public Fixed(byte[] bytes) { bytes(bytes); }
 
     protected Fixed() {}
-    protected void bytes(byte[] bytes) { this.bytes = bytes; }
+    public void bytes(byte[] bytes) { this.bytes = bytes; }
 
     public byte[] bytes() { return bytes; }
 
@@ -116,6 +116,7 @@
 
     public int hashCode() { return Arrays.hashCode(bytes); }
 
+    public String toString() { return Arrays.toString(bytes); }
   }
 
 

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java Tue Jun  2 
19:02:36 2009
@@ -27,6 +27,7 @@
 import org.apache.avro.Protocol.Message;
 import org.apache.avro.util.*;
 import org.apache.avro.io.*;
+import org.apache.avro.specific.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +38,7 @@
 
   private Protocol local;
   private Protocol remote;
+  private boolean established, sendLocalText;
   private Transceiver transceiver;
 
   public Protocol getLocal() { return local; }
@@ -47,38 +49,34 @@
     throws IOException {
     this.local = local;
     this.transceiver = transceiver;
-    this.remote = handshake();
-  }
-
-  private Protocol handshake() throws IOException {
-    ByteBufferValueWriter out = new ByteBufferValueWriter();
-    out.writeLong(Protocol.VERSION);              // send handshake version
-    out.writeUtf8(new Utf8(local.toString()));    // send local protocol
-    List<ByteBuffer> response = transceiver.transceive(out.getBufferList());
-    ValueReader in = new ByteBufferValueReader(response);
-    String remote = in.readUtf8(null).toString(); // read remote protocol
-    return Protocol.parse(remote);                // parse & return it
   }
 
   /** Writes a request message and reads a response or error message. */
   public Object request(String messageName, Object request)
     throws IOException {
-
-    // use local protocol to write request
-    Message m = getLocal().getMessages().get(messageName);
-    if (m == null)
-      throw new AvroRuntimeException("Not a local message: "+messageName);
-
-    ByteBufferValueWriter out = new ByteBufferValueWriter();
-
-    out.writeUtf8(new Utf8(m.getName()));         // write message name
-  
-    writeRequest(m.getRequest(), request, out);
-
-    List<ByteBuffer> response =
-      getTransceiver().transceive(out.getBufferList());
-
-    ValueReader in = new ByteBufferValueReader(response);
+    ValueReader in;
+    Message m;
+    do {
+      ByteBufferValueWriter out = new ByteBufferValueWriter();
+
+      if (!established)                           // if not established
+        writeHandshake(out);                      // prepend handshake
+
+      // use local protocol to write request
+      m = getLocal().getMessages().get(messageName);
+      if (m == null)
+        throw new AvroRuntimeException("Not a local message: "+messageName);
+      
+      out.writeUtf8(new Utf8(m.getName()));       // write message name
+      writeRequest(m.getRequest(), request, out); // write request payload
+      
+      List<ByteBuffer> response =                 // transceive
+        getTransceiver().transceive(out.getBufferList());
+      
+      in = new ByteBufferValueReader(response);
+      if (!established)                           // if not established
+        readHandshake(in);                        // process handshake
+    } while (!established);
 
     // use remote protocol to read response
     m = getRemote().getMessages().get(messageName);
@@ -91,6 +89,65 @@
     }
   }
 
+  private static final Map<String,MD5> REMOTE_HASHES =
+    Collections.synchronizedMap(new HashMap<String,MD5>());
+  private static final Map<MD5,Protocol> REMOTE_PROTOCOLS =
+    Collections.synchronizedMap(new HashMap<MD5,Protocol>());
+
+  private static final SpecificDatumWriter HANDSHAKE_WRITER =
+    new SpecificDatumWriter(HandshakeRequest._SCHEMA);
+
+  private static final SpecificDatumReader HANDSHAKE_READER =
+    new SpecificDatumReader(HandshakeResponse._SCHEMA);
+
+  private void writeHandshake(ValueWriter out) throws IOException {
+    MD5 localHash = new MD5();
+    localHash.bytes(local.getMD5());
+    String remoteName = transceiver.getRemoteName();
+    MD5 remoteHash = REMOTE_HASHES.get(remoteName);
+    remote = REMOTE_PROTOCOLS.get(remoteHash);
+    if (remoteHash == null) {                     // guess remote is local
+      remoteHash = localHash;
+      remote = local;
+    }
+    HandshakeRequest handshake = new HandshakeRequest();
+    handshake.clientHash = localHash;
+    handshake.serverHash = remoteHash;
+    if (sendLocalText)
+      handshake.clientProtocol = new Utf8(local.toString());
+    HANDSHAKE_WRITER.write(handshake, out);
+  }
+
+  private void readHandshake(ValueReader in) throws IOException {
+    HandshakeResponse handshake =
+      (HandshakeResponse)HANDSHAKE_READER.read(null, in);
+    switch (handshake.match) {
+    case BOTH:
+      established = true;
+      break;
+    case CLIENT:
+      LOG.debug("Handshake match = CLIENT");
+      setRemote(handshake);
+      established = true;
+      break;
+    case NONE:
+      LOG.debug("Handshake match = NONE");
+      setRemote(handshake);
+      sendLocalText = true;
+      break;
+    default:
+      throw new AvroRuntimeException("Unexpected match: "+handshake.match);
+    }
+  }
+
+  private void setRemote(HandshakeResponse handshake) {
+    remote = Protocol.parse(handshake.serverProtocol.toString());
+    MD5 remoteHash = (MD5)handshake.serverHash;
+    REMOTE_HASHES.put(transceiver.getRemoteName(), remoteHash);
+    if (!REMOTE_PROTOCOLS.containsKey(remoteHash))
+      REMOTE_PROTOCOLS.put(remoteHash, remote);
+  }
+
   /** Writes a request message. */
   public abstract void writeRequest(Schema schema, Object request,
                                     ValueWriter out) throws IOException;
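
From a caller's point of view the handshake is now folded into
Requestor.request(): client code just opens a transceiver and issues calls,
and the first call on a connection carries the HandshakeRequest.  The sketch
below mirrors the usage in TestProtocolGeneric further down; the host, port
and protocol file path are placeholders, not values from this patch.

    import java.io.File;
    import java.net.InetSocketAddress;

    import org.apache.avro.Protocol;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.generic.GenericRequestor;
    import org.apache.avro.ipc.SocketTransceiver;
    import org.apache.avro.ipc.Transceiver;
    import org.apache.avro.util.Utf8;

    public class HandshakeClientSketch {
      public static void main(String[] args) throws Exception {
        Protocol protocol =
          Protocol.parse(new File("src/test/schemata/simple.avpr"));
        Transceiver t =
          new SocketTransceiver(new InetSocketAddress("localhost", 9090));
        try {
          GenericRequestor requestor = new GenericRequestor(protocol, t);
          GenericRecord params = new GenericData.Record(
            protocol.getMessages().get("hello").getRequest());
          params.put("greeting", new Utf8("hi"));
          // The first request() on this transceiver prepends a HandshakeRequest;
          // once the connection is established, later calls skip it.
          Object response = requestor.request("hello", params);
          System.out.println(response);
        } finally {
          t.close();
        }
      }
    }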

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java Tue Jun  2 
19:02:36 2009
@@ -29,47 +29,42 @@
 import org.apache.avro.Protocol.Message;
 import org.apache.avro.util.*;
 import org.apache.avro.io.*;
+import org.apache.avro.specific.*;
 
 /** Base class for the server side of a protocol interaction. */
 public abstract class Responder {
   private static final Logger LOG = LoggerFactory.getLogger(Responder.class);
 
+  private Map<Transceiver,Protocol> remotes
+    = Collections.synchronizedMap(new WeakHashMap<Transceiver,Protocol>());
+  private Map<MD5,Protocol> protocols
+    = Collections.synchronizedMap(new HashMap<MD5,Protocol>());
+
   private Protocol local;
+  private MD5 localHash;
 
   protected Responder(Protocol local) {
     this.local = local;
+    this.localHash = new MD5();
+    localHash.bytes(local.getMD5());
+    protocols.put(localHash, local);
   }
 
   public Protocol getLocal() { return local; }
 
-  /** Returns the remote protocol. */
-  protected Protocol handshake(Transceiver server) throws IOException {
-    ValueReader in = new ByteBufferValueReader(server.readBuffers());
-    ByteBufferValueWriter out = new ByteBufferValueWriter();
-
-    long version = in.readLong();                 // read handshake version
-    if (version != Protocol.VERSION)
-      throw new AvroRuntimeException("Incompatible request version: "+version);
-                                                  // read remote protocol
-    Protocol remote = Protocol.parse(in.readUtf8(null).toString());
-
-    out.writeUtf8(new Utf8(local.toString()));    // write local protocol
-    server.writeBuffers(out.getBufferList());
-
-    return remote;                                // return remote protocol
-  }
-
-  /** Called by a server to deserialize requests, compute and serialize
-   * responses and errors. */
-  public List<ByteBuffer> respond(Protocol remote, List<ByteBuffer> input)
-    throws IOException {
+  /** Called by a server to deserialize a request, compute and serialize
+   * a response or error. */
+  public List<ByteBuffer> respond(Transceiver transceiver) throws IOException {
+    ValueReader in = new ByteBufferValueReader(transceiver.readBuffers());
     ByteBufferValueWriter out = new ByteBufferValueWriter();
     AvroRemoteException error = null;
     try {
+      Protocol remote = handshake(transceiver, in, out);
+      if (remote == null)                        // handshake failed
+        return out.getBufferList();
+
       // read request using remote protocol specification
-      ValueReader in = new ByteBufferValueReader(input);
       String messageName = in.readUtf8(null).toString();
-
       Message m = remote.getMessages().get(messageName);
       if (m == null)
         throw new AvroRuntimeException("No such remote message: "+messageName);
@@ -107,6 +102,40 @@
     return out.getBufferList();
   }
 
+  private SpecificDatumWriter handshakeWriter =
+    new SpecificDatumWriter(HandshakeResponse._SCHEMA);
+  private SpecificDatumReader handshakeReader =
+    new SpecificDatumReader(HandshakeRequest._SCHEMA);
+
+  private Protocol handshake(Transceiver transceiver,
+                             ValueReader in, ValueWriter out)
+    throws IOException {
+    Protocol remote = remotes.get(transceiver);
+    if (remote != null) return remote;            // already established
+      
+    HandshakeRequest request = (HandshakeRequest)handshakeReader.read(null, in);
+    remote = protocols.get(request.clientHash);
+    if (remote == null && request.clientProtocol != null) {
+      remote = Protocol.parse(request.clientProtocol.toString());
+      protocols.put(request.clientHash, remote);
+    }
+    if (remote != null)
+      remotes.put(transceiver, remote);
+    HandshakeResponse response = new HandshakeResponse();
+    if (localHash.equals(request.serverHash)) {
+      response.match =
+        remote == null ? HandshakeMatch.NONE : HandshakeMatch.BOTH;
+    } else {
+      response.match =
+        remote == null ? HandshakeMatch.NONE : HandshakeMatch.CLIENT;
+    }
+    if (response.match != HandshakeMatch.BOTH) {
+      response.serverProtocol = new Utf8(local.toString());
+      response.serverHash = localHash;
+    }
+    handshakeWriter.write(response, out);
+    return remote;
+  }
 
   /** Computes the response for a message. */
   public abstract Object respond(Message message, Object request)
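
The match value computed in handshake() above is a function of just two facts:
whether the server can resolve the client's hash, and whether the client sent
the correct hash of the server's protocol.  The following is a minimal
illustrative reduction, not code from this patch:

    public class HandshakeMatchRule {
      enum HandshakeMatch { BOTH, CLIENT, NONE }

      /**
       * @param clientKnown  the server resolved the client's hash, or was sent
       *                     the client's protocol text in this request
       * @param serverHashOk the client sent the correct hash of the server's
       *                     protocol
       */
      static HandshakeMatch match(boolean clientKnown, boolean serverHashOk) {
        if (!clientKnown)                        // need the client's protocol text
          return HandshakeMatch.NONE;
        return serverHashOk
          ? HandshakeMatch.BOTH                  // nothing more to exchange
          : HandshakeMatch.CLIENT;               // response carries server protocol
      }

      public static void main(String[] args) {
        System.out.println(match(true,  true));    // BOTH
        System.out.println(match(true,  false));   // CLIENT
        System.out.println(match(false, true));    // NONE
        System.out.println(match(false, false));   // NONE
      }
    }

Whenever the result is not BOTH, the response also carries the server's
protocol text and hash so the client can cache them.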

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java Tue Jun  2 
19:02:36 2009
@@ -23,7 +23,7 @@
 import java.net.SocketAddress;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
@@ -38,38 +38,43 @@
 
   private Responder responder;
   private ServerSocketChannel channel;
+  private ThreadGroup group;
 
   public SocketServer(Responder responder, SocketAddress addr)
     throws IOException {
-    this.responder = responder;
+    String name = "SocketServer on "+addr;
 
+    this.responder = responder;
+    this.group = new ThreadGroup(name);
     this.channel = ServerSocketChannel.open();
+
     channel.socket().bind(addr);
 
-    setName("SocketServer on "+addr);
+    setName(name);
     setDaemon(true);
-    LOG.info("starting on "+addr);
     start();
   }
 
   public int getPort() { return channel.socket().getLocalPort(); }
 
   public void run() {
+    LOG.info("starting "+channel.socket().getInetAddress());
     while (true) {
       try {
-        LOG.info("listening on "+channel.socket().getInetAddress());
         new Connection(channel.accept());
-      } catch (ClosedByInterruptException e) {
+      } catch (ClosedChannelException e) {
         return;
       } catch (IOException e) {
         LOG.warn("unexpected error", e);
         throw new RuntimeException(e);
+      } finally {
+        LOG.info("stopping "+channel.socket().getInetAddress());
       }
     }
   }
 
   public void close() {
-    interrupt();
+    group.interrupt();
   }
 
   private class Connection extends SocketTransceiver implements Runnable {
@@ -77,37 +82,28 @@
     public Connection(SocketChannel channel) {
       super(channel);
 
-      Thread thread = new Thread(this);
-      LOG.info("connection from "+channel.socket().getRemoteSocketAddress());
-      thread.setName("Connection for "+channel);
+      Thread thread = new Thread(group, this);
+      thread.setName("Connection to "+channel.socket().getRemoteSocketAddress());
       thread.setDaemon(true);
       thread.start();
     }
 
     public void run() {
       try {
-        Protocol remote = responder.handshake(this);
-        while (true) {
-          writeBuffers(responder.respond(remote, readBuffers()));
+        try {
+          while (true) {
+            writeBuffers(responder.respond(this));
+          }
+        } catch (ClosedChannelException e) {
+          return;
+        } finally {
+          close();
         }
-      } catch (ClosedByInterruptException e) {
-        return;
       } catch (IOException e) {
         LOG.warn("unexpected error", e);
-        throw new RuntimeException(e);
-      } finally {
-        try {
-          super.close();
-        } catch (IOException e) {
-          LOG.warn("unexpected error", e);
-          throw new RuntimeException(e);
-        }
       }
     }
 
-    public void close() {
-      interrupt();
-    }
   }
   
   public static void main(String arg[]) throws Exception {

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java 
(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java Tue 
Jun  2 19:02:36 2009
@@ -35,12 +35,17 @@
   private SocketChannel channel;
   private ByteBuffer header = ByteBuffer.allocate(4);
   
+  public String getRemoteName() {
+    return channel.socket().getRemoteSocketAddress().toString();
+  }
+
   public SocketTransceiver(SocketAddress address) throws IOException {
     this(SocketChannel.open(address));
   }
 
   public SocketTransceiver(SocketChannel channel) {
     this.channel = channel;
+    LOG.info("open to "+channel.socket().getRemoteSocketAddress());
   }
 
   public synchronized List<ByteBuffer> readBuffers() throws IOException {
@@ -81,6 +86,7 @@
   }
 
   public void close() throws IOException {
+    LOG.info("closing to "+channel.socket().getRemoteSocketAddress());
     channel.close();
   }
 

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Transceiver.java
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Transceiver.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Transceiver.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Transceiver.java Tue Jun  2 
19:02:36 2009
@@ -25,6 +25,8 @@
 /** Base class for transmitters and recievers of raw binary messages. */
 public abstract class Transceiver {
 
+  public abstract String getRemoteName();
+
   public synchronized List<ByteBuffer> transceive(List<ByteBuffer> request)
     throws IOException {
     writeBuffers(request);

Modified: 
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java 
(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java 
Tue Jun  2 19:02:36 2009
@@ -20,6 +20,8 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
 
 import org.apache.avro.Protocol;
 import org.apache.avro.Schema;
@@ -34,6 +36,7 @@
 
   private String namespace;
   private StringBuilder buffer = new StringBuilder();
+  private Set<String> compiledTypes = new HashSet<String>();
 
   private SpecificCompiler() {}                        // no public ctor
 
@@ -131,6 +134,7 @@
 
   private void compile(Schema schema, String name, int d) {
     String type = type(schema, name);
+    if (compiledTypes.contains(type)) return; else compiledTypes.add(type);
     switch (schema.getType()) {
     case RECORD:
       buffer.append("\n");
@@ -141,7 +145,7 @@
              : " extends SpecificRecordBase")
            +" implements SpecificRecord {");
       // schema definition
-      line(d+1, "private static final Schema _SCHEMA = Schema.parse(\""
+      line(d+1, "public static final Schema _SCHEMA = Schema.parse(\""
            +esc(schema)+"\");");
       // field declations
       for (Map.Entry<String, Schema> field : schema.getFieldSchemas()) {

Modified: 
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- 
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java 
(original)
+++ 
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java 
Tue Jun  2 19:02:36 2009
@@ -39,6 +39,10 @@
     super(root, packageName);
   }
 
+  public SpecificDatumReader(Schema root) {
+    super(root, root.getNamespace()+".");
+  }
+
   protected void addField(Object record, String name, int position, Object o) {
     ((SpecificRecord)record).set(position, o);
   }

Added: hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeRequest.m4
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeRequest.m4?rev=781125&view=auto
==============================================================================
--- hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeRequest.m4 
(added)
+++ hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeRequest.m4 Tue 
Jun  2 19:02:36 2009
@@ -0,0 +1,11 @@
+{
+    "type": "record",
+    "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
+    "fields": [
+        {"name": "clientHash",
+        "type": include(`org/apache/avro/ipc/MD5.js')},
+        {"name": "clientProtocol", "type": ["null", "string"]},
+        {"name": "serverHash", "type": "MD5"},
+       {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
+ ]
+}

Added: hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeResponse.m4
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeResponse.m4?rev=781125&view=auto
==============================================================================
--- hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeResponse.m4 
(added)
+++ hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeResponse.m4 Tue 
Jun  2 19:02:36 2009
@@ -0,0 +1,15 @@
+{
+    "type": "record",
+    "name": "HandshakeResponse", "namespace": "org.apache.avro.ipc",
+    "fields": [
+        {"name": "match",
+         "type": {"type": "enum", "name": "HandshakeMatch",
+                  "symbols": ["BOTH", "CLIENT", "NONE"]}},
+        {"name": "serverProtocol",
+         "type": ["null", "string"]},
+        {"name": "serverHash",
+         "type": ["null", include(`org/apache/avro/ipc/MD5.js')]},
+       {"name": "meta",
+         "type": ["null", {"type": "map", "values": "bytes"}]}
+    ]
+}

Added: hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/MD5.js
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/MD5.js?rev=781125&view=auto
==============================================================================
--- hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/MD5.js (added)
+++ hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/MD5.js Tue Jun  2 
19:02:36 2009
@@ -0,0 +1 @@
+{"type": "fixed", "name": "MD5", "size": 16}

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestFsData.java
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestFsData.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestFsData.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestFsData.java Tue Jun  2 
19:02:36 2009
@@ -26,8 +26,8 @@
 import org.apache.avro.util.Utf8;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import java.io.File;
@@ -47,7 +47,7 @@
     Integer.parseInt(System.getProperty("test.count", "10"));
   private static final int BUFFER_SIZE = 64 * 1024;
   private static final File DIR=new File(System.getProperty("test.dir", "/tmp"));
-  private static final File FILE = new File("src/test/schemata/fs-data.js");
+  private static final File FILE = new File("src/test/schemata/FSData.avpr");
   private static final Protocol PROTOCOL;
   static {
     try {
@@ -95,7 +95,7 @@
   private static Requestor requestor;
   private static FileChannel fileChannel;
 
-  @BeforeMethod
+  @BeforeClass
   public void testStartServer() throws Exception {
     // create a file that has COUNT * BUFFER_SIZE bytes of random data
     Random rand = new Random();
@@ -132,7 +132,7 @@
     }
   }
 
-  @AfterMethod
+  @AfterClass
   public void testStopServer() throws Exception {
     server.close();
     fileChannel.close();

Modified: 
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java 
(original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java 
Tue Jun  2 19:02:36 2009
@@ -18,6 +18,7 @@
 package org.apache.avro;
 
 import org.apache.avro.Protocol.Message;
+import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRequestor;
@@ -28,8 +29,8 @@
 import org.slf4j.LoggerFactory;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertNotNull;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import java.io.File;
@@ -38,12 +39,14 @@
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Random;
+import java.util.LinkedHashMap;
+import java.util.ArrayList;
 
 public class TestProtocolGeneric {
   private static final Logger LOG
     = LoggerFactory.getLogger(TestProtocolGeneric.class);
 
-  private static final File FILE = new File("src/test/schemata/simple.js");
+  private static final File FILE = new File("src/test/schemata/simple.avpr");
   private static final Protocol PROTOCOL;
   static {
     try {
@@ -93,7 +96,7 @@
   private static Transceiver client;
   private static Requestor requestor;
 
-  @BeforeMethod
+  @BeforeClass
   public void testStartServer() throws Exception {
     server = new SocketServer(new TestResponder(), new InetSocketAddress(0));
     client = new SocketTransceiver(new InetSocketAddress(server.getPort()));
@@ -152,8 +155,39 @@
     assertEquals("an error", ((Map)error.getValue()).get("message").toString());
   }
 
-  @AfterMethod
-  public void testStopServer() {
+  @Test
+  /** Construct and use a different protocol whose "hello" method has an
+      extra argument, to check that the client's schema is sent so the
+      server can parse the request. */
+  public void testHandshake() throws IOException {
+    Protocol protocol = new Protocol("Simple", "org.apache.avro.test");
+    LinkedHashMap<String,Field> fields = new LinkedHashMap<String,Field>();
+    fields.put("extra",
+               new Schema.Field(Schema.create(Schema.Type.BOOLEAN), null));
+    fields.put("greeting",
+               new Schema.Field(Schema.create(Schema.Type.STRING), null));
+    Protocol.Message message =
+      protocol.createMessage("hello",
+                             Schema.createRecord(fields),
+                             Schema.create(Schema.Type.STRING),
+                             Schema.createUnion(new ArrayList<Schema>()));
+    protocol.getMessages().put("hello", message);
+    Transceiver t
+      = new SocketTransceiver(new InetSocketAddress(server.getPort()));
+    try {
+      Requestor r = new GenericRequestor(protocol, t);
+      GenericRecord params = new GenericData.Record(message.getRequest());
+      params.put("extra", Boolean.TRUE);
+      params.put("greeting", new Utf8("bob"));
+      Utf8 response = (Utf8)r.request("hello", params);
+      assertEquals(new Utf8("goodbye"), response);
+    } finally {
+      t.close();
+    }
+  }
+
+  @AfterClass
+  public void testStopServer() throws IOException {
+    client.close();
     server.close();
   }
 }

Modified: 
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflect.java
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflect.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflect.java 
(original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflect.java 
Tue Jun  2 19:02:36 2009
@@ -22,13 +22,13 @@
 import org.apache.avro.reflect.ReflectRequestor;
 import org.apache.avro.reflect.ReflectResponder;
 import org.apache.avro.test.Simple;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeClass;
 
 import java.net.InetSocketAddress;
 
 public class TestProtocolReflect extends TestProtocolSpecific {
 
-  @BeforeMethod
+  @BeforeClass
   public void testStartServer() throws Exception {
     server = new SocketServer(new ReflectResponder(Simple.class, new TestImpl()),
                               new InetSocketAddress(0));

Modified: 
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java 
(original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java 
Tue Jun  2 19:02:36 2009
@@ -33,8 +33,8 @@
 import org.slf4j.LoggerFactory;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertNotNull;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import java.io.*;
@@ -50,16 +50,6 @@
   private static final File SERVER_PORTS_DIR
   = new File(System.getProperty("test.dir", "/tmp")+"/server-ports/");
 
-  private static final File FILE = new File("src/test/schemata/simple.js");
-  private static final Protocol PROTOCOL;
-  static {
-    try {
-      PROTOCOL = Protocol.parse(FILE);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   public static class TestImpl implements Simple {
     public Utf8 hello(Utf8 greeting) { return new Utf8("goodbye"); }
     public TestRecord echo(TestRecord record) { return record; }
@@ -75,7 +65,7 @@
   protected static Transceiver client;
   protected static Simple proxy;
 
-  @BeforeMethod
+  @BeforeClass
   public void testStartServer() throws Exception {
     server = new SocketServer(new SpecificResponder(Simple.class, new TestImpl()),
                               new InetSocketAddress(0));
@@ -133,8 +123,9 @@
     assertEquals("an error", error.message.toString());
   }
 
-  @AfterMethod
-  public void testStopServer() {
+  @AfterClass
+  public void testStopServer() throws IOException {
+    client.close();
     server.close();
   }
 

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java Tue Jun  2 
19:02:36 2009
@@ -32,7 +32,7 @@
   private static final Logger LOG
     = LoggerFactory.getLogger(TestProtocolSpecific.class);
 
-  private static final File FILE = new File("src/test/schemata/simple.js");
+  private static final File FILE = new File("src/test/schemata/simple.avpr");
   private static final Protocol PROTOCOL;
   static {
     try {

Modified: hadoop/avro/trunk/src/test/py/interoptests.py
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/interoptests.py?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/interoptests.py (original)
+++ hadoop/avro/trunk/src/test/py/interoptests.py Tue Jun  2 19:02:36 2009
@@ -34,7 +34,7 @@
     self.__datumreader = datumreader
 
   def testreadfiles(self):
-    origschm = schema.parse(open("src/test/schemata/interop.js").read())
+    origschm = schema.parse(open("src/test/schemata/interop.avsc").read())
     for file in os.listdir(_DATAFILE_DIR):
       print "Validating:", file.__str__()
       dr = io.DataFileReader(open(_DATAFILE_DIR+file, "rb"), 
@@ -86,4 +86,4 @@
   elif sys.argv[1] == "server":
     _interopserver()
   else:
-    print usage
\ No newline at end of file
+    print usage

Modified: hadoop/avro/trunk/src/test/py/testipc.py
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testipc.py?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testipc.py (original)
+++ hadoop/avro/trunk/src/test/py/testipc.py Tue Jun  2 19:02:36 2009
@@ -21,7 +21,7 @@
 import avro.protocol as protocol
 import avro.schema as schema
 
-PROTOCOL = protocol.parse(open("src/test/schemata/simple.js").read())
+PROTOCOL = protocol.parse(open("src/test/schemata/simple.avpr").read())
 
 class TestProtocol(unittest.TestCase):
 

Propchange: hadoop/avro/trunk/src/test/schemata/FSData.avpr
------------------------------------------------------------------------------
    svn:mergeinfo = 

Propchange: hadoop/avro/trunk/src/test/schemata/interop.avsc
------------------------------------------------------------------------------
    svn:mergeinfo = 

