gdamour 2004/04/19 09:29:31
Modified: modules/network/src/java/org/apache/geronimo/network/protocol
PacketInputStream.java
modules/network/src/test/org/apache/geronimo/network/protocol
PacketStreamTest.java
Log:
o Provide a call-back mechanism when a UpPacket is pushed from
the adapted Protocol; This way, a client does not have to poll the
InputStream.
o Improve available and read in order to take into account the
internal buffer of UpPackets.
Revision Changes Path
1.3 +53 -7
incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/PacketInputStream.java
Index: PacketInputStream.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/PacketInputStream.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- PacketInputStream.java 20 Mar 2004 20:39:11 -0000 1.2
+++ PacketInputStream.java 19 Apr 2004 16:29:31 -0000 1.3
@@ -28,8 +28,17 @@
*/
public class PacketInputStream extends InputStream {
- ProtocolBuffer buffer;
+ /**
+ * Null AvailableCallBack.
+ */
+ private static final AvailableCallBack NULL_CALLBACK =
+ new AvailableCallBack() {
+ public void execute() {}
+ };
+
+ private final ProtocolBuffer buffer;
private final Protocol up;
+ private final AvailableCallBack callBack;
private ByteBuffer currentBuffer;
private boolean closed;
@@ -39,14 +48,31 @@
}
public PacketInputStream(Protocol up, short queueSize) {
+ this(up, queueSize, null);
+ }
+
+ /**
+ * Creates an InputStream on top of the provided protocol.
+ *
+ * @param up Protocol.
+ * @param queueSize Size of the queue used to buffer UpPackets coming
from
+ * up.
+ * @param aCallBack Callback when an UpPacket is received from up.
+ */
+ public PacketInputStream(Protocol up, short queueSize,
+ AvailableCallBack aCallBack) {
this.buffer = new ProtocolBuffer(queueSize);
+ if ( null == aCallBack ) {
+ this.callBack = NULL_CALLBACK;
+ } else {
+ this.callBack = aCallBack;
+ }
this.up = up;
this.currentBuffer = ByteBuffer.allocate(0);
this.closed = false;
this.up.setUpProtocol(buffer);
buffer.setDownProtocol(this.up);
-
}
public int read() throws IOException {
@@ -70,7 +96,7 @@
}
int length = len;
- while (length > 0) {
+ while (length > 0 && 0 < available() ) {
check();
int remaining = currentBuffer.remaining();
int segment = Math.min(remaining, length);
@@ -78,7 +104,7 @@
off += segment;
length -= segment;
}
- return len;
+ return len - length;
}
public long skip(long n) throws IOException {
@@ -100,7 +126,7 @@
}
public int available() throws IOException {
- return currentBuffer.remaining();
+ return currentBuffer.remaining() + buffer.available();
}
public void close() throws IOException {
@@ -132,13 +158,20 @@
BoundedLinkedQueue queue;
Protocol down;
+ volatile int available;
ProtocolBuffer(short size) {
queue = new BoundedLinkedQueue(size);
}
+ int available() {
+ return available;
+ }
+
UpPacket getPacket() throws InterruptedException {
- return (UpPacket) queue.take();
+ UpPacket packet = (UpPacket) queue.take();
+ available -= packet.getBuffer().remaining();
+ return packet;
}
public Protocol getUpProtocol() {
@@ -176,10 +209,12 @@
public void sendUp(UpPacket packet) throws ProtocolException {
try {
+ available += packet.getBuffer().remaining();
queue.put(packet);
} catch (InterruptedException e) {
throw new ProtocolException(e);
}
+ callBack.execute();
}
public void sendDown(DownPacket packet) throws ProtocolException {
@@ -187,4 +222,15 @@
}
}
+
+ /**
+ * When an UpPacket has been received by the protocol from which this
+ * instance is reading, the execute method is called.
+ * <BR>
+ * It allows reading from the InputStream without having to poll it.
+ */
+ public interface AvailableCallBack {
+ public void execute() throws ProtocolException;
+ }
+
}
1.4 +44 -2
incubator-geronimo/modules/network/src/test/org/apache/geronimo/network/protocol/PacketStreamTest.java
Index: PacketStreamTest.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/network/src/test/org/apache/geronimo/network/protocol/PacketStreamTest.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- PacketStreamTest.java 21 Mar 2004 14:27:09 -0000 1.3
+++ PacketStreamTest.java 19 Apr 2004 16:29:31 -0000 1.4
@@ -16,10 +16,15 @@
*/
package org.apache.geronimo.network.protocol;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import
org.apache.geronimo.network.protocol.PacketInputStream.AvailableCallBack;
+
import EDU.oswego.cs.dl.util.concurrent.Latch;
import junit.framework.TestCase;
@@ -63,6 +68,25 @@
assertFalse("Writer thread failed", failed);
}
+ public void testCallBack() throws Exception {
+ Thread thread = new Thread(new WriterThread((short) 2), "Test
Writer");
+
+ startLatch.release();
+
+ DummyCallBack callBack = new DummyCallBack();
+ PacketInputStream in = new PacketInputStream(eup, (short) 50,
callBack);
+ callBack.setInputStream(in);
+ thread.start();
+ thread.join();
+
+ InputStream memIn = new
ByteArrayInputStream(callBack.memOut.toByteArray());
+ ObjectInputStream objIn = new ObjectInputStream(memIn);
+ String msg = (String) objIn.readObject();
+
+ assertEquals(msg, "Hello World!");
+ assertFalse("Writer thread failed", failed);
+ }
+
class WriterThread implements Runnable {
short packetSize;
@@ -87,10 +111,28 @@
}
}
+ private class DummyCallBack implements AvailableCallBack {
+ private InputStream in;
+ private ByteArrayOutputStream memOut = new ByteArrayOutputStream();
+ private void setInputStream(InputStream anIn) {
+ in = anIn;
+ }
+ public void execute() {
+ try {
+ int size = in.available();
+ byte[] buffer = new byte[size];
+ in.read(buffer);
+ memOut.write(buffer);
+ } catch (IOException e) {
+ ;
+ }
+ }
+ }
+
public void setUp() throws Exception {
eup = new EchoUpProtocol();
startLatch = new Latch();
failed = false;
}
-
+
}