Author: cutting
Date: Tue Jun 2 19:02:36 2009
New Revision: 781125
URL: http://svn.apache.org/viewvc?rev=781125&view=rev
Log:
AVRO-2. Optimized Java RPC handshake protocol.
Added:
hadoop/avro/trunk/src/schemata/
hadoop/avro/trunk/src/schemata/org/
hadoop/avro/trunk/src/schemata/org/apache/
hadoop/avro/trunk/src/schemata/org/apache/avro/
hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/
hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeRequest.m4
hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeResponse.m4
hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/MD5.js
hadoop/avro/trunk/src/test/schemata/FSData.avpr (props changed)
- copied unchanged from r779218,
hadoop/avro/trunk/src/test/schemata/fs-data.js
hadoop/avro/trunk/src/test/schemata/interop.avsc (props changed)
- copied unchanged from r779218,
hadoop/avro/trunk/src/test/schemata/interop.js
hadoop/avro/trunk/src/test/schemata/simple.avpr
- copied unchanged from r779218,
hadoop/avro/trunk/src/test/schemata/simple.js
Removed:
hadoop/avro/trunk/src/test/schemata/fs-data.js
hadoop/avro/trunk/src/test/schemata/interop.js
hadoop/avro/trunk/src/test/schemata/simple.js
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/build.xml
hadoop/avro/trunk/src/doc/content/xdocs/spec.xml
hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java
hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/Transceiver.java
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestFsData.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflect.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java
hadoop/avro/trunk/src/test/py/interoptests.py
hadoop/avro/trunk/src/test/py/testipc.py
Modified: hadoop/avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Tue Jun 2 19:02:36 2009
@@ -9,6 +9,8 @@
AVRO-9. Restrict map keys to strings. (cutting & sharad)
+ AVRO-2. Optimized RPC handshake protocol for Java. (cutting)
+
NEW FEATURES
AVRO-6. Permit easier implementation of alternate generic data
Modified: hadoop/avro/trunk/build.xml
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/build.xml?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/build.xml (original)
+++ hadoop/avro/trunk/build.xml Tue Jun 2 19:02:36 2009
@@ -99,24 +99,44 @@
<target name="compile" depends="compile-java,compile-c"/>
- <target name="compile-java" depends="init">
- <copy todir="${build.classes}">
- <fileset dir="${java.src.dir}" includes="**/*.js"/>
- </copy>
+ <target name="compile-java" depends="init,schemata">
+ <java-compiler excludes="**/ipc/** **/*Requestor.java **/*Responder.java"/>
+ <java-avro-compiler/>
+ <java-compiler/>
+ </target>
+
+ <macrodef name="java-compiler">
+ <attribute name="src" default="${java.src.dir}"/>
+ <attribute name="dest" default="${build.classes}"/>
+ <attribute name="includes" default="**/*.java"/>
+ <attribute name="excludes" default=""/>
+ <attribute name="classpath" default="java.classpath"/>
+ <sequential>
+ <javac
+ srcdir="@{src}"
+ destdir="@{dest}"
+ includes="@{includes}"
+ excludes="@{excludes}"
+ encoding="${javac.encoding}"
+ debug="${javac.debug}"
+ optimize="${javac.optimize}"
+ target="${javac.version}"
+ source="${javac.version}"
+ deprecation="${javac.deprecation}">
+ <compilerarg line="${javac.args} ${javac.args.warnings}" />
+ <classpath refid="@{classpath}"/>
+ </javac>
+ </sequential>
+ </macrodef>
- <javac
- encoding="${javac.encoding}"
- srcdir="${java.src.dir}"
- includes="**/*.java"
- destdir="${build.classes}"
- debug="${javac.debug}"
- optimize="${javac.optimize}"
- target="${javac.version}"
- source="${javac.version}"
- deprecation="${javac.deprecation}">
- <compilerarg line="${javac.args} ${javac.args.warnings}" />
- <classpath refid="java.classpath"/>
- </javac>
+ <target name="schemata" depends="init">
+ <mkdir dir="${build.dir}/src/org/apache/avro/ipc"/>
+ <mapper id="m4toavsc" type="glob" from="*.m4"
to="${build.dir}/src/*.avsc"/>
+ <apply executable="m4" dir="${src.dir}/schemata">
+ <fileset dir="${src.dir}/schemata" includes="**/*.m4"/>
+ <mapper refid="m4toavsc"/>
+ <redirector><outputmapper refid="m4toavsc"/></redirector>
+ </apply>
</target>
<target name="jar" depends="compile-java" description="Build jar file.">
@@ -132,62 +152,57 @@
</jar>
</target>
- <target name="compile-test-java"
depends="compile-java,compile-test-schemata">
- <javac
- encoding="${javac.encoding}"
- srcdir="${test.java.src.dir}"
- includes="org/${org}/${name}/**/*.java"
- destdir="${test.java.classes}"
- debug="${javac.debug}"
- optimize="${javac.optimize}"
- target="${javac.version}"
- source="${javac.version}"
- deprecation="${javac.deprecation}">
- <compilerarg line="${javac.args} ${javac.args.warnings}" />
- <classpath refid="test.java.classpath"/>
- </javac>
+ <target name="compile-test-java" depends="compile-java">
+ <java-avro-compiler src="${test.schemata.dir}"
+ generated="${test.java.generated.dir}"
+ dest="${test.java.classes}"
+ classpath="test.java.classpath"/>
+ <java-compiler src="${test.java.src.dir}"
+ dest="${test.java.classes}"
+ classpath="test.java.classpath"/>
</target>
- <target name="test" depends="test-java,test-py,test-c,test-interop"/>
+ <macrodef name="java-avro-compiler">
+ <attribute name="src" default="${build.dir}/src"/>
+ <attribute name="generated" default="${build.dir}/src"/>
+ <attribute name="dest" default="${build.classes}"/>
+ <attribute name="classpath" default="java.classpath"/>
- <target name="compile-test-schemata" depends="compile-java">
- <taskdef name="protocol" classname="org.apache.avro.specific.ProtocolTask">
- <classpath refid="java.classpath" />
- </taskdef>
- <taskdef name="schema" classname="org.apache.avro.specific.SchemaTask">
- <classpath refid="java.classpath" />
- </taskdef>
- <taskdef name="paranamer"
- classname="com.thoughtworks.paranamer.ant.ParanamerGeneratorTask">
- <classpath refid="java.classpath" />
- </taskdef>
- <mkdir dir="${test.java.generated.dir}"/>
- <protocol destdir="${test.java.generated.dir}">
- <fileset dir="${test.schemata.dir}">
- <include name="simple.js" />
- </fileset>
- </protocol>
- <schema destdir="${test.java.generated.dir}">
- <fileset dir="${test.schemata.dir}">
- <include name="interop.js" />
- </fileset>
- </schema>
- <javac
- encoding="${javac.encoding}"
- srcdir="${test.java.generated.dir}"
- includes="**/*.java"
- destdir="${test.java.classes}"
- debug="${javac.debug}"
- optimize="${javac.optimize}"
- target="${javac.version}"
- source="${javac.version}"
- deprecation="${javac.deprecation}">
- <compilerarg line="${javac.args} ${javac.args.warnings}" />
- <classpath refid="test.java.classpath"/>
- </javac>
- <paranamer sourceDirectory="${test.java.generated.dir}"
- outputDirectory="${test.java.classes}"/>
- </target>
+ <sequential>
+ <taskdef name="protocol"
+ classname="org.apache.avro.specific.ProtocolTask">
+ <classpath refid="java.classpath" />
+ </taskdef>
+ <taskdef name="schema" classname="org.apache.avro.specific.SchemaTask">
+ <classpath refid="java.classpath" />
+ </taskdef>
+ <taskdef
+ name="paranamer"
+ classname="com.thoughtworks.paranamer.ant.ParanamerGeneratorTask">
+ <classpath refid="java.classpath" />
+ </taskdef>
+
+ <mkdir dir="@{generated}"/>
+
+ <protocol destdir="@{generated}">
+ <fileset dir="@{src}">
+ <include name="**/*.avpr" />
+ </fileset>
+ </protocol>
+
+ <schema destdir="@{generated}">
+ <fileset dir="@{src}">
+ <include name="**/*.avsc" />
+ </fileset>
+ </schema>
+
+ <java-compiler src="@{generated}" dest="@{dest}"
+ classpath="@{classpath}"/>
+ <paranamer sourceDirectory="@{generated}" outputDirectory="@{dest}"/>
+ </sequential>
+ </macrodef>
+
+ <target name="test" depends="test-java,test-py,test-c,test-interop"/>
<!-- Define TestNG task for all targets that might need it-->
<taskdef resource="testngtasks" classpathref="java.classpath"/>
@@ -222,7 +237,7 @@
<mkdir dir="${test.java.build.dir}/data-files"/>
<java classname="org.apache.avro.RandomData"
classpathref="test.java.classpath">
- <arg value="${basedir}/src/test/schemata/interop.js"/>
+ <arg value="${basedir}/src/test/schemata/interop.avsc"/>
<arg value="${test.java.build.dir}/data-files/test.java.avro"/>
<arg value="${test.count}"/>
</java>
@@ -232,7 +247,7 @@
</taskdef>
<py-run script="${basedir}/src/test/py/testio.py" python="python"
pythonpathref="test.py.path">
- <arg value="${basedir}/src/test/schemata/interop.js"/>
+ <arg value="${basedir}/src/test/schemata/interop.avsc"/>
<arg value="${test.java.build.dir}/data-files/test.py.avro"/>
<arg value="100"/>
</py-run>
Modified: hadoop/avro/trunk/src/doc/content/xdocs/spec.xml
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/doc/content/xdocs/spec.xml?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/doc/content/xdocs/spec.xml (original)
+++ hadoop/avro/trunk/src/doc/content/xdocs/spec.xml Tue Jun 2 19:02:36 2009
@@ -633,43 +633,106 @@
<section>
<title>Handshake</title>
- <p>RPC sessions are initiated by a handshake. No requests may
- be processed in a session until a successful handshake has
- completed. (As with all messages, handshakes are framed as
- described above.)</p>
-
- <p>The purpose of the handshake is to exchange protocols, so
- that the client can deserialize responses, and the
- server can deserialize requests.</p>
-
- <p>The handshake is versioned, to permit future enhancements.
- The handshake documented here is version zero. All Avro
- implementations must support version zero.</p>
+ <p>RPC sessions are initiated by handshake. The purpose of
+ the handshake is to ensure that the client and the server have
+ each other's protocol definition, so that the client can
+ correctly deserialize responses, and the server can correctly
+ deserialize requests. Both clients and servers should
+ maintain a cache of recently seen protocols, so that, in most
+ cases, a handshake will be completed without extra round-trip
+ network exchanges or the transmission of full protocol text.</p>
+
+ <p>The handshake process uses the following record schemas:</p>
+
+ <source>
+{
+ "type": "record",
+ "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
+ "fields": [
+ {"name": "clientHash",
+ "type": {"type": "fixed", "name": "MD5", "size": 16}},
+ {"name": "clientProtocol", "type": ["null", "string"]},
+ {"name": "serverHash", "type": "MD5"},
+ {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
+ ]
+}
+{
+ "type": "record",
+ "name": "HandshakeResponse", "namespace": "org.apache.avro.ipc",
+ "fields": [
+ {"name": "match",
+ "type": {"type": "enum", "name": "HandshakeMatch",
+ "symbols": ["BOTH", "CLIENT", "NONE"]}},
+ {"name": "serverProtocol",
+ "type": ["null", "string"]},
+ {"name": "serverHash",
+ "type": ["null", {"type": "fixed", "name": "MD5", "size": 16}]},
+ {"name": "meta",
+ "type": ["null", {"type": "map", "values": "bytes"}]}
+ ]
+}
+ </source>
+
+ <ul>
+ <li>In a new session, a client first sends
+ a <code>HandshakeRequest</code> containing just the hash of
+ its protocol and of the server's protocol
+ (<code>clientHash!=null, clientProtocol=null,
+ serverHash!=null</code>), where the hashes are 128-bit MD5
+ hashes of the JSON protocol text. If a client has never
+ connected to a given server, it sends its hash as a guess of
+ the server's hash, otherwise it sends the hash that it
+ previously obtained from this server.</li>
- <p>The version zero handshake consists of a request and
- response message.</p>
-
- <p>The format of a version zero handshake request is:</p>
- <ul>
- <li>a four-byte, big-endian <em>version</em> of zero,
- followed by</li>
- <li>the JSON <em>client's protocol</em> for the session,
- serialized as an Avro string.</li>
- </ul>
-
- <p>The format of a version zero handshake response is:</p>
- <ul>
- <li>a one-byte <em>error flag</em> boolean, followed by either:
+ <li>The server responds with
+ a <code>HandshakeResponse</code> containing one of:
<ul>
- <li>if the error flag is false, the JSON <em>server's
- protocol</em> for the session, serialized as an Avro
- string.</li>
- <li>if the error flag is true, an error message, as an
- Avro string.</li>
+ <li><code>match=BOTH, serverProtocol=null,
+ serverHash=null</code> if the client sent the valid hash
+ of the server's protocol and the server knows what
+ protocol corresponds to the client's hash. In this case,
+ the request is complete and the session is
+ established.</li>
+
+ <li><code>match=CLIENT, serverProtocol!=null,
+ serverHash!=null</code> if the server has previously
+ seen the client's protocol, but the client sent an
+ incorrect hash of the server's protocol. The client must
+ then re-send the request with the correct server
+ hash.</li>
+
+ <li><code>match=NONE, serverProtocol!=null,
+ serverHash!=null</code> if the server has not previously
+ seen the client's protocol and the client sent an
+ incorrect hash of the server's protocol.
+
+ <p>In this case, the client must then re-submit its
+ request with its protocol text (<code>clientHash!=null,
+ clientProtocol!=null, serverHash!=null</code>) and the
+ server should respond with a successful match
+ (<code>match=BOTH, serverProtocol=null,
+ serverHash=null</code>) as above.</p>
+ </li>
</ul>
</li>
</ul>
+ <p>Until a connection is established, call request data sent
+ by the client must be preceded by
+ a <code>HandshakeRequest</code> and call response data
+ returned by the server must be preceded by a
+ <code>HandshakeResponse</code>. A connection is not
+ established until a <code>HandshakeResponse</code> with
+ <code>match=BOTH</code> or <code>match=CLIENT</code> is
+ returned. In these cases, the call response data immediately
+ follows
+ the <code>HandshakeResponse</code>. When <code>match=NONE</code>
+ no response call data is sent and the request call data is
+ ignored.</p>
+
+ <p>The <code>meta</code> field is reserved for future
+ handshake enhancements.</p>
+
</section>
<section>
Modified: hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java Tue Jun 2
19:02:36 2009
@@ -27,6 +27,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.security.MessageDigest;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonParser;
@@ -125,6 +126,7 @@
private Schema.Names types = new Schema.Names();
private Map<String,Message> messages = new LinkedHashMap<String,Message>();
+ private byte[] md5;
/** An error that can be thrown by any message. */
public static final Schema SYSTEM_ERROR = Schema.create(Schema.Type.STRING);
@@ -198,6 +200,18 @@
return buffer.toString();
}
+ /** Return the MD5 hash of the text of this protocol. */
+ public byte[] getMD5() {
+ if (md5 == null)
+ try {
+ md5 = MessageDigest.getInstance("MD5")
+ .digest(this.toString().getBytes("UTF-8"));
+ } catch (Exception e) {
+ throw new AvroRuntimeException(e);
+ }
+ return md5;
+ }
+
/** Read a protocol from a Json file. */
public static Protocol parse(File file) throws IOException {
return parse(Schema.FACTORY.createJsonParser(file));
Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java
(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java Tue Jun
2 19:02:36 2009
@@ -104,7 +104,7 @@
public Fixed(byte[] bytes) { bytes(bytes); }
protected Fixed() {}
- protected void bytes(byte[] bytes) { this.bytes = bytes; }
+ public void bytes(byte[] bytes) { this.bytes = bytes; }
public byte[] bytes() { return bytes; }
@@ -116,6 +116,7 @@
public int hashCode() { return Arrays.hashCode(bytes); }
+ public String toString() { return Arrays.toString(bytes); }
}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java Tue Jun 2
19:02:36 2009
@@ -27,6 +27,7 @@
import org.apache.avro.Protocol.Message;
import org.apache.avro.util.*;
import org.apache.avro.io.*;
+import org.apache.avro.specific.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +38,7 @@
private Protocol local;
private Protocol remote;
+ private boolean established, sendLocalText;
private Transceiver transceiver;
public Protocol getLocal() { return local; }
@@ -47,38 +49,34 @@
throws IOException {
this.local = local;
this.transceiver = transceiver;
- this.remote = handshake();
- }
-
- private Protocol handshake() throws IOException {
- ByteBufferValueWriter out = new ByteBufferValueWriter();
- out.writeLong(Protocol.VERSION); // send handshake version
- out.writeUtf8(new Utf8(local.toString())); // send local protocol
- List<ByteBuffer> response = transceiver.transceive(out.getBufferList());
- ValueReader in = new ByteBufferValueReader(response);
- String remote = in.readUtf8(null).toString(); // read remote protocol
- return Protocol.parse(remote); // parse & return it
}
/** Writes a request message and reads a response or error message. */
public Object request(String messageName, Object request)
throws IOException {
-
- // use local protocol to write request
- Message m = getLocal().getMessages().get(messageName);
- if (m == null)
- throw new AvroRuntimeException("Not a local message: "+messageName);
-
- ByteBufferValueWriter out = new ByteBufferValueWriter();
-
- out.writeUtf8(new Utf8(m.getName())); // write message name
-
- writeRequest(m.getRequest(), request, out);
-
- List<ByteBuffer> response =
- getTransceiver().transceive(out.getBufferList());
-
- ValueReader in = new ByteBufferValueReader(response);
+ ValueReader in;
+ Message m;
+ do {
+ ByteBufferValueWriter out = new ByteBufferValueWriter();
+
+ if (!established) // if not established
+ writeHandshake(out); // prepend handshake
+
+ // use local protocol to write request
+ m = getLocal().getMessages().get(messageName);
+ if (m == null)
+ throw new AvroRuntimeException("Not a local message: "+messageName);
+
+ out.writeUtf8(new Utf8(m.getName())); // write message name
+ writeRequest(m.getRequest(), request, out); // write request payload
+
+ List<ByteBuffer> response = // transceive
+ getTransceiver().transceive(out.getBufferList());
+
+ in = new ByteBufferValueReader(response);
+ if (!established) // if not established
+ readHandshake(in); // process handshake
+ } while (!established);
// use remote protocol to read response
m = getRemote().getMessages().get(messageName);
@@ -91,6 +89,65 @@
}
}
+ private static final Map<String,MD5> REMOTE_HASHES =
+ Collections.synchronizedMap(new HashMap<String,MD5>());
+ private static final Map<MD5,Protocol> REMOTE_PROTOCOLS =
+ Collections.synchronizedMap(new HashMap<MD5,Protocol>());
+
+ private static final SpecificDatumWriter HANDSHAKE_WRITER =
+ new SpecificDatumWriter(HandshakeRequest._SCHEMA);
+
+ private static final SpecificDatumReader HANDSHAKE_READER =
+ new SpecificDatumReader(HandshakeResponse._SCHEMA);
+
+ private void writeHandshake(ValueWriter out) throws IOException {
+ MD5 localHash = new MD5();
+ localHash.bytes(local.getMD5());
+ String remoteName = transceiver.getRemoteName();
+ MD5 remoteHash = REMOTE_HASHES.get(remoteName);
+ remote = REMOTE_PROTOCOLS.get(remoteHash);
+ if (remoteHash == null) { // guess remote is local
+ remoteHash = localHash;
+ remote = local;
+ }
+ HandshakeRequest handshake = new HandshakeRequest();
+ handshake.clientHash = localHash;
+ handshake.serverHash = remoteHash;
+ if (sendLocalText)
+ handshake.clientProtocol = new Utf8(local.toString());
+ HANDSHAKE_WRITER.write(handshake, out);
+ }
+
+ private void readHandshake(ValueReader in) throws IOException {
+ HandshakeResponse handshake =
+ (HandshakeResponse)HANDSHAKE_READER.read(null, in);
+ switch (handshake.match) {
+ case BOTH:
+ established = true;
+ break;
+ case CLIENT:
+ LOG.debug("Handshake match = CLIENT");
+ setRemote(handshake);
+ established = true;
+ break;
+ case NONE:
+ LOG.debug("Handshake match = NONE");
+ setRemote(handshake);
+ sendLocalText = true;
+ break;
+ default:
+ throw new AvroRuntimeException("Unexpected match: "+handshake.match);
+ }
+ }
+
+ private void setRemote(HandshakeResponse handshake) {
+ remote = Protocol.parse(handshake.serverProtocol.toString());
+ MD5 remoteHash = (MD5)handshake.serverHash;
+ REMOTE_HASHES.put(transceiver.getRemoteName(), remoteHash);
+ if (!REMOTE_PROTOCOLS.containsKey(remoteHash))
+ REMOTE_PROTOCOLS.put(remoteHash, remote);
+ }
+
/** Writes a request message. */
public abstract void writeRequest(Schema schema, Object request,
ValueWriter out) throws IOException;
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java Tue Jun 2
19:02:36 2009
@@ -29,47 +29,42 @@
import org.apache.avro.Protocol.Message;
import org.apache.avro.util.*;
import org.apache.avro.io.*;
+import org.apache.avro.specific.*;
/** Base class for the server side of a protocol interaction. */
public abstract class Responder {
private static final Logger LOG = LoggerFactory.getLogger(Responder.class);
+ private Map<Transceiver,Protocol> remotes
+ = Collections.synchronizedMap(new WeakHashMap<Transceiver,Protocol>());
+ private Map<MD5,Protocol> protocols
+ = Collections.synchronizedMap(new HashMap<MD5,Protocol>());
+
private Protocol local;
+ private MD5 localHash;
protected Responder(Protocol local) {
this.local = local;
+ this.localHash = new MD5();
+ localHash.bytes(local.getMD5());
+ protocols.put(localHash, local);
}
public Protocol getLocal() { return local; }
- /** Returns the remote protocol. */
- protected Protocol handshake(Transceiver server) throws IOException {
- ValueReader in = new ByteBufferValueReader(server.readBuffers());
- ByteBufferValueWriter out = new ByteBufferValueWriter();
-
- long version = in.readLong(); // read handshake version
- if (version != Protocol.VERSION)
- throw new AvroRuntimeException("Incompatible request version: "+version);
- // read remote protocol
- Protocol remote = Protocol.parse(in.readUtf8(null).toString());
-
- out.writeUtf8(new Utf8(local.toString())); // write local protocol
- server.writeBuffers(out.getBufferList());
-
- return remote; // return remote protocol
- }
-
- /** Called by a server to deserialize requests, compute and serialize
- * responses and errors. */
- public List<ByteBuffer> respond(Protocol remote, List<ByteBuffer> input)
- throws IOException {
+ /** Called by a server to deserialize a request, compute and serialize
+ * a response or error. */
+ public List<ByteBuffer> respond(Transceiver transceiver) throws IOException {
+ ValueReader in = new ByteBufferValueReader(transceiver.readBuffers());
ByteBufferValueWriter out = new ByteBufferValueWriter();
AvroRemoteException error = null;
try {
+ Protocol remote = handshake(transceiver, in, out);
+ if (remote == null) // handshake failed
+ return out.getBufferList();
+
// read request using remote protocol specification
- ValueReader in = new ByteBufferValueReader(input);
String messageName = in.readUtf8(null).toString();
-
Message m = remote.getMessages().get(messageName);
if (m == null)
throw new AvroRuntimeException("No such remote message: "+messageName);
@@ -107,6 +102,40 @@
return out.getBufferList();
}
+ private SpecificDatumWriter handshakeWriter =
+ new SpecificDatumWriter(HandshakeResponse._SCHEMA);
+ private SpecificDatumReader handshakeReader =
+ new SpecificDatumReader(HandshakeRequest._SCHEMA);
+
+ private Protocol handshake(Transceiver transceiver,
+ ValueReader in, ValueWriter out)
+ throws IOException {
+ Protocol remote = remotes.get(transceiver);
+ if (remote != null) return remote; // already established
+
+ HandshakeRequest request = (HandshakeRequest)handshakeReader.read(null,
in);
+ remote = protocols.get(request.clientHash);
+ if (remote == null && request.clientProtocol != null) {
+ remote = Protocol.parse(request.clientProtocol.toString());
+ protocols.put(request.clientHash, remote);
+ }
+ if (remote != null)
+ remotes.put(transceiver, remote);
+ HandshakeResponse response = new HandshakeResponse();
+ if (localHash.equals(request.serverHash)) {
+ response.match =
+ remote == null ? HandshakeMatch.NONE : HandshakeMatch.BOTH;
+ } else {
+ response.match =
+ remote == null ? HandshakeMatch.NONE : HandshakeMatch.CLIENT;
+ }
+ if (response.match != HandshakeMatch.BOTH) {
+ response.serverProtocol = new Utf8(local.toString());
+ response.serverHash = localHash;
+ }
+ handshakeWriter.write(response, out);
+ return remote;
+ }
/** Computes the response for a message. */
public abstract Object respond(Message message, Object request)
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java Tue Jun 2
19:02:36 2009
@@ -23,7 +23,7 @@
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
import java.nio.ByteBuffer;
import java.util.*;
@@ -38,38 +38,43 @@
private Responder responder;
private ServerSocketChannel channel;
+ private ThreadGroup group;
public SocketServer(Responder responder, SocketAddress addr)
throws IOException {
- this.responder = responder;
+ String name = "SocketServer on "+addr;
+ this.responder = responder;
+ this.group = new ThreadGroup(name);
this.channel = ServerSocketChannel.open();
+
channel.socket().bind(addr);
- setName("SocketServer on "+addr);
+ setName(name);
setDaemon(true);
- LOG.info("starting on "+addr);
start();
}
public int getPort() { return channel.socket().getLocalPort(); }
public void run() {
+ LOG.info("starting "+channel.socket().getInetAddress());
while (true) {
try {
- LOG.info("listening on "+channel.socket().getInetAddress());
new Connection(channel.accept());
- } catch (ClosedByInterruptException e) {
+ } catch (ClosedChannelException e) {
return;
} catch (IOException e) {
LOG.warn("unexpected error", e);
throw new RuntimeException(e);
+ } finally {
+ LOG.info("stopping "+channel.socket().getInetAddress());
}
}
}
public void close() {
- interrupt();
+ group.interrupt();
}
private class Connection extends SocketTransceiver implements Runnable {
@@ -77,37 +82,28 @@
public Connection(SocketChannel channel) {
super(channel);
- Thread thread = new Thread(this);
- LOG.info("connection from "+channel.socket().getRemoteSocketAddress());
- thread.setName("Connection for "+channel);
+ Thread thread = new Thread(group, this);
+ thread.setName("Connection to
"+channel.socket().getRemoteSocketAddress());
thread.setDaemon(true);
thread.start();
}
public void run() {
try {
- Protocol remote = responder.handshake(this);
- while (true) {
- writeBuffers(responder.respond(remote, readBuffers()));
+ try {
+ while (true) {
+ writeBuffers(responder.respond(this));
+ }
+ } catch (ClosedChannelException e) {
+ return;
+ } finally {
+ close();
}
- } catch (ClosedByInterruptException e) {
- return;
} catch (IOException e) {
LOG.warn("unexpected error", e);
- throw new RuntimeException(e);
- } finally {
- try {
- super.close();
- } catch (IOException e) {
- LOG.warn("unexpected error", e);
- throw new RuntimeException(e);
- }
}
}
- public void close() {
- interrupt();
- }
}
public static void main(String arg[]) throws Exception {
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java
(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java Tue
Jun 2 19:02:36 2009
@@ -35,12 +35,17 @@
private SocketChannel channel;
private ByteBuffer header = ByteBuffer.allocate(4);
+ public String getRemoteName() {
+ return channel.socket().getRemoteSocketAddress().toString();
+ }
+
public SocketTransceiver(SocketAddress address) throws IOException {
this(SocketChannel.open(address));
}
public SocketTransceiver(SocketChannel channel) {
this.channel = channel;
+ LOG.info("open to "+channel.socket().getRemoteSocketAddress());
}
public synchronized List<ByteBuffer> readBuffers() throws IOException {
@@ -81,6 +86,7 @@
}
public void close() throws IOException {
+ LOG.info("closing to "+channel.socket().getRemoteSocketAddress());
channel.close();
}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Transceiver.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Transceiver.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Transceiver.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Transceiver.java Tue Jun 2
19:02:36 2009
@@ -25,6 +25,8 @@
/** Base class for transmitters and recievers of raw binary messages. */
public abstract class Transceiver {
+ public abstract String getRemoteName();
+
public synchronized List<ByteBuffer> transceive(List<ByteBuffer> request)
throws IOException {
writeBuffers(request);
Modified:
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java
(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java
Tue Jun 2 19:02:36 2009
@@ -20,6 +20,8 @@
import java.io.File;
import java.io.IOException;
import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
import org.apache.avro.Protocol;
import org.apache.avro.Schema;
@@ -34,6 +36,7 @@
private String namespace;
private StringBuilder buffer = new StringBuilder();
+ private Set<String> compiledTypes = new HashSet<String>();
private SpecificCompiler() {} // no public ctor
@@ -131,6 +134,7 @@
private void compile(Schema schema, String name, int d) {
String type = type(schema, name);
+ if (compiledTypes.contains(type)) return; else compiledTypes.add(type);
switch (schema.getType()) {
case RECORD:
buffer.append("\n");
@@ -141,7 +145,7 @@
: " extends SpecificRecordBase")
+" implements SpecificRecord {");
// schema definition
- line(d+1, "private static final Schema _SCHEMA = Schema.parse(\""
+ line(d+1, "public static final Schema _SCHEMA = Schema.parse(\""
+esc(schema)+"\");");
// field declations
for (Map.Entry<String, Schema> field : schema.getFieldSchemas()) {
Modified:
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
---
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java
(original)
+++
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java
Tue Jun 2 19:02:36 2009
@@ -39,6 +39,10 @@
super(root, packageName);
}
+ public SpecificDatumReader(Schema root) {
+ super(root, root.getNamespace()+".");
+ }
+
protected void addField(Object record, String name, int position, Object o) {
((SpecificRecord)record).set(position, o);
}
Added: hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeRequest.m4
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeRequest.m4?rev=781125&view=auto
==============================================================================
--- hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeRequest.m4
(added)
+++ hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeRequest.m4 Tue
Jun 2 19:02:36 2009
@@ -0,0 +1,11 @@
+{
+ "type": "record",
+ "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
+ "fields": [
+ {"name": "clientHash",
+ "type": include(`org/apache/avro/ipc/MD5.js')},
+ {"name": "clientProtocol", "type": ["null", "string"]},
+ {"name": "serverHash", "type": "MD5"},
+ {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
+ ]
+}
Added: hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeResponse.m4
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeResponse.m4?rev=781125&view=auto
==============================================================================
--- hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeResponse.m4
(added)
+++ hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/HandshakeResponse.m4 Tue
Jun 2 19:02:36 2009
@@ -0,0 +1,15 @@
+{
+ "type": "record",
+ "name": "HandshakeResponse", "namespace": "org.apache.avro.ipc",
+ "fields": [
+ {"name": "match",
+ "type": {"type": "enum", "name": "HandshakeMatch",
+ "symbols": ["BOTH", "CLIENT", "NONE"]}},
+ {"name": "serverProtocol",
+ "type": ["null", "string"]},
+ {"name": "serverHash",
+ "type": ["null", include(`org/apache/avro/ipc/MD5.js')]},
+ {"name": "meta",
+ "type": ["null", {"type": "map", "values": "bytes"}]}
+ ]
+}
Added: hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/MD5.js
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/MD5.js?rev=781125&view=auto
==============================================================================
--- hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/MD5.js (added)
+++ hadoop/avro/trunk/src/schemata/org/apache/avro/ipc/MD5.js Tue Jun 2
19:02:36 2009
@@ -0,0 +1 @@
+{"type": "fixed", "name": "MD5", "size": 16}
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestFsData.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestFsData.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestFsData.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestFsData.java Tue Jun 2
19:02:36 2009
@@ -26,8 +26,8 @@
import org.apache.avro.util.Utf8;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.File;
@@ -47,7 +47,7 @@
Integer.parseInt(System.getProperty("test.count", "10"));
private static final int BUFFER_SIZE = 64 * 1024;
private static final File DIR=new File(System.getProperty("test.dir", "/tmp"));
- private static final File FILE = new File("src/test/schemata/fs-data.js");
+ private static final File FILE = new File("src/test/schemata/FSData.avpr");
private static final Protocol PROTOCOL;
static {
try {
@@ -95,7 +95,7 @@
private static Requestor requestor;
private static FileChannel fileChannel;
- @BeforeMethod
+ @BeforeClass
public void testStartServer() throws Exception {
// create a file that has COUNT * BUFFER_SIZE bytes of random data
Random rand = new Random();
@@ -132,7 +132,7 @@
}
}
- @AfterMethod
+ @AfterClass
public void testStopServer() throws Exception {
server.close();
fileChannel.close();
Modified:
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java
(original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java
Tue Jun 2 19:02:36 2009
@@ -18,6 +18,7 @@
package org.apache.avro;
import org.apache.avro.Protocol.Message;
+import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRequestor;
@@ -28,8 +29,8 @@
import org.slf4j.LoggerFactory;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.File;
@@ -38,12 +39,14 @@
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Random;
+import java.util.LinkedHashMap;
+import java.util.ArrayList;
public class TestProtocolGeneric {
private static final Logger LOG
= LoggerFactory.getLogger(TestProtocolGeneric.class);
- private static final File FILE = new File("src/test/schemata/simple.js");
+ private static final File FILE = new File("src/test/schemata/simple.avpr");
private static final Protocol PROTOCOL;
static {
try {
@@ -93,7 +96,7 @@
private static Transceiver client;
private static Requestor requestor;
- @BeforeMethod
+ @BeforeClass
public void testStartServer() throws Exception {
server = new SocketServer(new TestResponder(), new InetSocketAddress(0));
client = new SocketTransceiver(new InetSocketAddress(server.getPort()));
@@ -152,8 +155,39 @@
assertEquals("an error",
((Map)error.getValue()).get("message").toString());
}
- @AfterMethod
- public void testStopServer() {
+ @Test
+ /** Construct and use a different protocol whose "hello" method has an extra
+ argument to check that schema is sent to parse request. */
+ public void testHandshake() throws IOException {
+ Protocol protocol = new Protocol("Simple", "org.apache.avro.test");
+ LinkedHashMap<String,Field> fields = new LinkedHashMap<String,Field>();
+ fields.put("extra",
+ new Schema.Field(Schema.create(Schema.Type.BOOLEAN), null));
+ fields.put("greeting",
+ new Schema.Field(Schema.create(Schema.Type.STRING), null));
+ Protocol.Message message =
+ protocol.createMessage("hello",
+ Schema.createRecord(fields),
+ Schema.create(Schema.Type.STRING),
+ Schema.createUnion(new ArrayList<Schema>()));
+ protocol.getMessages().put("hello", message);
+ Transceiver t
+ = new SocketTransceiver(new InetSocketAddress(server.getPort()));
+ try {
+ Requestor r = new GenericRequestor(protocol, t);
+ GenericRecord params = new GenericData.Record(message.getRequest());
+ params.put("extra", Boolean.TRUE);
+ params.put("greeting", new Utf8("bob"));
+ Utf8 response = (Utf8)r.request("hello", params);
+ assertEquals(new Utf8("goodbye"), response);
+ } finally {
+ t.close();
+ }
+ }
+
+ @AfterClass
+ public void testStopServer() throws IOException {
+ client.close();
server.close();
}
}
Modified:
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflect.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflect.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflect.java
(original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflect.java
Tue Jun 2 19:02:36 2009
@@ -22,13 +22,13 @@
import org.apache.avro.reflect.ReflectRequestor;
import org.apache.avro.reflect.ReflectResponder;
import org.apache.avro.test.Simple;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeClass;
import java.net.InetSocketAddress;
public class TestProtocolReflect extends TestProtocolSpecific {
- @BeforeMethod
+ @BeforeClass
public void testStartServer() throws Exception {
server = new SocketServer(new ReflectResponder(Simple.class, new TestImpl()),
new InetSocketAddress(0));
Modified:
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java
(original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java
Tue Jun 2 19:02:36 2009
@@ -33,8 +33,8 @@
import org.slf4j.LoggerFactory;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.*;
@@ -50,16 +50,6 @@
private static final File SERVER_PORTS_DIR
= new File(System.getProperty("test.dir", "/tmp")+"/server-ports/");
- private static final File FILE = new File("src/test/schemata/simple.js");
- private static final Protocol PROTOCOL;
- static {
- try {
- PROTOCOL = Protocol.parse(FILE);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
public static class TestImpl implements Simple {
public Utf8 hello(Utf8 greeting) { return new Utf8("goodbye"); }
public TestRecord echo(TestRecord record) { return record; }
@@ -75,7 +65,7 @@
protected static Transceiver client;
protected static Simple proxy;
- @BeforeMethod
+ @BeforeClass
public void testStartServer() throws Exception {
server = new SocketServer(new SpecificResponder(Simple.class, new TestImpl()),
new InetSocketAddress(0));
@@ -133,8 +123,9 @@
assertEquals("an error", error.message.toString());
}
- @AfterMethod
- public void testStopServer() {
+ @AfterClass
+ public void testStopServer() throws IOException {
+ client.close();
server.close();
}
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestReflect.java Tue Jun 2
19:02:36 2009
@@ -32,7 +32,7 @@
private static final Logger LOG
= LoggerFactory.getLogger(TestProtocolSpecific.class);
- private static final File FILE = new File("src/test/schemata/simple.js");
+ private static final File FILE = new File("src/test/schemata/simple.avpr");
private static final Protocol PROTOCOL;
static {
try {
Modified: hadoop/avro/trunk/src/test/py/interoptests.py
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/interoptests.py?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/interoptests.py (original)
+++ hadoop/avro/trunk/src/test/py/interoptests.py Tue Jun 2 19:02:36 2009
@@ -34,7 +34,7 @@
self.__datumreader = datumreader
def testreadfiles(self):
- origschm = schema.parse(open("src/test/schemata/interop.js").read())
+ origschm = schema.parse(open("src/test/schemata/interop.avsc").read())
for file in os.listdir(_DATAFILE_DIR):
print "Validating:", file.__str__()
dr = io.DataFileReader(open(_DATAFILE_DIR+file, "rb"),
@@ -86,4 +86,4 @@
elif sys.argv[1] == "server":
_interopserver()
else:
- print usage
\ No newline at end of file
+ print usage
Modified: hadoop/avro/trunk/src/test/py/testipc.py
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testipc.py?rev=781125&r1=781124&r2=781125&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testipc.py (original)
+++ hadoop/avro/trunk/src/test/py/testipc.py Tue Jun 2 19:02:36 2009
@@ -21,7 +21,7 @@
import avro.protocol as protocol
import avro.schema as schema
-PROTOCOL = protocol.parse(open("src/test/schemata/simple.js").read())
+PROTOCOL = protocol.parse(open("src/test/schemata/simple.avpr").read())
class TestProtocol(unittest.TestCase):
Propchange: hadoop/avro/trunk/src/test/schemata/FSData.avpr
------------------------------------------------------------------------------
svn:mergeinfo =
Propchange: hadoop/avro/trunk/src/test/schemata/interop.avsc
------------------------------------------------------------------------------
svn:mergeinfo =