Author: cutting
Date: Fri Sep 18 17:47:08 2009
New Revision: 816727
URL: http://svn.apache.org/viewvc?rev=816727&view=rev
Log:
HADOOP-6170. Add facility to tunnel Avro RPCs through Hadoop RPCs.
Added:
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpc.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/AvroTestProtocol.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java
Modified:
hadoop/common/trunk/CHANGES.txt
hadoop/common/trunk/build.xml
hadoop/common/trunk/ivy.xml
hadoop/common/trunk/ivy/libraries.properties
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java
Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=816727&r1=816726&r2=816727&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Fri Sep 18 17:47:08 2009
@@ -196,6 +196,10 @@
HADOOP-4952. Add new improved file system interface FileContext for the
application writer (Sanjay Radia via suresh)
+ HADOOP-6170. Add facility to tunnel Avro RPCs through Hadoop RPCs.
+ This permits one to take advantage of both Avro's RPC versioning
+ features and Hadoop's proven RPC scalability. (cutting)
+
IMPROVEMENTS
HADOOP-4565. Added CombineFileInputFormat to use data locality information
Modified: hadoop/common/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/build.xml?rev=816727&r1=816726&r2=816727&view=diff
==============================================================================
--- hadoop/common/trunk/build.xml (original)
+++ hadoop/common/trunk/build.xml Fri Sep 18 17:47:08 2009
@@ -460,6 +460,14 @@
<classpath refid="test.core.classpath"/>
</javac>
+ <taskdef
+ name="paranamer"
+ classname="com.thoughtworks.paranamer.ant.ParanamerGeneratorTask">
+ <classpath refid="classpath" />
+ </taskdef>
+ <paranamer sourceDirectory="${test.src.dir}/core"
+ outputDirectory="${test.core.build.classes}"/>
+
<delete dir="${test.cache.data}"/>
<mkdir dir="${test.cache.data}"/>
<copy file="${test.src.dir}/core/org/apache/hadoop/cli/testConf.xml"
todir="${test.cache.data}"/>
Modified: hadoop/common/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/ivy.xml?rev=816727&r1=816726&r2=816727&view=diff
==============================================================================
--- hadoop/common/trunk/ivy.xml (original)
+++ hadoop/common/trunk/ivy.xml Fri Sep 18 17:47:08 2009
@@ -271,15 +271,19 @@
</dependency>
<dependency org="org.apache.hadoop"
name="avro"
- rev="1.0.0"
+ rev="${avro.version}"
conf="common->default"/>
<dependency org="org.codehaus.jackson"
name="jackson-mapper-asl"
- rev="1.0.1"
+ rev="${jackson.version}"
conf="common->default"/>
<dependency org="com.thoughtworks.paranamer"
name="paranamer"
- rev="1.5"
+ rev="${paranamer.version}"
+ conf="common->default"/>
+ <dependency org="com.thoughtworks.paranamer"
+ name="paranamer-ant"
+ rev="${paranamer.version}"
conf="common->default"/>
</dependencies>
Modified: hadoop/common/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/ivy/libraries.properties?rev=816727&r1=816726&r2=816727&view=diff
==============================================================================
--- hadoop/common/trunk/ivy/libraries.properties (original)
+++ hadoop/common/trunk/ivy/libraries.properties Fri Sep 18 17:47:08 2009
@@ -16,6 +16,8 @@
#These are the versions of our dependencies (in alphabetical order)
apacheant.version=1.7.0
+avro.version=1.1.0
+
checkstyle.version=4.2
commons-cli.version=1.2
@@ -42,6 +44,8 @@
#ivy.version=2.0.0-beta2
ivy.version=2.0.0-rc2
+jackson.version=1.0.1
+
jasper.version=5.5.12
jsp.version=2.1
jsp-api.version=5.5.12
@@ -61,6 +65,8 @@
oro.version=2.0.8
+paranamer.version=1.5
+
rats-lib.version=0.6
servlet.version=4.0.6
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java?rev=816727&r1=816726&r2=816727&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java Fri Sep 18 17:47:08 2009
@@ -52,7 +52,7 @@
@Override
protected Schema getSchema(Object t, Map<String, String> metadata) {
String jsonSchema = metadata.get(AVRO_SCHEMA_KEY);
- return jsonSchema != null ? Schema.parse(jsonSchema) : GenericData.induce(t);
+ return jsonSchema != null ? Schema.parse(jsonSchema) : GenericData.get().induce(t);
}
@Override
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java?rev=816727&r1=816726&r2=816727&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java Fri Sep 18 17:47:08 2009
@@ -85,7 +85,7 @@
|| "null".equals(clazz.getEnclosingClass().getName())) ?
clazz.getPackage().getName() + "."
: (clazz.getEnclosingClass().getName() + "$"));
- return new ReflectDatumReader(ReflectData.getSchema(clazz), prefix);
+ return new ReflectDatumReader(ReflectData.get().getSchema(clazz), prefix);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -93,7 +93,7 @@
@Override
protected Schema getSchema(Object t, Map<String, String> metadata) {
- return ReflectData.getSchema(t.getClass());
+ return ReflectData.get().getSchema(t.getClass());
}
@Override
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java?rev=816727&r1=816726&r2=816727&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java Fri Sep 18 17:47:08 2009
@@ -50,7 +50,7 @@
try {
Class<SpecificRecord> clazz = (Class<SpecificRecord>)
getClassFromMetadata(metadata);
- return new SpecificDatumReader(clazz.newInstance().schema());
+ return new SpecificDatumReader(clazz.newInstance().getSchema());
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -58,7 +58,7 @@
@Override
protected Schema getSchema(SpecificRecord t, Map<String, String> metadata) {
- return t.schema();
+ return t.getSchema();
}
@Override
Added: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpc.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpc.java?rev=816727&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpc.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpc.java Fri Sep 18 17:47:08 2009
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.*;
+import java.util.*;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import javax.net.SocketFactory;
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.net.NetUtils;
+
+import org.apache.avro.*;
+import org.apache.avro.ipc.*;
+import org.apache.avro.reflect.*;
+
+/** Tunnel Avro-format RPC requests over a Hadoop {@link RPC} connection. This
+ * does not give cross-language wire compatibility, since the Hadoop RPC wire
+ * format is non-standard, but it does permit use of Avro's protocol versioning
+ * features for inter-Java RPCs. */
+public class AvroRpc {
+  /** Version of the Hadoop-level {@link TunnelProtocol}. Passed by clients to
+   * {@link RPC#getProxy} and reported by the server, so it must never change
+   * at runtime. */
+  private static final long VERSION = 0L;
+
+  /** Tunnel an Avro RPC request and response through Hadoop's RPC. */
+  private interface TunnelProtocol extends VersionedProtocol {
+    /** All Avro methods and responses go through this. */
+    BufferListWritable call(BufferListWritable request) throws IOException;
+  }
+
+  /** A Writable that holds a List<ByteBuffer>, the Avro RPC Transceiver's
+   * basic unit of data transfer. Wire format: an int buffer count, then for
+   * each buffer an int length followed by that many bytes. */
+  private static class BufferListWritable implements Writable {
+    private List<ByteBuffer> buffers;
+
+    public BufferListWritable() {}                // required for RPC Writables
+
+    public BufferListWritable(List<ByteBuffer> buffers) {
+      this.buffers = buffers;
+    }
+
+    /** Read a buffer list written by {@link #write}.
+     * @throws IOException on a read failure or a negative count/length */
+    public void readFields(DataInput in) throws IOException {
+      int size = in.readInt();
+      if (size < 0) {
+        throw new IOException("Invalid buffer count: " + size);
+      }
+      buffers = new ArrayList<ByteBuffer>(size);
+      for (int i = 0; i < size; i++) {
+        int length = in.readInt();
+        if (length < 0) {
+          throw new IOException("Invalid buffer length: " + length);
+        }
+        ByteBuffer buffer = ByteBuffer.allocate(length);
+        in.readFully(buffer.array(), 0, length);
+        buffers.add(buffer);
+      }
+    }
+
+    /** Write each buffer's remaining bytes, leaving buffer positions
+     * unchanged. */
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(buffers.size());
+      for (ByteBuffer buffer : buffers) {
+        int length = buffer.remaining();
+        out.writeInt(length);
+        if (buffer.hasArray()) {
+          // array() is the whole backing array; position() is relative to
+          // arrayOffset(), which is non-zero for sliced buffers.
+          out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+                    length);
+        } else {
+          // Direct and read-only buffers expose no array: copy through a
+          // duplicate so this buffer's position is not advanced.
+          byte[] bytes = new byte[length];
+          buffer.duplicate().get(bytes);
+          out.write(bytes);
+        }
+      }
+    }
+  }
+
+  /** An Avro RPC Transceiver that tunnels client requests through Hadoop
+   * RPC. */
+  private static class ClientTransceiver extends Transceiver {
+    private final TunnelProtocol tunnel;
+    private final InetSocketAddress remote;
+
+    public ClientTransceiver(InetSocketAddress addr,
+                             UserGroupInformation ticket,
+                             Configuration conf, SocketFactory factory)
+      throws IOException {
+      this.tunnel = (TunnelProtocol)RPC.getProxy(TunnelProtocol.class, VERSION,
+                                                 addr, ticket, conf, factory);
+      this.remote = addr;
+    }
+
+    public String getRemoteName() { return remote.toString(); }
+
+    /** Send a whole request through the tunnel and return the whole
+     * response. */
+    public List<ByteBuffer> transceive(List<ByteBuffer> request)
+      throws IOException {
+      return tunnel.call(new BufferListWritable(request)).buffers;
+    }
+
+    /** Unsupported: requests are exchanged whole via {@link #transceive}. */
+    public List<ByteBuffer> readBuffers() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    /** Unsupported: requests are exchanged whole via {@link #transceive}. */
+    public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    // NOTE(review): the Hadoop RPC proxy held in 'tunnel' is never released
+    // (e.g. via RPC.stopProxy); confirm whether callers need that.
+    public void close() throws IOException {}
+  }
+
+  /** Avro reflect-based requestor bound to a protocol and transceiver. */
+  private static class Invoker extends ReflectRequestor {
+    public Invoker(Protocol protocol, Transceiver transceiver)
+      throws IOException {
+      super(protocol, transceiver);
+    }
+  }
+
+  /** Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address, logging in as the current
+   * user and using the default socket factory.
+   * @throws RuntimeException if login fails (cause: the LoginException) */
+  public static Object getProxy(Class<?> protocol,
+                                InetSocketAddress addr,
+                                Configuration conf)
+    throws IOException {
+    UserGroupInformation ugi;
+    try {
+      ugi = UserGroupInformation.login(conf);
+    } catch (LoginException le) {
+      throw new RuntimeException("Couldn't login!", le);   // keep the cause
+    }
+    return getProxy(protocol, addr, ugi, conf,
+                    NetUtils.getDefaultSocketFactory(conf));
+  }
+
+  /** Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address. The underlying Hadoop RPC
+   * proxy is created here, once, and shared by all calls on the result. */
+  public static Object getProxy
+    (final Class<?> protocol, final InetSocketAddress addr,
+     final UserGroupInformation ticket,
+     final Configuration conf, final SocketFactory factory)
+    throws IOException {
+    // Built once per proxy rather than per method call, so each invocation
+    // does not create a new Hadoop RPC proxy and a new Avro requestor.
+    final Invoker invoker =
+      new Invoker(ReflectData.get().getProtocol(protocol),
+                  new ClientTransceiver(addr, ticket, conf, factory));
+    return Proxy.newProxyInstance
+      (protocol.getClassLoader(), new Class[] { protocol },
+       new InvocationHandler() {
+         public Object invoke(Object proxy, Method method, Object[] args)
+           throws Throwable {
+           return invoker.invoke(proxy, method, args);
+         }
+       });
+  }
+
+  /** An Avro RPC Transceiver that provides a request passed through Hadoop RPC
+   * to the Avro RPC Responder for processing. */
+  private static class ServerTransceiver extends Transceiver {
+    private final List<ByteBuffer> request;
+
+    public ServerTransceiver(List<ByteBuffer> request) {
+      this.request = request;
+    }
+
+    public String getRemoteName() { return "remote"; }
+
+    /** Return the request received through the tunnel. */
+    public List<ByteBuffer> readBuffers() throws IOException {
+      return request;
+    }
+
+    /** Unsupported: the response is returned from {@code respond} instead. */
+    public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    public void close() throws IOException {}
+  }
+
+  /** An Avro RPC Responder that can process requests passed via Hadoop RPC. */
+  private static class TunnelResponder extends ReflectResponder
+    implements TunnelProtocol {
+
+    public TunnelResponder(Class<?> iface, Object impl) {
+      super(iface, impl);
+    }
+
+    public long getProtocolVersion(String protocol, long version)
+      throws IOException {
+      return VERSION;
+    }
+
+    /** Hand the tunnelled request to Avro and wrap its response. */
+    public BufferListWritable call(final BufferListWritable request)
+      throws IOException {
+      return new BufferListWritable
+        (respond(new ServerTransceiver(request.buffers)));
+    }
+  }
+
+  /** Construct a server for a protocol implementation instance listening on a
+   * port and address.
+   * @param impl the protocol implementation; its class is reflected by Avro
+   * @param bindAddress address to bind to
+   * @param port port to listen on (0 for any free port)
+   * @param conf configuration for the Hadoop RPC server */
+  public static Server getServer(Object impl, String bindAddress, int port,
+                                 Configuration conf)
+    throws IOException {
+    return RPC.getServer(new TunnelResponder(impl.getClass(), impl),
+                         bindAddress, port, conf);
+  }
+
+  /** Construct a server for a protocol implementation instance listening on a
+   * port and address, with an explicit handler count and verbosity.
+   * @param impl the protocol implementation; its class is reflected by Avro
+   * @param bindAddress address to bind to
+   * @param port port to listen on (0 for any free port)
+   * @param numHandlers number of Hadoop RPC handler threads
+   * @param verbose passed through to {@link RPC#getServer}
+   * @param conf configuration for the Hadoop RPC server */
+  public static RPC.Server getServer(Object impl, String bindAddress, int port,
+                                     int numHandlers, boolean verbose,
+                                     Configuration conf)
+    throws IOException {
+    return RPC.getServer(new TunnelResponder(impl.getClass(), impl),
+                         bindAddress, port, numHandlers, verbose, conf);
+  }
+
+}
Added: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/AvroTestProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/AvroTestProtocol.java?rev=816727&view=auto
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/AvroTestProtocol.java (added)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/AvroTestProtocol.java Fri Sep 18 17:47:08 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.util.Utf8;
+
+/** Protocol served by {@link TestAvroRpc} through {@link AvroRpc}. */
+public interface AvroTestProtocol {
+  /** Checked error declared by {@link #error()}. */
+  @SuppressWarnings("serial")
+  public static class Problem extends AvroRemoteException {
+    public Problem() {}
+  }
+  /** A call with no arguments and no result. */
+  void ping();
+  /** Expected to return its argument unchanged. */
+  Utf8 echo(Utf8 value);
+  /** Expected to return the sum of its arguments. */
+  int add(int v1, int v2);
+  /** Expected to always throw {@link Problem}. */
+  int error() throws Problem;
+}
Added: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java?rev=816727&view=auto
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java (added)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java Fri Sep 18 17:47:08 2009
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.net.NetUtils;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.util.Utf8;
+
+/** Unit tests for AvroRpc: round-trips calls through a real server. */
+public class TestAvroRpc extends TestCase {
+  private static final String ADDRESS = "0.0.0.0";
+
+  public static final Log LOG =
+    LogFactory.getLog(TestAvroRpc.class);
+
+  private static final Configuration conf = new Configuration();
+
+  public TestAvroRpc(String name) { super(name); }
+
+  /** Server-side implementation of {@link AvroTestProtocol}. */
+  public static class TestImpl implements AvroTestProtocol {
+
+    public void ping() {}
+
+    public Utf8 echo(Utf8 value) { return value; }
+
+    public int add(int v1, int v2) { return v1 + v2; }
+
+    public int error() throws Problem {
+      throw new Problem();
+    }
+  }
+
+  /** Start a server on an ephemeral port, then check a void call, an echo,
+   * an arithmetic call and a declared remote exception via a client proxy. */
+  public void testCalls() throws Exception {
+    Server server = AvroRpc.getServer(new TestImpl(), ADDRESS, 0, conf);
+    try {
+      server.start();
+
+      InetSocketAddress addr = NetUtils.getConnectAddress(server);
+      AvroTestProtocol proxy =
+        (AvroTestProtocol)AvroRpc.getProxy(AvroTestProtocol.class, addr, conf);
+
+      proxy.ping();
+
+      Utf8 utf8Result = proxy.echo(new Utf8("hello world"));
+      assertEquals(new Utf8("hello world"), utf8Result);
+
+      int intResult = proxy.add(1, 2);
+      assertEquals(3, intResult);
+
+      try {
+        proxy.error();
+        fail("Expected error() to throw an AvroRemoteException");
+      } catch (AvroRemoteException e) {
+        LOG.debug("Caught " + e);
+      }
+
+    } finally {
+      server.stop();
+    }
+  }
+}