Author: sgargan
Date: Tue May 3 21:15:52 2011
New Revision: 1099257
URL: http://svn.apache.org/viewvc?rev=1099257&view=rev
Log:
Avro-815. Netty Transceiver fails processing one-way messages
Implemented writeBuffers call in the NettyTransceiver to send a NettyDataPack
similar to the request/response call. Added one-way
message definition to the test mail.avpr protocol file and updated unit test to
verify the one-way behavior.
Modified:
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java
avro/trunk/share/test/schemas/mail.avpr
Modified:
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java?rev=1099257&r1=1099256&r2=1099257&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
(original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
Tue May 3 21:15:52 2011
@@ -133,8 +133,11 @@ public class NettyServer implements Serv
NettyDataPack dataPack = (NettyDataPack) e.getMessage();
List<ByteBuffer> req = dataPack.getDatas();
List<ByteBuffer> res = responder.respond(req, connectionMetadata);
- dataPack.setDatas(res);
- e.getChannel().write(dataPack);
+ // response will be null for oneway messages.
+ if(res != null) {
+ dataPack.setDatas(res);
+ e.getChannel().write(dataPack);
+ }
} catch (IOException ex) {
LOG.warn("unexpect error");
} finally {
Modified:
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1099257&r1=1099256&r2=1099257&view=diff
==============================================================================
---
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
(original)
+++
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
Tue May 3 21:15:52 2011
@@ -144,7 +144,7 @@ public class NettyTransceiver extends Tr
@Override
public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
- throw new UnsupportedOperationException();
+ channel.write(new NettyDataPack(serialGenerator.incrementAndGet(),
buffers));
}
@Override
Modified:
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java?rev=1099257&r1=1099256&r2=1099257&view=diff
==============================================================================
---
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
(original)
+++
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
Tue May 3 21:15:52 2011
@@ -18,7 +18,11 @@
package org.apache.avro.ipc;
+import static org.junit.Assert.assertEquals;
+
import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
@@ -27,52 +31,109 @@ import org.apache.avro.ipc.specific.Spec
import org.apache.avro.test.Mail;
import org.apache.avro.test.Message;
import org.apache.avro.util.Utf8;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestNettyServer {
+ private static Server server;
+ private static Transceiver transceiver;
+ private static Mail proxy;
+ private static MailImpl mailService;
+
public static class MailImpl implements Mail {
+
+ private CountDownLatch allMessages = new CountDownLatch(5);
+
// in this simple example just return details of the message
public CharSequence send(Message message) {
return new Utf8("Sent message to [" + message.to.toString() + "] from ["
+ message.from.toString() + "] with body [" + message.body.toString()
+ "]");
}
- }
+
+ public void fireandforget(Message message) {
+ allMessages.countDown();
+ }
+
+ private void awaitMessages() throws InterruptedException {
+ allMessages.await(2, TimeUnit.SECONDS);
+ }
+
+ private void assertAllMessagesReceived() {
+ assertEquals(0, allMessages.getCount());
+ }
- @Test
- public void test() throws Exception {
+ public void reset() {
+ allMessages = new CountDownLatch(5);
+ }
+ }
+
+ @BeforeClass
+ public static void initializeConnections()throws Exception {
// start server
System.out.println("starting server...");
- Responder responder = new SpecificResponder(Mail.class, new MailImpl());
- Server server = new NettyServer(responder, new InetSocketAddress(0));
+ mailService = new MailImpl();
+ Responder responder = new SpecificResponder(Mail.class, mailService);
+ server = new NettyServer(responder, new InetSocketAddress(0));
server.start();
-
+
int serverPort = server.getPort();
System.out.println("server port : " + serverPort);
- // client
- Transceiver transceiver = new NettyTransceiver(new InetSocketAddress(
+ transceiver = new NettyTransceiver(new InetSocketAddress(
serverPort));
- Mail proxy = SpecificRequestor.getClient(Mail.class, transceiver);
+ proxy = SpecificRequestor.getClient(Mail.class, transceiver);
+ }
+
+ @AfterClass
+ public static void tearDownConnections() throws Exception{
+ transceiver.close();
+ server.close();
+ }
+ @Test
+ public void testRequestResponse() throws Exception {
+ for(int x = 0; x < 5; x++) {
+ verifyResponse(proxy.send(createMessage()));
+ }
+ }
+
+ private void verifyResponse(CharSequence result) {
+ Assert.assertEquals(
+ "Sent message to [wife] from [husband] with body [I love you!]",
+ result.toString());
+ }
+
+ @Test
+ public void testOneway() throws Exception {
+ for (int x = 0; x < 5; x++) {
+ proxy.fireandforget(createMessage());
+ }
+ mailService.awaitMessages();
+ mailService.assertAllMessagesReceived();
+ }
+
+ @Test
+ public void testMixtureOfRequests() throws Exception {
+ mailService.reset();
+ for (int x = 0; x < 5; x++) {
+ Message createMessage = createMessage();
+ proxy.fireandforget(createMessage);
+ verifyResponse(proxy.send(createMessage));
+ }
+ mailService.awaitMessages();
+ mailService.assertAllMessagesReceived();
+
+ }
+
+ private Message createMessage() {
Message msg = new Message();
msg.to = new Utf8("wife");
msg.from = new Utf8("husband");
msg.body = new Utf8("I love you!");
-
- try {
- for(int x = 0; x < 5; x++) {
- CharSequence result = proxy.send(msg);
- System.out.println("Result: " + result);
- Assert.assertEquals(
- "Sent message to [wife] from [husband] with body [I love you!]",
- result.toString());
- }
- } finally {
- transceiver.close();
- server.close();
- }
+ return msg;
}
}
Modified:
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java?rev=1099257&r1=1099256&r2=1099257&view=diff
==============================================================================
---
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java
(original)
+++
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java
Tue May 3 21:15:52 2011
@@ -94,5 +94,7 @@ public class TestRpcPluginOrdering {
public CharSequence send(Message message) throws AvroRemoteException {
return new Utf8("Received");
}
+ public void fireandforget(Message message) {
+ }
}
}
Modified: avro/trunk/share/test/schemas/mail.avpr
URL:
http://svn.apache.org/viewvc/avro/trunk/share/test/schemas/mail.avpr?rev=1099257&r1=1099256&r2=1099257&view=diff
==============================================================================
--- avro/trunk/share/test/schemas/mail.avpr (original)
+++ avro/trunk/share/test/schemas/mail.avpr Tue May 3 21:15:52 2011
@@ -15,6 +15,12 @@
"send": {
"request": [{"name": "message", "type": "Message"}],
"response": "string"
+ },
+ "fireandforget": {
+ "request": [{"name": "message", "type": "Message"}],
+ "response": "null",
+ "one-way": true
}
+
}
-}
\ No newline at end of file
+}