Author: cutting
Date: Thu Jul 1 21:25:01 2010
New Revision: 959787
URL: http://svn.apache.org/viewvc?rev=959787&view=rev
Log:
AVRO-405. Java: Add Netty-based RPC transceiver and server implementation.
Contributed by Harry Wang.
Added:
avro/trunk/lang/java/ivysettings-jboss-repos.xml
avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyServer.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransceiver.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransportCodec.java
avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestNettyServer.java
avro/trunk/share/test/schemas/mail.avpr
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/build.xml
avro/trunk/lang/java/ivy.xml
Modified: avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=959787&r1=959786&r2=959787&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Jul 1 21:25:01 2010
@@ -25,6 +25,9 @@ Avro 1.4.0 (unreleased)
AVRO-578. Java: add payload data to RPC context for use by
plugins. (Patrick Wendell via cutting)
+ AVRO-405: Java: Add Netty-based RPC transceiver and server
+ implementation. (Harry Wang via cutting)
+
IMPROVEMENTS
AVRO-584. Update Histogram for Stats Plugin
(Patrick Wendell via philz)
Modified: avro/trunk/lang/java/build.xml
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/build.xml?rev=959787&r1=959786&r2=959787&view=diff
==============================================================================
--- avro/trunk/lang/java/build.xml (original)
+++ avro/trunk/lang/java/build.xml Thu Jul 1 21:25:01 2010
@@ -184,27 +184,38 @@
<!-- ensure that ivy taskdef is only run once, otw ant will error -->
<property name="ivy.initialized" value="true"/>
</target>
+
+ <target name="ivy-retrieve-netty">
+ <ivy:settings id="ivy.jboss.settings"
+ file="${basedir}/ivysettings-jboss-repos.xml" />
+ <ivy:retrieve type="jar" conf="default"
+ settingsRef="ivy.jboss.settings"
+ pattern="${ivy.lib}/[artifact]-[revision].[ext]"/>
+ <ivy:retrieve type="bundle" conf="default"
+ settingsRef="ivy.jboss.settings"
+ pattern="${ivy.lib}/[artifact]-[revision].[ext]"/>
+ </target>
- <target name="ivy-retrieve" depends="init,ivy-init">
+ <target name="ivy-retrieve" depends="init,ivy-init,ivy-retrieve-netty">
<ivy:retrieve type="jar" conf="default"
pattern="${ivy.lib}/[artifact]-[revision].[ext]"/>
</target>
- <target name="ivy-retrieve-tools" depends="init,ivy-init">
+ <target name="ivy-retrieve-tools" depends="init,ivy-init,ivy-retrieve-netty">
<!-- Place in separate directory, since these artificats will
be packaged in the tools jar. -->
<ivy:retrieve type="jar" conf="tools"
pattern="${ivy.lib}/tools/[artifact]-[revision].[ext]"/>
</target>
- <target name="ivy-retrieve-test" depends="init,ivy-init">
+ <target name="ivy-retrieve-test" depends="init,ivy-init,ivy-retrieve-netty">
<ivy:retrieve type="jar" conf="test"
pattern="${ivy.test.lib}/[artifact]-[revision].[ext]"/>
</target>
- <target name="ivy-retrieve-build" depends="init,ivy-init">
+ <target name="ivy-retrieve-build" depends="init,ivy-init,ivy-retrieve-netty">
<ivy:retrieve type="jar" conf="build"
- pattern="${ivy.lib}/[artifact]-[revision].[ext]"/>
+ pattern="${ivy.lib}/[artifact]-[revision].[ext]"/>
</target>
<macrodef name="java-compiler">
Modified: avro/trunk/lang/java/ivy.xml
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/ivy.xml?rev=959787&r1=959786&r2=959787&view=diff
==============================================================================
--- avro/trunk/lang/java/ivy.xml (original)
+++ avro/trunk/lang/java/ivy.xml Thu Jul 1 21:25:01 2010
@@ -65,6 +65,7 @@
conf="build->default;test->default;tools->default"
transitive="false"/>
<dependency org="commons-httpclient" name="commons-httpclient" rev="3.0.1"
conf="test->default;tools->default"/>
+ <dependency org="org.jboss.netty" name="netty" rev="3.2.1.Final"/>
</dependencies>
</ivy-module>
Added: avro/trunk/lang/java/ivysettings-jboss-repos.xml
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/ivysettings-jboss-repos.xml?rev=959787&view=auto
==============================================================================
--- avro/trunk/lang/java/ivysettings-jboss-repos.xml (added)
+++ avro/trunk/lang/java/ivysettings-jboss-repos.xml Thu Jul 1 21:25:01 2010
@@ -0,0 +1,38 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<ivysettings>
+ <property name="repo.maven.org"
+ value="http://repo2.maven.org/maven2/" override="false"/>
+ <property name="repo.jboss.org"
+ value="http://repository.jboss.org/nexus/content/groups/public/" override="false"/>
+ <property name="maven2.pattern"
+ value="[organisation]/[module]/[revision]/[module]-[revision]"/>
+ <property name="maven2.pattern.ext" value="${maven2.pattern}.[ext]"/>
+ <settings defaultResolver="default"/>
+ <resolvers>
+ <ibiblio name="maven2" root="${repo.maven.org}"
+ pattern="${maven2.pattern.ext}" m2compatible="true"/>
+ <ibiblio name="jboss-maven2" root="${repo.jboss.org}"
+ pattern="${maven2.pattern.ext}" m2compatible="true"/>
+ <chain name="default" dual="true">
+ <resolver ref="maven2"/>
+ <resolver ref="jboss-maven2"/>
+ </chain>
+ </resolvers>
+</ivysettings>
\ No newline at end of file
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyServer.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyServer.java?rev=959787&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyServer.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyServer.java Thu Jul 1 21:25:01 2010
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import org.apache.avro.ipc.NettyTransportCodec.NettyDataPack;
+import org.apache.avro.ipc.NettyTransportCodec.NettyFrameDecoder;
+import org.apache.avro.ipc.NettyTransportCodec.NettyFrameEncoder;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Netty-based RPC {@link Server} implementation.
+ */
+public class NettyServer extends Thread implements Server {
+ private static final Logger LOG = LoggerFactory.getLogger(NettyServer.class
+ .getName());
+
+ private Responder responder;
+ private InetSocketAddress addr;
+
+ private Channel serverChannel;
+ private ChannelGroup allChannels = new DefaultChannelGroup(
+ "avro-netty-server");
+ private ChannelFactory channelFactory;
+
+ public NettyServer(Responder responder, InetSocketAddress addr) {
+ this.responder = responder;
+ this.addr = addr;
+
+ setName("AvroNettyServer on " + addr);
+ setDaemon(true);
+ start();
+ }
+
+ @Override
+ public void close() {
+ ChannelGroupFuture future = allChannels.close();
+ future.awaitUninterruptibly();
+ channelFactory.releaseExternalResources();
+ }
+
+ @Override
+ public int getPort() {
+ return ((InetSocketAddress) serverChannel.getLocalAddress()).getPort();
+ }
+
+ @Override
+ public void run() {
+ channelFactory = new NioServerSocketChannelFactory(Executors
+ .newCachedThreadPool(), Executors.newCachedThreadPool());
+ ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline p = Channels.pipeline();
+ p.addLast("frameDecoder", new NettyFrameDecoder());
+ p.addLast("frameEncoder", new NettyFrameEncoder());
+ p.addLast("handler", new NettyServerAvroHandler());
+ return p;
+ }
+ });
+ serverChannel = bootstrap.bind(addr);
+ allChannels.add(serverChannel);
+ }
+
+ /**
+ * Avro server handler for the Netty transport
+ */
+ class NettyServerAvroHandler extends SimpleChannelUpstreamHandler {
+
+ @Override
+ public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
+ throws Exception {
+ if (e instanceof ChannelStateEvent) {
+ LOG.info(e.toString());
+ }
+ super.handleUpstream(ctx, e);
+ }
+
+ @Override
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
+ throws Exception {
+ allChannels.add(e.getChannel());
+ super.channelOpen(ctx, e);
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ try {
+ NettyDataPack dataPack = (NettyDataPack) e.getMessage();
+ List<ByteBuffer> req = dataPack.getDatas();
+ List<ByteBuffer> res = responder.respond(req);
+ dataPack.setDatas(res);
+ e.getChannel().write(dataPack);
+ } catch (IOException ex) {
+ LOG.warn("unexpect error");
+ } finally {
+ e.getChannel().close();
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ LOG.warn("Unexpected exception from downstream.", e.getCause());
+ e.getChannel().close();
+ }
+
+ }
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransceiver.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransceiver.java?rev=959787&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransceiver.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransceiver.java Thu Jul 1 21:25:01 2010
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.ipc.NettyTransportCodec.NettyDataPack;
+import org.apache.avro.ipc.NettyTransportCodec.NettyFrameDecoder;
+import org.apache.avro.ipc.NettyTransportCodec.NettyFrameEncoder;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Netty-based {@link Transceiver} implementation.
+ */
+public class NettyTransceiver extends Transceiver {
+ private static final Logger LOG = LoggerFactory.getLogger(NettyTransceiver.class
+ .getName());
+
+ private ChannelFactory channelFactory;
+ private Channel channel;
+
+ private AtomicInteger serialGenerator = new AtomicInteger(0);
+ private Map<Integer, CallFuture> requests =
+ new ConcurrentHashMap<Integer, CallFuture>();
+
+ private Protocol remote;
+
+ public NettyTransceiver(InetSocketAddress addr) {
+ // Set up.
+ channelFactory = new NioClientSocketChannelFactory(Executors
+ .newCachedThreadPool(), Executors.newCachedThreadPool());
+ ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
+
+ // Configure the event pipeline factory.
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline p = Channels.pipeline();
+ p.addLast("frameDecoder", new NettyFrameDecoder());
+ p.addLast("frameEncoder", new NettyFrameEncoder());
+ p.addLast("handler", new NettyClientAvroHandler());
+ return p;
+ }
+ });
+
+ bootstrap.setOption("tcpNoDelay", true);
+
+ // Make a new connection.
+ ChannelFuture channelFuture = bootstrap.connect(addr);
+ channelFuture.awaitUninterruptibly();
+ if (!channelFuture.isSuccess()) {
+ channelFuture.getCause().printStackTrace();
+ throw new RuntimeException(channelFuture.getCause());
+ }
+ channel = channelFuture.getChannel();
+ }
+
+ public void close() {
+ // Close the connection.
+ channel.close().awaitUninterruptibly();
+ // Shut down all thread pools to exit.
+ channelFactory.releaseExternalResources();
+ }
+
+ @Override
+ public String getRemoteName() {
+ return channel.getRemoteAddress().toString();
+ }
+
+ /**
+ * Override as non-synchronized method because the method is thread safe.
+ */
+ @Override
+ public List<ByteBuffer> transceive(List<ByteBuffer> request)
+ throws IOException {
+ int serial = serialGenerator.incrementAndGet();
+ NettyDataPack dataPack = new NettyDataPack(serial, request);
+ CallFuture callFuture = new CallFuture();
+ requests.put(serial, callFuture);
+ channel.write(dataPack);
+ try {
+ return callFuture.get();
+ } catch (InterruptedException e) {
+ LOG.warn("failed to get the response", e);
+ return null;
+ } catch (ExecutionException e) {
+ LOG.warn("failed to get the response", e);
+ return null;
+ } finally {
+ requests.remove(serial);
+ }
+ }
+
+ @Override
+ public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<ByteBuffer> readBuffers() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Protocol getRemote() {
+ return remote;
+ }
+
+ @Override
+ public boolean isConnected() {
+ return remote!=null;
+ }
+
+ @Override
+ public void setRemote(Protocol protocol) {
+ this.remote = protocol;
+ }
+
+ /**
+ * Future class for a RPC call
+ */
+ class CallFuture implements Future<List<ByteBuffer>>{
+ private Semaphore sem = new Semaphore(0);
+ private List<ByteBuffer> response = null;
+
+ public void setResponse(List<ByteBuffer> response) {
+ this.response = response;
+ sem.release();
+ }
+
+ public void releaseSemphore() {
+ sem.release();
+ }
+
+ public List<ByteBuffer> getResponse() {
+ return response;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public List<ByteBuffer> get() throws InterruptedException,
+ ExecutionException {
+ sem.acquire();
+ return response;
+ }
+
+ @Override
+ public List<ByteBuffer> get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ if (sem.tryAcquire(timeout, unit)) {
+ return response;
+ } else {
+ throw new TimeoutException();
+ }
+ }
+
+ @Override
+ public boolean isDone() {
+ return sem.availablePermits()>0;
+ }
+
+ }
+
+ /**
+ * Avro client handler for the Netty transport
+ */
+ class NettyClientAvroHandler extends SimpleChannelUpstreamHandler {
+
+ @Override
+ public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
+ throws Exception {
+ if (e instanceof ChannelStateEvent) {
+ LOG.info(e.toString());
+ }
+ super.handleUpstream(ctx, e);
+ }
+
+ @Override
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
+ throws Exception {
+ // channel = e.getChannel();
+ super.channelOpen(ctx, e);
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
+ NettyDataPack dataPack = (NettyDataPack)e.getMessage();
+ CallFuture callFuture = requests.get(dataPack.getSerial());
+ if (callFuture==null) {
+ throw new RuntimeException("Missing previous call info");
+ }
+ callFuture.setResponse(dataPack.getDatas());
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ LOG.warn("Unexpected exception from downstream.", e.getCause());
+ e.getChannel().close();
+ // let the blocking waiting exit
+ Iterator<CallFuture> it = requests.values().iterator();
+ while(it.hasNext()) {
+ it.next().releaseSemphore();
+ it.remove();
+ }
+
+ }
+
+ }
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransportCodec.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransportCodec.java?rev=959787&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransportCodec.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransportCodec.java Thu Jul 1 21:25:01 2010
@@ -0,0 +1,165 @@
+package org.apache.avro.ipc;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+/**
+ * Data structure, encoder and decoder classes for the Netty transport.
+ */
+public class NettyTransportCodec {
+ /**
+ * Transport protocol data structure when using Netty.
+ */
+ public static class NettyDataPack {
+ private int serial; // to track each call in client side
+ private List<ByteBuffer> datas;
+
+ public NettyDataPack() {}
+
+ public NettyDataPack(int serial, List<ByteBuffer> datas) {
+ this.serial = serial;
+ this.datas = datas;
+ }
+
+ public void setSerial(int serial) {
+ this.serial = serial;
+ }
+
+ public int getSerial() {
+ return serial;
+ }
+
+ public void setDatas(List<ByteBuffer> datas) {
+ this.datas = datas;
+ }
+
+ public List<ByteBuffer> getDatas() {
+ return datas;
+ }
+
+ }
+
+ /**
+ * Protocol encoder which converts NettyDataPack which contains the
+ * Responder's output List<ByteBuffer> to ChannelBuffer needed
+ * by Netty.
+ */
+ public static class NettyFrameEncoder extends OneToOneEncoder {
+
+ /**
+ * encode msg to ChannelBuffer
+ * @param msg NettyDataPack from
+ * NettyServerAvroHandler/NettyClientAvroHandler in the pipeline
+ * @return encoded ChannelBuffer
+ */
+ @Override
+ protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+ throws Exception {
+ NettyDataPack dataPack = (NettyDataPack)msg;
+ List<ByteBuffer> origs = dataPack.getDatas();
+ List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(origs.size() * 2 + 1);
+ bbs.add(getPackHeader(dataPack)); // prepend a pack header including serial number and list size
+ for (ByteBuffer b : origs) {
+ bbs.add(getLengthHeader(b)); // for each buffer prepend length field
+ bbs.add(b);
+ }
+
+ return ChannelBuffers
+ .wrappedBuffer(bbs.toArray(new ByteBuffer[bbs.size()]));
+ }
+
+ private ByteBuffer getPackHeader(NettyDataPack dataPack) {
+ ByteBuffer header = ByteBuffer.allocate(8);
+ header.putInt(dataPack.getSerial());
+ header.putInt(dataPack.getDatas().size());
+ header.flip();
+ return header;
+ }
+
+ private ByteBuffer getLengthHeader(ByteBuffer buf) {
+ ByteBuffer header = ByteBuffer.allocate(4);
+ header.putInt(buf.limit());
+ header.flip();
+ return header;
+ }
+ }
+
+ /**
+ * Protocol decoder which converts Netty's ChannelBuffer to
+ * NettyDataPack which contains a List<ByteBuffer> needed
+ * by Avro Responder.
+ */
+ public static class NettyFrameDecoder extends FrameDecoder {
+ private boolean packHeaderRead = false;
+ private int listSize;
+ private NettyDataPack dataPack;
+
+ /**
+ * decode buffer to NettyDataPack
+ */
+ @Override
+ protected Object decode(ChannelHandlerContext ctx, Channel channel,
+ ChannelBuffer buffer) throws Exception {
+
+ if (!packHeaderRead) {
+ if (decodePackHeader(ctx, channel, buffer)) {
+ packHeaderRead = true;
+ }
+ return null;
+ } else {
+ if (decodePackBody(ctx, channel, buffer)) {
+ packHeaderRead = false; // reset state
+ return dataPack;
+ } else {
+ return null;
+ }
+ }
+
+ }
+
+ private boolean decodePackHeader(ChannelHandlerContext ctx, Channel channel,
+ ChannelBuffer buffer) throws Exception {
+ if (buffer.readableBytes()<8) {
+ return false;
+ }
+
+ int serial = buffer.readInt();
+ listSize = buffer.readInt();
+ dataPack = new NettyDataPack(serial, new ArrayList<ByteBuffer>(listSize));
+ return true;
+ }
+
+ private boolean decodePackBody(ChannelHandlerContext ctx, Channel channel,
+ ChannelBuffer buffer) throws Exception {
+ if (buffer.readableBytes() < 4) {
+ return false;
+ }
+
+ buffer.markReaderIndex();
+
+ int length = buffer.readInt();
+
+ if (buffer.readableBytes() < length) {
+ buffer.resetReaderIndex();
+ return false;
+ }
+
+ ByteBuffer bb = ByteBuffer.allocate(length);
+ buffer.readBytes(bb);
+ bb.flip();
+ dataPack.getDatas().add(bb);
+
+ return dataPack.getDatas().size()==listSize;
+ }
+
+ }
+
+}
Added: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestNettyServer.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestNettyServer.java?rev=959787&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestNettyServer.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestNettyServer.java Thu Jul 1 21:25:01 2010
@@ -0,0 +1,58 @@
+package org.apache.avro.ipc;
+
+import java.net.InetSocketAddress;
+
+import junit.framework.Assert;
+
+import org.apache.avro.specific.SpecificRequestor;
+import org.apache.avro.specific.SpecificResponder;
+import org.apache.avro.test.Mail;
+import org.apache.avro.test.Message;
+import org.apache.avro.util.Utf8;
+import org.junit.Test;
+
+public class TestNettyServer {
+
+ public static class MailImpl implements Mail {
+ // in this simple example just return details of the message
+ public Utf8 send(Message message) {
+ return new Utf8("Sent message to [" + message.to.toString() + "] from ["
+ + message.from.toString() + "] with body [" + message.body.toString()
+ + "]");
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ // start server
+ System.out.println("starting server...");
+ Responder responder = new SpecificResponder(Mail.class, new MailImpl());
+ Server server = new NettyServer(responder, new InetSocketAddress(0));
+ Thread.sleep(1000); // waiting for server startup
+
+ int serverPort = server.getPort();
+ System.out.println("server port : " + serverPort);
+
+ // client
+ Transceiver transceiver = new NettyTransceiver(new InetSocketAddress(
+ serverPort));
+ Mail proxy = (Mail) SpecificRequestor.getClient(Mail.class, transceiver);
+
+ Message msg = new Message();
+ msg.to = new Utf8("wife");
+ msg.from = new Utf8("husband");
+ msg.body = new Utf8("I love you!");
+
+ try {
+ Utf8 result = proxy.send(msg);
+ System.out.println("Result: " + result);
+ Assert.assertEquals(
+ "Sent message to [wife] from [husband] with body [I love you!]",
+ result.toString());
+ } finally {
+ transceiver.close();
+ server.close();
+ }
+ }
+
+}
Added: avro/trunk/share/test/schemas/mail.avpr
URL:
http://svn.apache.org/viewvc/avro/trunk/share/test/schemas/mail.avpr?rev=959787&view=auto
==============================================================================
--- avro/trunk/share/test/schemas/mail.avpr (added)
+++ avro/trunk/share/test/schemas/mail.avpr Thu Jul 1 21:25:01 2010
@@ -0,0 +1,20 @@
+{"namespace": "org.apache.avro.test",
+ "protocol": "Mail",
+
+ "types": [
+ {"name": "Message", "type": "record",
+ "fields": [
+ {"name": "to", "type": "string"},
+ {"name": "from", "type": "string"},
+ {"name": "body", "type": "string"}
+ ]
+ }
+ ],
+
+ "messages": {
+ "send": {
+ "request": [{"name": "message", "type": "Message"}],
+ "response": "string"
+ }
+ }
+}
\ No newline at end of file