adc 2004/03/17 20:05:27
Modified:
modules/network/src/java/org/apache/geronimo/network/protocol/util
PacketUtil.java
modules/network/src/test/org/apache/geronimo/network/protocol
SocketProtocolStressTest.java
Added: modules/network/src/java/org/apache/geronimo/network/protocol
EchoDownProtocol.java EchoUpProtocol.java
PacketInputStream.java PacketOutputStream.java
Log:
Added packet input/output streams.
Revision Changes Path
1.1
incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/EchoDownProtocol.java
Index: EchoDownProtocol.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.network.protocol;
import java.util.Collections;
/**
 * Protocol that reflects upward traffic back down the stack.
 * <p>
 * Every {@link UpPacket} received through {@link #sendUp(UpPacket)} is
 * re-wrapped as a {@link PlainDownPacket} and handed to the down protocol.
 * Packets received through {@link #sendDown(DownPacket)} are forwarded to the
 * down protocol unchanged.  The lifecycle callbacks do nothing.
 *
 * @version $Revision: 1.1 $ $Date: 2004/03/18 04:05:27 $
 */
public class EchoDownProtocol extends AbstractProtocol {

    /**
     * Echoes the packet: its single buffer becomes the only buffer of a new
     * {@link PlainDownPacket}, which is sent to the down protocol.
     *
     * @param packet the packet travelling up the stack
     * @throws ProtocolException if the down protocol rejects the echoed packet
     */
    public void sendUp(UpPacket packet) throws ProtocolException {
        PlainDownPacket echo = new PlainDownPacket();
        echo.setBuffers(Collections.singleton(packet.getBuffer()));

        getDownProtocol().sendDown(echo);
    }

    /**
     * Passes the packet straight through to the down protocol.
     *
     * @param packet the packet travelling down the stack
     * @throws ProtocolException if the down protocol fails to send it
     */
    public void sendDown(DownPacket packet) throws ProtocolException {
        getDownProtocol().sendDown(packet);
    }

    /** No initialisation is required. */
    public void setup() throws ProtocolException {
        // intentionally empty
    }

    /** Nothing is buffered here, so there is nothing to drain. */
    public void drain() throws ProtocolException {
        // intentionally empty
    }

    /** No resources are held, so there is nothing to release. */
    public void teardown() throws ProtocolException {
        // intentionally empty
    }
}
1.1
incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/EchoUpProtocol.java
Index: EchoUpProtocol.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.network.protocol;
import org.apache.geronimo.network.protocol.util.PacketUtil;
/**
 * Protocol that reflects downward traffic back up the stack.
 * <p>
 * Every {@link DownPacket} received through {@link #sendDown(DownPacket)} has
 * its buffers merged by {@link PacketUtil#consolidate} into one buffer, which
 * is wrapped in a new {@link UpPacket} and handed to the up protocol.  Packets
 * received through {@link #sendUp(UpPacket)} are forwarded to the up protocol
 * unchanged.  The lifecycle callbacks do nothing.
 *
 * @version $Revision: 1.1 $ $Date: 2004/03/18 04:05:27 $
 */
public class EchoUpProtocol extends AbstractProtocol {

    /**
     * Passes the packet straight through to the up protocol.
     *
     * @param packet the packet travelling up the stack
     * @throws ProtocolException if the up protocol fails to accept it
     */
    public void sendUp(UpPacket packet) throws ProtocolException {
        getUpProtocol().sendUp(packet);
    }

    /**
     * Echoes the packet: all of its buffers are consolidated into a single
     * buffer that is sent up the stack inside a new {@link UpPacket}.
     *
     * @param packet the packet travelling down the stack
     * @throws ProtocolException if the up protocol rejects the echoed packet
     */
    public void sendDown(DownPacket packet) throws ProtocolException {
        UpPacket echo = new UpPacket();
        echo.setBuffer(PacketUtil.consolidate(packet.getBuffers()));

        getUpProtocol().sendUp(echo);
    }

    /** No initialisation is required. */
    public void setup() throws ProtocolException {
        // intentionally empty
    }

    /** Nothing is buffered here, so there is nothing to drain. */
    public void drain() throws ProtocolException {
        // intentionally empty
    }

    /** No resources are held, so there is nothing to release. */
    public void teardown() throws ProtocolException {
        // intentionally empty
    }
}
1.1
incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/PacketInputStream.java
Index: PacketInputStream.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.network.protocol;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
/**
 * An {@link InputStream} whose bytes come from the {@link UpPacket}s that a
 * protocol sends up the stack.
 * <p>
 * On construction the stream installs a private {@link Protocol} as the up
 * protocol of the supplied protocol.  Packets delivered to it are placed on a
 * bounded queue; the read methods take packets off that queue one at a time
 * and serve bytes out of the packet's buffer, blocking while the queue is
 * empty.
 *
 * @version $Revision: 1.1 $ $Date: 2004/03/18 04:05:27 $
 */
public class PacketInputStream extends InputStream {

    /** Top-of-stack protocol that queues the packets received from {@link #up}. */
    ProtocolBuffer buffer;

    /** The protocol this stream reads from. */
    private final Protocol up;

    /** Buffer of the packet currently being consumed; empty until the first packet arrives. */
    private ByteBuffer currentBuffer;

    /** Set by {@link #close()}; once set the read methods throw. */
    private boolean closed;

    /**
     * Creates a stream that queues at most one pending packet.
     *
     * @param up the protocol to read packets from
     */
    public PacketInputStream(Protocol up) {
        this(up, (short) 1);
    }

    /**
     * Creates a stream on top of the given protocol.
     *
     * @param up        the protocol to read packets from; its up protocol is
     *                  replaced by this stream's internal packet queue
     * @param queueSize capacity of the pending-packet queue
     */
    public PacketInputStream(Protocol up, short queueSize) {
        this.buffer = new ProtocolBuffer(queueSize);
        this.up = up;
        this.currentBuffer = ByteBuffer.allocate(0);
        this.closed = false;

        this.up.setUpProtocol(buffer);
        buffer.setDownProtocol(up);
    }

    /**
     * Reads the next byte, blocking until a packet containing data arrives.
     *
     * @return the byte as an unsigned value in the range 0..255
     * @throws IOException if the stream is closed or the wait is interrupted
     */
    public int read() throws IOException {
        ensureOpen();
        check();
        // Mask to an unsigned value: a raw byte would be sign-extended, making
        // 0x80..0xFF negative and 0xFF look like end-of-stream (-1).
        return currentBuffer.get() & 0xFF;
    }

    /**
     * Fills the whole array; equivalent to {@code read(b, 0, b.length)}.
     *
     * @param b destination array
     * @return the number of bytes read, which is {@code b.length}
     * @throws IOException if the stream is closed or the wait is interrupted
     */
    public int read(byte b[]) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Reads exactly {@code len} bytes, blocking across as many packets as it
     * takes to supply them.
     *
     * @param b   destination array
     * @param off index in {@code b} of the first byte written
     * @param len number of bytes to read
     * @return {@code len}
     * @throws IOException               if the stream is closed or the wait is interrupted
     * @throws NullPointerException      if {@code b} is null
     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not describe a range inside {@code b}
     */
    public int read(byte b[], int off, int len) throws IOException {
        ensureOpen();
        if (b == null) {
            throw new NullPointerException();
        } else if ((off < 0) || (off > b.length) || (len < 0) ||
                ((off + len) > b.length) || ((off + len) < 0)) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        int length = len;
        while (length > 0) {
            check();
            int segment = Math.min(currentBuffer.remaining(), length);
            currentBuffer.get(b, off, segment);
            off += segment;
            length -= segment;
        }
        return len;
    }

    /**
     * Discards exactly {@code n} bytes, blocking across as many packets as it
     * takes to supply them.  It does not wait for a further packet once the
     * requested number of bytes has been skipped.
     *
     * @param n number of bytes to skip
     * @return {@code n}, or 0 if {@code n} is not positive
     * @throws IOException if the stream is closed or the wait is interrupted
     */
    public long skip(long n) throws IOException {
        if (n <= 0) {
            return 0;
        }
        ensureOpen();

        long length = n;
        while (length > 0) {
            // Fetch data before consuming so that we never block for a packet
            // that is not needed to satisfy this skip.
            check();
            int segment = (int) Math.min((long) currentBuffer.remaining(), length);
            currentBuffer.position(currentBuffer.position() + segment);
            length -= segment;
        }
        return n;
    }

    /**
     * @return the number of unread bytes in the packet currently being
     *         consumed; packets still waiting in the queue are not counted
     */
    public int available() throws IOException {
        return currentBuffer.remaining();
    }

    /** Marks the stream closed; subsequent reads and skips throw {@link IOException}. */
    public void close() throws IOException {
        closed = true;
    }

    /** Does nothing: mark/reset is not supported. */
    public synchronized void mark(int readlimit) {
    }

    /**
     * Always fails because mark/reset is not supported.
     *
     * @throws IOException always
     */
    public synchronized void reset() throws IOException {
        throw new IOException("mark/reset not supported");
    }

    /** @return {@code false} */
    public boolean markSupported() {
        return false;
    }

    /** Throws if {@link #close()} has been called. */
    private void ensureOpen() throws IOException {
        if (closed) {
            throw new IOException("Packet InputStream closed");
        }
    }

    /**
     * Makes sure {@link #currentBuffer} has at least one unread byte, taking
     * packets from the queue (and blocking) until one with data is found.
     * Looping rather than fetching once keeps a packet with an empty buffer
     * from causing a buffer underflow in {@link #read()}.
     */
    private void check() throws IOException {
        while (!currentBuffer.hasRemaining()) {
            try {
                currentBuffer = buffer.getPacket().getBuffer();
            } catch (InterruptedException e) {
                // Restore the interrupt status so callers further up can see it.
                Thread.currentThread().interrupt();
                throw (IOException) new IOException("Interrupted while waiting for a packet").initCause(e);
            }
        }
    }

    /**
     * Protocol placed on top of the wrapped protocol.  It stores every packet
     * sent up to it on a bounded queue for the enclosing stream to consume.
     * It has no up protocol of its own and does not support sending down.
     */
    private class ProtocolBuffer implements Protocol {

        /** Pending packets; {@code put} blocks when the queue is full. */
        BoundedLinkedQueue queue;

        /** The protocol directly below this one. */
        Protocol down;

        ProtocolBuffer(short size) {
            queue = new BoundedLinkedQueue(size);
        }

        /**
         * Removes and returns the oldest queued packet, blocking until one is
         * available.
         */
        UpPacket getPacket() throws InterruptedException {
            return (UpPacket) queue.take();
        }

        /** Not supported: there is no protocol above this one. */
        public Protocol getUpProtocol() {
            throw new NoSuchMethodError("Packet input stream is at the top");
        }

        /** Not supported: there is no protocol above this one. */
        public void setUpProtocol(Protocol up) {
            throw new NoSuchMethodError("Packet input stream is at the top");
        }

        public Protocol getDownProtocol() {
            return down;
        }

        public void setDownProtocol(Protocol down) {
            this.down = down;
        }

        public void clearLinks() {
            down = null;
        }

        /**
         * Returns the result of {@code Object.clone()}.
         * NOTE(review): that is a shallow copy, so the clone would refer to
         * the same queue instance as the original - confirm this is intended.
         */
        public Protocol cloneProtocol() throws CloneNotSupportedException {
            return (Protocol) super.clone();
        }

        public void setup() throws ProtocolException {
        }

        public void drain() throws ProtocolException {
        }

        public void teardown() throws ProtocolException {
        }

        /**
         * Queues the packet for the stream, blocking while the queue is full.
         *
         * @throws ProtocolException if the thread is interrupted while waiting
         */
        public void sendUp(UpPacket packet) throws ProtocolException {
            try {
                queue.put(packet);
            } catch (InterruptedException e) {
                // Restore the interrupt status so callers further up can see it.
                Thread.currentThread().interrupt();
                throw new ProtocolException(e);
            }
        }

        /** Not supported: this protocol only receives packets. */
        public void sendDown(DownPacket packet) throws ProtocolException {
            throw new UnsupportedOperationException("Method not implemented");
        }
    }
}
1.1
incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/PacketOutputStream.java
Index: PacketOutputStream.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.network.protocol;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
/**
 * An {@link OutputStream} that collects written bytes into fixed-size buffers
 * and sends each buffer down a protocol stack as a {@link PlainDownPacket}.
 * <p>
 * A packet is sent whenever the current buffer becomes full, and whenever
 * {@link #flush()} or {@link #close()} is called with buffered data present.
 *
 * @version $Revision: 1.1 $ $Date: 2004/03/18 04:05:27 $
 */
public class PacketOutputStream extends OutputStream {

    /** The protocol that packets are sent to. */
    private final Protocol down;

    /** Buffer collecting the bytes of the next packet. */
    private ByteBuffer currentBuffer;

    /** Capacity of each packet buffer, in bytes. */
    private short packetSize;

    /** Set by {@link #close()}; once set, writes and flushes throw. */
    private boolean closed;

    /**
     * Creates a stream that sends packets of at most 1024 bytes.
     *
     * @param down the protocol to send packets to
     */
    public PacketOutputStream(Protocol down) {
        this(down, (short) 1024);
    }

    /**
     * Creates a stream that sends packets of at most {@code packetSize} bytes.
     *
     * @param down       the protocol to send packets to
     * @param packetSize capacity of each packet buffer; must be positive
     * @throws IllegalArgumentException if {@code packetSize} is not positive
     */
    public PacketOutputStream(Protocol down, short packetSize) {
        // A zero-capacity buffer can never accept a byte, which would make
        // the array write loop spin forever; reject it up front.
        if (packetSize <= 0) {
            throw new IllegalArgumentException("packetSize must be positive: " + packetSize);
        }
        this.down = down;
        this.packetSize = packetSize;
        this.currentBuffer = ByteBuffer.allocate(packetSize);
        this.closed = false;
    }

    /** @return the capacity of each packet buffer, in bytes */
    public short getPacketSize() {
        return packetSize;
    }

    /**
     * Buffers one byte, sending a packet if that fills the buffer.
     *
     * @param b the byte to write (only the low eight bits are used)
     * @throws IOException if the stream is closed or the packet cannot be sent
     */
    public void write(int b) throws IOException {
        ensureOpen();
        currentBuffer.put((byte) b);
        if (!currentBuffer.hasRemaining()) {
            flush();
        }
    }

    /**
     * Writes the whole array; equivalent to {@code write(b, 0, b.length)}.
     *
     * @param b the bytes to write
     * @throws IOException if the stream is closed or a packet cannot be sent
     */
    public void write(byte b[]) throws IOException {
        ensureOpen();
        write(b, 0, b.length);
    }

    /**
     * Buffers {@code len} bytes, sending a packet each time the buffer fills.
     *
     * @param b   source array
     * @param off index in {@code b} of the first byte to write
     * @param len number of bytes to write
     * @throws IOException               if the stream is closed or a packet cannot be sent
     * @throws NullPointerException      if {@code b} is null
     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not describe a range inside {@code b}
     */
    public void write(byte b[], int off, int len) throws IOException {
        ensureOpen();
        if (b == null) {
            throw new NullPointerException();
        } else if ((off < 0) || (off > b.length) || (len < 0) ||
                ((off + len) > b.length) || ((off + len) < 0)) {
            throw new IndexOutOfBoundsException();
        }

        // Copy in chunks no larger than the space left in the current buffer.
        while (len > 0) {
            int segment = Math.min(currentBuffer.remaining(), len);
            currentBuffer.put(b, off, segment);
            off += segment;
            len -= segment;
            if (!currentBuffer.hasRemaining()) {
                flush();
            }
        }
    }

    /**
     * Sends any buffered bytes down the stack as a single packet.  Does
     * nothing when no bytes are buffered.
     *
     * @throws IOException if the stream is closed or the protocol fails to
     *                     send the packet
     */
    public void flush() throws IOException {
        ensureOpen();
        if (currentBuffer.position() == 0) {
            return;
        }

        // Swap in a fresh buffer before sending so the stream stays usable
        // even if the send below fails.
        ByteBuffer outgoing = currentBuffer;
        currentBuffer = ByteBuffer.allocate(packetSize);
        outgoing.flip();

        PlainDownPacket packet = new PlainDownPacket();
        packet.setBuffers(Collections.singleton(outgoing));
        try {
            down.sendDown(packet);
        } catch (ProtocolException e) {
            throw (IOException) new IOException().initCause(e);
        }
    }

    /**
     * Flushes buffered bytes and marks the stream closed.  Calling this on an
     * already closed stream has no effect.
     *
     * @throws IOException if the final flush fails; the stream is still
     *                     marked closed in that case
     */
    public void close() throws IOException {
        if (closed) {
            return;
        }
        try {
            flush();
        } finally {
            closed = true;
        }
    }

    /** Throws if {@link #close()} has been called. */
    private void ensureOpen() throws IOException {
        if (closed) {
            throw new IOException("PacketOutputStream closed");
        }
    }
}
1.3 +13 -1
incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/util/PacketUtil.java
Index: PacketUtil.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/util/PacketUtil.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- PacketUtil.java 10 Mar 2004 09:59:15 -0000 1.2
+++ PacketUtil.java 18 Mar 2004 04:05:27 -0000 1.3
@@ -44,6 +44,18 @@
return remaining;
}
+ public static ByteBuffer consolidate(Collection packets) {
+ int size = 0;
+ for (Iterator iter = packets.iterator(); iter.hasNext();) {
+ size += ((ByteBuffer) iter.next()).remaining();
+ }
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+ for (Iterator iter = packets.iterator(); iter.hasNext();) {
+ buffer.put((ByteBuffer) iter.next());
+ }
+ return (ByteBuffer)buffer.flip();
+ }
+
public final static byte NULL_TYPE = (byte) 0x00;
public final static byte BOOLEAN_TYPE = (byte) 0x01;
public final static byte CHARACTER_TYPE = (byte) 0x02;
1.5 +3 -4
incubator-geronimo/modules/network/src/test/org/apache/geronimo/network/protocol/SocketProtocolStressTest.java
Index: SocketProtocolStressTest.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/network/src/test/org/apache/geronimo/network/protocol/SocketProtocolStressTest.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- SocketProtocolStressTest.java 17 Mar 2004 03:12:00 -0000 1.4
+++ SocketProtocolStressTest.java 18 Mar 2004 04:05:27 -0000 1.5
@@ -20,11 +20,10 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
import java.util.ArrayList;
-import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier;
import EDU.oswego.cs.dl.util.concurrent.CountDown;
+import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier;
import junit.framework.TestCase;
import org.apache.geronimo.network.SelectorManager;
@@ -91,7 +90,7 @@
finished.acquire();
Thread.sleep(5 * 1000);
-
+
assertEquals(WORKERS * MESSAGE_COUNT, count);
}