Author: cutting
Date: Wed Sep 30 20:52:22 2009
New Revision: 820442
URL: http://svn.apache.org/viewvc?rev=820442&view=rev
Log:
AVRO-129. Add HTTP-based RPC client and server.
Added:
hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpServer.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpTransceiver.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolHttp.java
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/ivy.xml
Modified: hadoop/avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=820442&r1=820441&r2=820442&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Wed Sep 30 20:52:22 2009
@@ -33,6 +33,8 @@
written with a different version of the schema than is current.
(cutting)
+ AVRO-129. Add HTTP-based RPC client and server. (cutting)
+
IMPROVEMENTS
AVRO-99. Use Boost framework for C++ unit tests.
Modified: hadoop/avro/trunk/ivy.xml
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/ivy.xml?rev=820442&r1=820441&r2=820442&view=diff
==============================================================================
--- hadoop/avro/trunk/ivy.xml (original)
+++ hadoop/avro/trunk/ivy.xml Wed Sep 30 20:52:22 2009
@@ -39,6 +39,8 @@
rev="1.5"/>
<dependency org="com.thoughtworks.paranamer" name="paranamer-ant"
rev="1.5"/>
+ <dependency org="org.mortbay.jetty" name="jetty"
+ rev="6.1.14"/>
<dependency org="junit" name="junit" rev="4.5" conf="test->default"/>
<dependency org="checkstyle" name="checkstyle" rev="5.0"
conf="test->default"/>
Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpServer.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpServer.java?rev=820442&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpServer.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpServer.java Wed Sep 30 20:52:22 2009
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.avro.AvroRuntimeException;
+
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+
+/**
+ * An HTTP-based {@link Server} implementation backed by an embedded Jetty
+ * server.
+ *
+ * <p>The instance registers itself as the servlet for all paths under the
+ * root context and passes the body of every POST to the configured
+ * {@link Responder}.
+ */
+public class HttpServer extends HttpServlet implements Server {
+  /** Produces the response buffers for each request. */
+  private final Responder responder;
+  /** Embedded Jetty instance that hosts this servlet. */
+  private final org.mortbay.jetty.Server server;
+
+  /**
+   * Creates the server and starts it before returning.
+   *
+   * @param responder handles the buffers read from each POST request
+   * @param port port number handed to the Jetty server constructor
+   * @throws IOException part of the declared signature; the body shown here
+   *         wraps start-up failures instead of throwing it
+   * @throws AvroRuntimeException if Jetty fails to start
+   */
+  public HttpServer(Responder responder, int port) throws IOException {
+    this.responder = responder;
+    this.server = new org.mortbay.jetty.Server(port);
+    new Context(server, "/").addServlet(new ServletHolder(this), "/*");
+    try {
+      server.start();
+    } catch (Exception e) {
+      throw new AvroRuntimeException(e);
+    }
+  }
+
+  /** Returns the local port reported by the first Jetty connector. */
+  @Override
+  public int getPort() { return server.getConnectors()[0].getLocalPort(); }
+
+  /**
+   * Stops the embedded Jetty server.
+   *
+   * @throws AvroRuntimeException if Jetty fails to stop
+   */
+  @Override
+  public void close() {
+    try {
+      server.stop();
+    } catch (Exception e) {
+      throw new AvroRuntimeException(e);
+    }
+  }
+
+  /**
+   * Reads the framed request buffers from the POST body, asks the responder
+   * for the reply and writes it back using the same framing.
+   *
+   * @param request the incoming HTTP request
+   * @param response the HTTP response to fill in
+   * @throws IOException if reading the request or writing the response fails
+   * @throws ServletException wrapping any {@link AvroRuntimeException} raised
+   *         while responding
+   */
+  @Override
+  public void doPost(HttpServletRequest request, HttpServletResponse response)
+    throws IOException, ServletException {
+    response.setContentType("avro/binary");
+    List<ByteBuffer> requestBuffers =
+      HttpTransceiver.readBuffers(request.getInputStream());
+    try {
+      List<ByteBuffer> responseBuffers =
+        responder.respond(requestBuffers);
+      // The length must be set before the first byte of the body is written.
+      response.setContentLength(HttpTransceiver.getLength(responseBuffers));
+      HttpTransceiver.writeBuffers(responseBuffers,
+                                   response.getOutputStream());
+    } catch (AvroRuntimeException e) {
+      throw new ServletException(e);
+    }
+  }
+}
Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpTransceiver.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpTransceiver.java?rev=820442&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpTransceiver.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpTransceiver.java Wed Sep 30 20:52:22 2009
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.net.URL;
+import java.net.URLConnection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An HTTP-based {@link Transceiver} implementation.
+ *
+ * <p>Buffers travel as a sequence of frames, each a 4-byte big-endian length
+ * followed by that many bytes, terminated by a zero length.
+ */
+public class HttpTransceiver extends Transceiver {
+  private static final Logger LOG
+    = LoggerFactory.getLogger(HttpTransceiver.class);
+
+  /** Endpoint that every request is sent to. */
+  private final URL url;
+  /** Connection of the call in progress; replaced by each transceive(). */
+  private URLConnection connection;
+
+  /** @param url endpoint that requests are sent to */
+  public HttpTransceiver(URL url) { this.url = url; }
+
+  /** Returns the string form of the endpoint URL. */
+  public String getRemoteName() { return this.url.toString(); }
+
+  /**
+   * Opens a new connection to the endpoint, sets the request headers and
+   * delegates the exchange to the superclass.
+   *
+   * @param request buffers that make up the request
+   * @return whatever {@code super.transceive(request)} returns
+   * @throws IOException if the connection cannot be opened or the exchange
+   *         fails
+   */
+  @Override
+  public synchronized List<ByteBuffer> transceive(List<ByteBuffer> request)
+    throws IOException {
+    this.connection = url.openConnection();
+    connection.setRequestProperty("Content-Type", "avro/binary");
+    connection.setRequestProperty("Content-Length",
+                                  Integer.toString(getLength(request)));
+    connection.setDoOutput(true);
+    LOG.info("Connecting to: {}", url);
+    // NOTE(review): presumably the superclass calls writeBuffers() and then
+    // readBuffers() on this instance -- confirm against Transceiver.
+    return super.transceive(request);
+  }
+
+  /** Reads the framed buffers from the current connection's input stream. */
+  public synchronized List<ByteBuffer> readBuffers() throws IOException {
+    return readBuffers(connection.getInputStream());
+  }
+
+  /** Writes the framed buffers to the current connection's output stream. */
+  public synchronized void writeBuffers(List<ByteBuffer> buffers)
+    throws IOException {
+    writeBuffers(buffers, connection.getOutputStream());
+  }
+
+  /**
+   * Computes the number of bytes {@link #writeBuffers(List, OutputStream)}
+   * emits for the given buffers: a 4-byte prefix plus the remaining bytes of
+   * every non-empty buffer, plus the 4-byte terminator.
+   *
+   * @param buffers buffers to measure; their positions are not changed
+   * @return total framed length in bytes
+   */
+  static int getLength(List<ByteBuffer> buffers) {
+    int length = 4;                               // terminator
+    for (ByteBuffer buffer : buffers) {
+      int remaining = buffer.remaining();
+      if (remaining > 0) {                        // empty buffers are not sent
+        length += 4 + remaining;
+      }
+    }
+    return length;
+  }
+
+  /**
+   * Reads frames from the stream until the zero-length terminator.
+   *
+   * @param in stream positioned at the first length prefix
+   * @return the buffers read, each flipped and ready for reading
+   * @throws EOFException if the stream ends inside a length prefix or a frame
+   * @throws IOException if a length prefix is negative or reading fails
+   */
+  static List<ByteBuffer> readBuffers(InputStream in)
+    throws IOException {
+    List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+    while (true) {
+      int length = readLength(in);
+      if (length == 0) {                          // end of buffers
+        return buffers;
+      }
+      if (length < 0) {
+        throw new IOException("Invalid buffer length: " + length);
+      }
+      ByteBuffer buffer = ByteBuffer.allocate(length);
+      while (buffer.hasRemaining()) {
+        int p = buffer.position();
+        int i = in.read(buffer.array(), p, buffer.remaining());
+        if (i < 0)
+          throw new EOFException("Unexpected EOF");
+        buffer.position(p+i);
+      }
+      buffer.flip();
+      buffers.add(buffer);
+    }
+  }
+
+  /**
+   * Writes every non-empty buffer as a length-prefixed frame, then the
+   * zero-length terminator. Each buffer's position is advanced to its limit.
+   *
+   * @param buffers array-backed buffers to send
+   * @param out stream to write to
+   * @throws IOException if writing fails
+   */
+  static void writeBuffers(List<ByteBuffer> buffers, OutputStream out)
+    throws IOException {
+    for (ByteBuffer buffer : buffers) {
+      int remaining = buffer.remaining();
+      if (remaining > 0) {
+        // A zero-length frame would be read as the terminator, so empty
+        // buffers are skipped. The prefix is the number of bytes actually
+        // sent, not limit(), which is larger when position() is non-zero.
+        writeLength(remaining, out);              // length-prefix
+        out.write(buffer.array(),
+                  buffer.arrayOffset() + buffer.position(), remaining);
+      }
+      buffer.position(buffer.limit());
+    }
+    writeLength(0, out);                          // null-terminate
+  }
+
+  /** Reads a 4-byte big-endian int, failing with EOFException on a short read. */
+  private static int readLength(InputStream in) throws IOException {
+    int length = 0;
+    for (int i = 0; i < 4; i++) {
+      int b = in.read();
+      if (b < 0)
+        throw new EOFException("Unexpected EOF");
+      length = (length << 8) | b;
+    }
+    return length;
+  }
+
+  /** Writes {@code length} as a 4-byte big-endian int. */
+  private static void writeLength(int length, OutputStream out)
+    throws IOException {
+    out.write(0xff & (length >>> 24));
+    out.write(0xff & (length >>> 16));
+    out.write(0xff & (length >>> 8));
+    out.write(0xff & length);
+  }
+}
+
Added: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolHttp.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolHttp.java?rev=820442&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolHttp.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolHttp.java Wed Sep 30 20:52:22 2009
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import java.util.Random;
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.HttpTransceiver;
+import org.apache.avro.specific.SpecificRequestor;
+import org.apache.avro.specific.SpecificResponder;
+import org.apache.avro.test.Simple;
+import org.junit.Before;
+
+import java.net.URL;
+
+public class TestProtocolHttp extends TestProtocolSpecific {
+
+ @Before
+ public void testStartServer() throws Exception {
+ server =
+ new HttpServer(new SpecificResponder(Simple.class, new TestImpl()), 0);
+ client =
+ new HttpTransceiver(new URL("http://127.0.0.1:"+server.getPort()+"/"));
+ proxy = (Simple)SpecificRequestor.getClient(Simple.class, client);
+ }
+
+}