Author: cutting
Date: Wed Sep 23 22:34:14 2009
New Revision: 818293
URL: http://svn.apache.org/viewvc?rev=818293&view=rev
Log:
AVRO-115. Remove RPC sessions.
Added:
hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramServer.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramTransceiver.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/Server.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolDatagram.java
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/src/doc/content/xdocs/spec.xml
hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java
hadoop/avro/trunk/src/py/avro/ipc.py
hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Wed Sep 23 22:34:14 2009
@@ -4,6 +4,10 @@
INCOMPATIBLE CHANGES
+ AVRO-115. Remove RPC's session notion to facilitate the use of
+ stateless transports like UDP and HTTP. Add a UDP transport.
+ (cutting)
+
NEW FEATURES
IMPROVEMENTS
Modified: hadoop/avro/trunk/src/doc/content/xdocs/spec.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/doc/content/xdocs/spec.xml?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/doc/content/xdocs/spec.xml (original)
+++ hadoop/avro/trunk/src/doc/content/xdocs/spec.xml Wed Sep 23 22:34:14 2009
@@ -687,15 +687,7 @@
<p>A transport is a system that supports:</p>
<ul>
- <li><strong>session creation</strong>
- <p>A session forms the context under which multiple
- messages may be transcieved. A client must establish a
- session with a server before any requests may be
- processed.</p>
- </li>
<li><strong>transmission of request messages</strong>
- <p>Once a session has been established, clients may send
- servers request messages using that session.</p>
</li>
<li><strong>receipt of corresponding response messages</strong>
<p>Servers will send a response message back to the client
@@ -752,14 +744,15 @@
<section>
<title>Handshake</title>
- <p>RPC sessions are initiated by handshake. The purpose of
- the handshake is to ensure that the client and the server have
- each other's protocol definition, so that the client can
- correctly deserialize responses, and the server can correctly
- deserialize requests. Both clients and servers should
- maintain a cache of recently seen protocols, so that, in most
- cases, a handshake will be completed without extra round-trip
- network exchanges or the transmission of full protocol text.</p>
+ <p>RPC requests and responses are prefixed by handshakes. The
+ purpose of the handshake is to ensure that the client and the
+ server have each other's protocol definition, so that the
+ client can correctly deserialize responses, and the server can
+ correctly deserialize requests. Both clients and servers
+ should maintain a cache of recently seen protocols, so that,
+ in most cases, a handshake will be completed without extra
+ round-trip network exchanges or the transmission of full
+ protocol text.</p>
<p>The handshake process uses the following record schemas:</p>
@@ -793,7 +786,7 @@
</source>
<ul>
- <li>In a new session, a client first sends
+ <li>A client first prefixes each request with
a <code>HandshakeRequest</code> containing just the hash of
its protocol and of the server's protocol
(<code>clientHash!=null, clientProtocol=null,
@@ -810,19 +803,22 @@
serverHash=null</code> if the client sent the valid hash
of the server's protocol and the server knows what
protocol corresponds to the client's hash. In this case,
- the request is complete and the session is
- established.</li>
+ the request is complete and the response data
+ immediately follows the HandshakeResponse.</li>
<li><code>match=CLIENT, serverProtocol!=null,
serverHash!=null</code> if the server has previously
seen the client's protocol, but the client sent an
- incorrect hash of the server's protocol. The client must
- then re-send the request with the correct server
- hash.</li>
+ incorrect hash of the server's protocol. The request is
+ complete and the response data immediately follows the
+ HandshakeResponse. The client must use the returned
+ protocol to process the response and should also cache
+ that protocol and its hash for future interactions with
+ this server.</li>
<li><code>match=NONE, serverProtocol!=null,
serverHash!=null</code> if the server has not previously
- seen the client's protocol and the client sent and
+ seen the client's protocol and the client sent an
incorrect hash of the server's protocol.
<p>In this case The client must then re-submit its
@@ -836,19 +832,6 @@
</li>
</ul>
- <p>Until a connection is established, call request data sent
- by the client must be preceded by
- a <code>HandshakeRequest</code> and call response data
- returned by the server must be preceded by a
- <code>HandshakeResponse</code>. A connection is not
- established until a <code>HandshakeResponse</code> with
- <code>match=BOTH</code> or <code>match=CLIENT</code> is
- returned. In these cases, the call response data immmediately
- follows
- the <code>HandShakeResponse</code>. When <code>match=NONE</code>
- no response call data is sent and the request call data is
- ignored.</p>
-
<p>The <code>meta</code> field is reserved for future
handshake enhancements.</p>
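The match cases above boil down to a single client-side decision after each HandshakeResponse is read: is the call response already attached, or must the request be re-sent with the full protocol text? The following self-contained sketch (class and method names are illustrative only, not part of this commit) mirrors that decision logic:

import java.util.HashMap;
import java.util.Map;

public class HandshakeFlowSketch {

  enum Match { BOTH, CLIENT, NONE }

  // client-side cache of server protocols, keyed by protocol hash
  private static final Map<String,String> serverProtocols =
    new HashMap<String,String>();

  /** Returns true when call response data immediately follows the handshake,
   * false when the request must be re-sent with full protocol text. */
  static boolean handleHandshakeResponse(Match match, String serverHash,
                                         String serverProtocol) {
    switch (match) {
    case BOTH:                                         // both sides already known
      return true;
    case CLIENT:                                       // our server hash was stale:
      serverProtocols.put(serverHash, serverProtocol); // cache it; response follows
      return true;
    case NONE:                                         // server needs our protocol text:
      serverProtocols.put(serverHash, serverProtocol); // cache, then re-send the request
      return false;
    default:
      throw new IllegalStateException("Unexpected match: "+match);
    }
  }

  public static void main(String[] args) {
    System.out.println(handleHandshakeResponse(Match.CLIENT, "abc", "{...}"));
  }
}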
Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramServer.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramServer.java?rev=818293&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramServer.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramServer.java Wed Sep 23 22:34:14 2009
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.DatagramChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A simple datagram-based server implementation. */
+public class DatagramServer extends Thread implements Server {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DatagramServer.class);
+
+ private final Responder responder;
+ private final DatagramChannel channel;
+ private final Transceiver transceiver;
+
+ public DatagramServer(Responder responder, SocketAddress addr)
+ throws IOException {
+ String name = "DatagramServer on "+addr;
+
+ this.responder = responder;
+
+ this.channel = DatagramChannel.open();
+ channel.socket().bind(addr);
+
+ this.transceiver = new DatagramTransceiver(channel);
+
+ setName(name);
+ setDaemon(true);
+ start();
+ }
+
+ public int getPort() { return channel.socket().getLocalPort(); }
+
+ public void run() {
+ while (true) {
+ try {
+ transceiver.writeBuffers(responder.respond(transceiver.readBuffers()));
+ } catch (ClosedChannelException e) {
+ return;
+ } catch (IOException e) {
+ LOG.warn("unexpected error", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public void close() { this.interrupt(); }
+
+ public static void main(String[] arg) throws Exception {
+ DatagramServer server = new DatagramServer(null, new InetSocketAddress(0));
+ System.out.println("started");
+ server.join();
+ }
+
+}
+
Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramTransceiver.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramTransceiver.java?rev=818293&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramTransceiver.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramTransceiver.java Wed Sep 23 22:34:14 2009
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A datagram-based {@link Transceiver} implementation. */
+public class DatagramTransceiver extends Transceiver {
+ private static final Logger LOG
+ = LoggerFactory.getLogger(DatagramTransceiver.class);
+
+ private static final int MAX_SIZE = 16 * 1024;
+
+ private DatagramChannel channel;
+ private SocketAddress remote;
+ private ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
+
+ public String getRemoteName() { return remote.toString(); }
+
+ public DatagramTransceiver(SocketAddress remote) throws IOException {
+ this(DatagramChannel.open());
+ this.remote = remote;
+ }
+
+ public DatagramTransceiver(DatagramChannel channel) {
+ this.channel = channel;
+ }
+
+ public synchronized List<ByteBuffer> readBuffers() throws IOException {
+ buffer.clear();
+ remote = channel.receive(buffer);
+ LOG.info("received from "+remote);
+ buffer.flip();
+ List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+ while (true) {
+ int length = buffer.getInt();
+ if (length == 0) { // end of buffers
+ return buffers;
+ }
+ ByteBuffer chunk = buffer.slice(); // use data without copying
+ chunk.limit(length);
+ buffer.position(buffer.position()+length);
+ buffers.add(chunk);
+ }
+ }
+
+ public synchronized void writeBuffers(List<ByteBuffer> buffers)
+ throws IOException {
+ buffer.clear();
+ for (ByteBuffer b : buffers) {
+ buffer.putInt(b.remaining());
+ buffer.put(b); // copy data. sigh.
+ }
+ buffer.putInt(0);
+ buffer.flip();
+ channel.send(buffer, remote);
+ LOG.info("sent to "+remote);
+ }
+
+}
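The framing used by readBuffers/writeBuffers above — each buffer preceded by a four-byte length, with a zero length marking the end of the packet — can be exercised without any sockets. A minimal sketch (the class name is illustrative, not part of this commit):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class DatagramFramingSketch {
  public static void main(String[] args) {
    // pack two payloads the way writeBuffers does
    ByteBuffer packet = ByteBuffer.allocate(16 * 1024);
    for (String s : new String[] {"hello", "world"}) {
      byte[] bytes = s.getBytes();
      packet.putInt(bytes.length);                 // length prefix
      packet.put(bytes);                           // payload
    }
    packet.putInt(0);                              // end-of-buffers marker
    packet.flip();

    // unpack them the way readBuffers does
    List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
    while (true) {
      int length = packet.getInt();
      if (length == 0) break;                      // end of buffers
      ByteBuffer chunk = packet.slice();           // share data without copying
      chunk.limit(length);
      packet.position(packet.position()+length);
      buffers.add(chunk);
    }

    for (ByteBuffer b : buffers) {
      byte[] out = new byte[b.remaining()];
      b.get(out);
      System.out.println(new String(out));         // prints "hello", then "world"
    }
  }
}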
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java Wed Sep 23 22:34:14 2009
@@ -27,49 +27,49 @@
* This class represents the context of an RPC call or RPC handshake.
* Designed to provide information to RPC plugin writers,
* this class encapsulates information about the rpc exchange,
- * including per-session and per-call metadata.
+ * including handshake and call metadata.
*
*/
public class RPCContext {
- protected Map<Utf8,ByteBuffer> requestSessionMeta, responseSessionMeta;
+ protected Map<Utf8,ByteBuffer> requestHandshakeMeta, responseHandshakeMeta;
protected Map<Utf8,ByteBuffer> requestCallMeta, responseCallMeta;
protected Object response;
protected AvroRemoteException error;
/**
- * This is an access method for the session state
+ * This is an access method for the handshake state
* provided by the client to the server.
- * @return a map representing session state from
+ * @return a map representing handshake state from
* the client to the server
*/
- public Map<Utf8,ByteBuffer> requestSessionMeta() {
- if (requestSessionMeta == null) {
- requestSessionMeta = new HashMap<Utf8,ByteBuffer>();
+ public Map<Utf8,ByteBuffer> requestHandshakeMeta() {
+ if (requestHandshakeMeta == null) {
+ requestHandshakeMeta = new HashMap<Utf8,ByteBuffer>();
}
- return requestSessionMeta;
+ return requestHandshakeMeta;
}
- void setRequestSessionMeta(Map<Utf8,ByteBuffer> newmeta) {
- requestSessionMeta = newmeta;
+ void setRequestHandshakeMeta(Map<Utf8,ByteBuffer> newmeta) {
+ requestHandshakeMeta = newmeta;
}
/**
- * This is an access method for the session state
+ * This is an access method for the handshake state
* provided by the server back to the client
- * @return a map representing session state from
+ * @return a map representing handshake state from
* the server to the client
*/
- public Map<Utf8,ByteBuffer> responseSessionMeta() {
- if (responseSessionMeta == null) {
- responseSessionMeta = new HashMap<Utf8,ByteBuffer>();
+ public Map<Utf8,ByteBuffer> responseHandshakeMeta() {
+ if (responseHandshakeMeta == null) {
+ responseHandshakeMeta = new HashMap<Utf8,ByteBuffer>();
}
- return responseSessionMeta;
+ return responseHandshakeMeta;
}
- void setResponseSessionMeta(Map<Utf8,ByteBuffer> newmeta) {
- responseSessionMeta = newmeta;
+ void setResponseHandshakeMeta(Map<Utf8,ByteBuffer> newmeta) {
+ responseHandshakeMeta = newmeta;
}
/**
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java Wed Sep 23 22:34:14 2009
@@ -27,20 +27,20 @@
/**
* Called on the client before the initial RPC handshake to
- * setup any per-session metadata for this plugin
- * @param context the per-sesion rpc context
+ * setup any handshake metadata for this plugin
+ * @param context the handshake rpc context
*/
public void clientStartConnect(RPCContext context) { }
/**
* Called on the server during the RPC handshake
- * @param context the per-sesion rpc context
+ * @param context the handshake rpc context
*/
public void serverConnecting(RPCContext context) { }
/**
* Called on the client after the initial RPC handshake
- * @param context the per-sesion rpc context
+ * @param context the handshake rpc context
*/
public void clientFinishConnect(RPCContext context) { }
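For plugin authors, the renamed hooks are used just as the old per-session ones were; the metadata simply travels with each handshake now. A rough sketch (the TimestampPlugin name and its key are hypothetical; RPCMetaTestPlugin below is the real test coverage):

import java.nio.ByteBuffer;

import org.apache.avro.ipc.RPCContext;
import org.apache.avro.ipc.RPCPlugin;
import org.apache.avro.util.Utf8;

public class TimestampPlugin extends RPCPlugin {
  private static final Utf8 KEY = new Utf8("timestamp");

  @Override
  public void clientStartConnect(RPCContext context) {
    // attach client metadata to every handshake request
    byte[] now = Long.toString(System.currentTimeMillis()).getBytes();
    context.requestHandshakeMeta().put(KEY, ByteBuffer.wrap(now));
  }

  @Override
  public void serverConnecting(RPCContext context) {
    // echo it back in the handshake response, if present
    ByteBuffer sent = context.requestHandshakeMeta().get(KEY);
    if (sent != null)
      context.responseHandshakeMeta().put(KEY, sent);
  }
}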
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java Wed Sep 23 22:34:14 2009
@@ -55,7 +55,7 @@
private Protocol local;
private Protocol remote;
- private boolean established, sendLocalText;
+ private boolean sendLocalText;
private Transceiver transceiver;
protected List<RPCPlugin> rpcMetaPlugins;
@@ -91,8 +91,7 @@
ByteBufferOutputStream bbo = new ByteBufferOutputStream();
Encoder out = new BinaryEncoder(bbo);
- if (!established) // if not established
- writeHandshake(out); // prepend handshake
+ writeHandshake(out); // prepend handshake
// use local protocol to write request
m = getLocal().getMessages().get(messageName);
@@ -112,9 +111,7 @@
ByteBufferInputStream bbi = new ByteBufferInputStream(response);
in = new BinaryDecoder(bbi);
- if (!established) // if not established
- readHandshake(in); // process handshake
- } while (!established);
+ } while (!readHandshake(in));
// use remote protocol to read response
m = getRemote().getMessages().get(messageName);
@@ -172,13 +169,14 @@
for (RPCPlugin plugin : rpcMetaPlugins) {
plugin.clientStartConnect(context);
}
- handshake.meta = context.requestSessionMeta();
+ handshake.meta = context.requestHandshakeMeta();
HANDSHAKE_WRITER.write(handshake, out);
}
@SuppressWarnings("unchecked")
- private void readHandshake(Decoder in) throws IOException {
+ private boolean readHandshake(Decoder in) throws IOException {
+ boolean established = false;
HandshakeResponse handshake =
(HandshakeResponse)HANDSHAKE_READER.read(null, in);
switch (handshake.match) {
@@ -201,12 +199,13 @@
RPCContext context = new RPCContext();
if (handshake.meta != null) {
- context.setResponseSessionMeta((Map<Utf8, ByteBuffer>) handshake.meta);
+ context.setResponseHandshakeMeta((Map<Utf8, ByteBuffer>) handshake.meta);
}
for (RPCPlugin plugin : rpcMetaPlugins) {
plugin.clientFinishConnect(context);
}
+ return established;
}
private void setRemote(HandshakeResponse handshake) {
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java Wed Sep 23 22:34:14 2009
@@ -26,7 +26,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.WeakHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,8 +55,6 @@
private static final GenericDatumWriter<Map<Utf8,ByteBuffer>> META_WRITER =
new GenericDatumWriter<Map<Utf8,ByteBuffer>>(META);
- private Map<Transceiver,Protocol> remotes
- = Collections.synchronizedMap(new WeakHashMap<Transceiver,Protocol>());
private Map<MD5,Protocol> protocols
= Collections.synchronizedMap(new HashMap<MD5,Protocol>());
@@ -87,18 +84,14 @@
/** Called by a server to deserialize a request, compute and serialize
* a response or error. */
- public List<ByteBuffer> respond(Transceiver transceiver) throws IOException {
- ByteBufferInputStream bbi =
- new ByteBufferInputStream(transceiver.readBuffers());
-
- Decoder in = new BinaryDecoder(bbi);
- ByteBufferOutputStream bbo =
- new ByteBufferOutputStream();
+ public List<ByteBuffer> respond(List<ByteBuffer> buffers) throws IOException {
+ Decoder in = new BinaryDecoder(new ByteBufferInputStream(buffers));
+ ByteBufferOutputStream bbo = new ByteBufferOutputStream();
Encoder out = new BinaryEncoder(bbo);
AvroRemoteException error = null;
RPCContext context = new RPCContext();
try {
- Protocol remote = handshake(transceiver, in, out);
+ Protocol remote = handshake(in, out);
if (remote == null) // handshake failed
return bbo.getBufferList();
@@ -163,20 +156,14 @@
new SpecificDatumReader(HandshakeRequest._SCHEMA);
@SuppressWarnings("unchecked")
- private Protocol handshake(Transceiver transceiver,
- Decoder in, Encoder out)
+ private Protocol handshake(Decoder in, Encoder out)
throws IOException {
- Protocol remote = remotes.get(transceiver);
- if (remote != null) return remote; // already established
-
HandshakeRequest request = (HandshakeRequest)handshakeReader.read(null, in);
- remote = protocols.get(request.clientHash);
+ Protocol remote = protocols.get(request.clientHash);
if (remote == null && request.clientProtocol != null) {
remote = Protocol.parse(request.clientProtocol.toString());
protocols.put(request.clientHash, remote);
}
- if (remote != null)
- remotes.put(transceiver, remote);
HandshakeResponse response = new HandshakeResponse();
if (localHash.equals(request.serverHash)) {
response.match =
@@ -191,12 +178,12 @@
}
RPCContext context = new RPCContext();
- context.setRequestSessionMeta((Map<Utf8, ByteBuffer>) request.meta);
+ context.setRequestHandshakeMeta((Map<Utf8, ByteBuffer>) request.meta);
for (RPCPlugin plugin : rpcMetaPlugins) {
plugin.serverConnecting(context);
}
- response.meta = context.responseSessionMeta();
+ response.meta = context.responseHandshakeMeta();
handshakeWriter.write(response, out);
return remote;
Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Server.java?rev=818293&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Server.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Server.java Wed Sep 23 22:34:14 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+/** A server listening on a port. */
+public interface Server {
+ /** The port this server runs on. */
+ int getPort();
+
+ /** Stop this server. */
+ void close();
+
+}
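With SocketServer also implementing Server (below), callers can hold either transport behind this interface. A rough sketch, assuming SocketServer's constructor takes (Responder, SocketAddress) like DatagramServer's; the ServerSketch class itself is not part of this commit:

import java.net.InetSocketAddress;

import org.apache.avro.ipc.DatagramServer;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.SocketServer;

public class ServerSketch {
  // start either transport behind the common Server interface
  static Server start(Responder responder, boolean udp) throws Exception {
    if (udp)
      return new DatagramServer(responder, new InetSocketAddress(0));
    return new SocketServer(responder, new InetSocketAddress(0));
  }

  public static void main(String[] args) throws Exception {
    Server server = start(null, true);   // null responder, as in DatagramServer.main
    System.out.println("listening on port "+server.getPort());
    server.close();
  }
}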
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java Wed Sep 23 22:34:14 2009
@@ -29,7 +29,7 @@
import org.slf4j.LoggerFactory;
/** A simple socket-based server implementation. */
-public class SocketServer extends Thread {
+public class SocketServer extends Thread implements Server {
private static final Logger LOG =
LoggerFactory.getLogger(SocketServer.class);
private Responder responder;
@@ -88,7 +88,7 @@
try {
try {
while (true) {
- writeBuffers(responder.respond(this));
+ writeBuffers(responder.respond(readBuffers()));
}
} catch (ClosedChannelException e) {
return;
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java Wed Sep 23 22:34:14 2009
@@ -36,17 +36,17 @@
private SocketChannel channel;
private ByteBuffer header = ByteBuffer.allocate(4);
- public String getRemoteName() {
- return channel.socket().getRemoteSocketAddress().toString();
- }
-
public SocketTransceiver(SocketAddress address) throws IOException {
this(SocketChannel.open(address));
}
public SocketTransceiver(SocketChannel channel) {
this.channel = channel;
- LOG.info("open to "+channel.socket().getRemoteSocketAddress());
+ LOG.info("open to "+getRemoteName());
+ }
+
+ public String getRemoteName() {
+ return channel.socket().getRemoteSocketAddress().toString();
}
public synchronized List<ByteBuffer> readBuffers() throws IOException {
@@ -87,8 +87,10 @@
}
public void close() throws IOException {
- LOG.info("closing to "+channel.socket().getRemoteSocketAddress());
- channel.close();
+ if (channel.isOpen()) {
+ LOG.info("closing to "+getRemoteName());
+ channel.close();
+ }
}
}
Modified: hadoop/avro/trunk/src/py/avro/ipc.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/ipc.py?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/ipc.py (original)
+++ hadoop/avro/trunk/src/py/avro/ipc.py Wed Sep 23 22:34:14 2009
@@ -90,7 +90,6 @@
def __init__(self, localproto, transceiver):
self.__localproto = localproto
self.__transceiver = transceiver
- self.__established = False
self.__sendlocaltext = False
self.__remoteproto = None
@@ -106,12 +105,10 @@
def request(self, msgname, req):
"""Writes a request message and reads a response or error message."""
processed = False
- while not self.__established or not processed:
- processed = True
+ while not processed:
buf = cStringIO.StringIO()
encoder = io.Encoder(buf)
- if not self.__established:
- self.__writehandshake(encoder)
+ self.__writehandshake(encoder)
requestmeta = dict()
_META_WRITER.write(requestmeta, encoder)
m = self.__localproto.getmessages().get(msgname)
@@ -121,8 +118,7 @@
self.writerequest(m.getrequest(), req, encoder)
response = self.__transceiver.transceive(buf.getvalue())
decoder = io.Decoder(cStringIO.StringIO(response))
- if not self.__established:
- self.__readhandshake(decoder)
+ processed = self.__readhandshake(decoder)
responsemeta = _META_READER.read(decoder)
m = self.getremote().getmessages().get(msgname)
if m is None:
@@ -154,13 +150,14 @@
self.__transceiver.getremotename().__str__() + " is " +
handshake.match.__str__())
if handshake.match == _HANDSHAKE_MATCH_BOTH:
- self.__established = True
+ return True
elif handshake.match == _HANDSHAKE_MATCH_CLIENT:
self.__setremote(handshake)
- self.__established = True
+ return True
elif handshake.match == _HANDSHAKE_MATCH_NONE:
self.__setremote(handshake)
self.__sendlocaltext = True
+ return False
else:
raise schema.AvroException("Unexpected match: "+handshake.match.__str__())
@@ -188,7 +185,6 @@
def __init__(self, localproto):
self.__localproto = localproto
- self.__remotes = weakref.WeakKeyDictionary()
self.__protocols = dict()
self.__localhash = self.__localproto.getMD5()
self.__protocols[self.__localhash] = self.__localproto
@@ -248,16 +244,11 @@
def __handshake(self, transceiver, decoder, encoder):
- remoteproto = self.__remotes.get(transceiver)
- if remoteproto != None:
- return remoteproto #already established
request = _HANDSHAKE_RESPONDER_READER.read(decoder)
remoteproto = self.__protocols.get(request.clientHash)
if remoteproto is None and request.clientProtocol is not None:
remoteproto = protocol.parse(request.clientProtocol)
self.__protocols[request.clientHash] = remoteproto
- if remoteproto is not None:
- self.__remotes[transceiver] = remoteproto
response = _HandshakeResponse()
if self.__localhash == request.serverHash:
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java Wed Sep 23 22:34:14 2009
@@ -27,13 +27,11 @@
import org.apache.avro.util.Utf8;
/**
- * An implementation of an RPC metadata plugin API
- * designed for unit testing. This plugin tests
- * both session and per-call state by passing
- * a string as per-call metadata, slowly building it
- * up at each instrumentation point, testing it as
- * it goes. Finally, after the call or handshake is
- * complete, the constructed string is tested.
+ * An implementation of an RPC metadata plugin API designed for unit testing.
+ * This plugin tests handshake and call state by passing a string as metadata,
+ * slowly building it up at each instrumentation point, testing it as it goes.
+ * Finally, after the call or handshake is complete, the constructed string is
+ * tested.
*/
public final class RPCMetaTestPlugin extends RPCPlugin {
@@ -46,18 +44,18 @@
@Override
public void clientStartConnect(RPCContext context) {
ByteBuffer buf = ByteBuffer.wrap("ap".getBytes());
- context.requestSessionMeta().put(key, buf);
+ context.requestHandshakeMeta().put(key, buf);
}
@Override
public void serverConnecting(RPCContext context) {
- Assert.assertNotNull(context.requestSessionMeta());
- Assert.assertNotNull(context.responseSessionMeta());
+ Assert.assertNotNull(context.requestHandshakeMeta());
+ Assert.assertNotNull(context.responseHandshakeMeta());
- if (!context.requestSessionMeta().containsKey(key)) return;
+ if (!context.requestHandshakeMeta().containsKey(key)) return;
- ByteBuffer buf = context.requestSessionMeta().get(key);
+ ByteBuffer buf = context.requestHandshakeMeta().get(key);
Assert.assertNotNull(buf);
Assert.assertNotNull(buf.array());
@@ -67,18 +65,18 @@
buf = ByteBuffer.wrap((partialstr + "ac").getBytes());
Assert.assertTrue(buf.remaining() > 0);
- context.responseSessionMeta().put(key, buf);
+ context.responseHandshakeMeta().put(key, buf);
}
@Override
public void clientFinishConnect(RPCContext context) {
- Map<Utf8,ByteBuffer> sessionMeta = context.responseSessionMeta();
+ Map<Utf8,ByteBuffer> handshakeMeta = context.responseHandshakeMeta();
- Assert.assertNotNull(sessionMeta);
+ Assert.assertNotNull(handshakeMeta);
- if (!sessionMeta.containsKey(key)) return;
+ if (!handshakeMeta.containsKey(key)) return;
- ByteBuffer buf = sessionMeta.get(key);
+ ByteBuffer buf = handshakeMeta.get(key);
Assert.assertNotNull(buf);
Assert.assertNotNull(buf.array());
@@ -88,9 +86,9 @@
buf = ByteBuffer.wrap((partialstr + "he").getBytes());
Assert.assertTrue(buf.remaining() > 0);
- sessionMeta.put(key, buf);
+ handshakeMeta.put(key, buf);
- checkRPCMetaMap(sessionMeta);
+ checkRPCMetaMap(handshakeMeta);
}
@Override
Added: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolDatagram.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolDatagram.java?rev=818293&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolDatagram.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolDatagram.java Wed Sep 23 22:34:14 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import java.util.Random;
+import org.apache.avro.ipc.DatagramServer;
+import org.apache.avro.ipc.DatagramTransceiver;
+import org.apache.avro.specific.SpecificRequestor;
+import org.apache.avro.specific.SpecificResponder;
+import org.apache.avro.test.Simple;
+import org.junit.Before;
+
+import java.net.InetSocketAddress;
+
+public class TestProtocolDatagram extends TestProtocolSpecific {
+
+ @Before
+ public void testStartServer() throws Exception {
+ server =
+ new DatagramServer(new SpecificResponder(Simple.class, new TestImpl()),
+ new InetSocketAddress("localhost",
+ new Random().nextInt(10000)+10000));
+ client = new DatagramTransceiver(new InetSocketAddress("localhost", server.getPort()));
+ proxy = (Simple)SpecificRequestor.getClient(Simple.class, client);
+ }
+
+}
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java Wed Sep 23 22:34:14 2009
@@ -18,6 +18,7 @@
package org.apache.avro;
import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.SocketServer;
import org.apache.avro.ipc.SocketTransceiver;
import org.apache.avro.ipc.Transceiver;
@@ -61,7 +62,7 @@
}
}
- protected static SocketServer server;
+ protected static Server server;
protected static Transceiver client;
protected static Simple proxy;