Author: trustin
Date: Fri Nov 12 06:36:57 2004
New Revision: 57525
Added:
incubator/directory/seda/trunk/src/java/org/apache/seda/event/GatheringOutputEvent.java
(contents, props changed)
Modified:
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
incubator/directory/seda/trunk/src/java/org/apache/seda/output/OutputManager.java
incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java
incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java
Log:
Added: GatheringOutputEvent which contains multiple ByteBuffers.
Added: OutputManager.write(ClientKey, ByteBuffer[]) and its implementors
Modified: TCPOutputManager and UDPOutputManager listen to and process
GatheringOutputEvent
Modified: Now encoders can pass both ByteBuffer and ByteBuffer[] as a result.
DefaultEncoderManager generates OutputEvent for ByteBuffer and
GatheringOutputEvent for ByteBuffer[].
Please note that this code is not tested although I believe it should work very
fine. :)
Modified:
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
==============================================================================
---
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
(original)
+++
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
Fri Nov 12 06:36:57 2004
@@ -14,10 +14,10 @@
* limitations under the License.
*
*/
-
package org.apache.seda.encoder;
import java.nio.ByteBuffer;
+
import java.util.EventObject;
import java.util.HashMap;
import java.util.Iterator;
@@ -30,6 +30,7 @@
import org.apache.seda.event.DisconnectEvent;
import org.apache.seda.event.DisconnectSubscriber;
import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.GatheringOutputEvent;
import org.apache.seda.event.OutputEvent;
import org.apache.seda.event.ResponseEvent;
import org.apache.seda.event.ResponseSubscriber;
@@ -54,8 +55,7 @@
* @version $Rev$
*/
public class DefaultEncoderManager extends DefaultStage
- implements EncoderManager, ResponseSubscriber,
- DisconnectSubscriber
+ implements EncoderManager, ResponseSubscriber, DisconnectSubscriber
{
/** the event router used to publish and subscribe to events on */
private final EventRouter router;
@@ -169,22 +169,31 @@
StatefulEncoder getEncoder(ClientKey key)
{
- StatefulEncoder encoder;
- if (key.isStateful()) {
- encoder = (StatefulEncoder) encoders.get(key);
- if (encoder == null) {
- synchronized (encoders) {
- encoder = (StatefulEncoder) encoders.get(key);
- if (encoder == null) {
- encoder = createClientEncoder(key);
- encoders.put(key, encoder);
- }
- }
- }
- } else {
- encoder = createClientEncoder(key);
- }
-
+ StatefulEncoder encoder;
+
+ if (key.isStateful())
+ {
+ encoder = (StatefulEncoder) encoders.get(key);
+
+ if (encoder == null)
+ {
+ synchronized (encoders)
+ {
+ encoder = (StatefulEncoder) encoders.get(key);
+
+ if (encoder == null)
+ {
+ encoder = createClientEncoder(key);
+ encoders.put(key, encoder);
+ }
+ }
+ }
+ }
+ else
+ {
+ encoder = createClientEncoder(key);
+ }
+
return encoder;
}
@@ -198,25 +207,33 @@
private StatefulEncoder createEncoder(ClientKey key)
{
TransportTypeEnum transportType;
- if (key instanceof UDPClientKey) {
- transportType = TransportTypeEnum.UDP;
- } else {
- transportType = TransportTypeEnum.TCP;
+
+ if (key instanceof UDPClientKey)
+ {
+ transportType = TransportTypeEnum.UDP;
+ }
+ else
+ {
+ transportType = TransportTypeEnum.TCP;
}
-
+
Iterator it = inetdb.getByPort(key.getLocalAddress().getPort());
ProtocolProvider provider = null;
- while (it.hasNext()) {
- InetServiceEntry entry = (InetServiceEntry) it.next();
- if (entry.getTransport() == transportType) {
- provider = entry.getProtocolProvider();
- break;
- }
+
+ while (it.hasNext())
+ {
+ InetServiceEntry entry = (InetServiceEntry) it.next();
+
+ if (entry.getTransport() == transportType)
+ {
+ provider = entry.getProtocolProvider();
+ break;
+ }
}
// TODO replace RuntimeException with ProtocolProviderNotFoundException
if (provider == null)
- throw new RuntimeException("No protocol provider available");
+ throw new RuntimeException("No protocol provider available");
return provider.getEncoderFactory().createEncoder();
}
@@ -241,12 +258,31 @@
Object encoded)
{
ClientKey key = ((ClientEncoder) encoder).getClientKey();
- OutputEvent event =
- new OutputEvent(this, key, (ByteBuffer) encoded);
+ EventObject event;
+
+ if (encoded instanceof ByteBuffer)
+ {
+ event =
+ new OutputEvent(this,
+ key,
+
(ByteBuffer) encoded);
+ }
+ else if (encoded instanceof ByteBuffer[])
+ {
+ event =
+ new
GatheringOutputEvent(this,
+
key,
+
(ByteBuffer[]) encoded);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Encoded data must
be ByteBuffer or ByteBuffer[].");
+ }
+
router.publish(event);
}
});
-
+
return encoder;
}
}
Added:
incubator/directory/seda/trunk/src/java/org/apache/seda/event/GatheringOutputEvent.java
==============================================================================
--- (empty file)
+++
incubator/directory/seda/trunk/src/java/org/apache/seda/event/GatheringOutputEvent.java
Fri Nov 12 06:36:57 2004
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.seda.event;
+
+import java.nio.ByteBuffer;
+
+import org.apache.seda.listener.ClientKey;
+
+
+/**
+ * An event used to denote output to send to a client. The output event
+ * only connotates that data is available for output but not yet delivered.
+ *
+ * @author <a href="mailto:[EMAIL PROTECTED]">Apache Directory Project</a>
+ * @version $Rev$
+ */
+public class GatheringOutputEvent extends ClientEvent
+{
+ /** the data chunks to send */
+ private final ByteBuffer[] buffers;
+
+ /**
+ * Creates an output event using a clientKey, buf of data and the source
+ * for this event.
+ *
+ * @param source the source which created this event
+ * @param clientKey the key of the client
+ * @param buf the buffer containing the chunk to output
+ */
+ public GatheringOutputEvent(Object source, ClientKey clientKey,
ByteBuffer[] buffers)
+ {
+ super(source, clientKey);
+ this.buffers = buffers;
+ }
+
+ /**
+ * Gets the array of buffers containing the chunks to output.
+ *
+ * @return the array of chunks to flush back to the client
+ */
+ public ByteBuffer[] getBuffers()
+ {
+ return buffers;
+ }
+}
Modified:
incubator/directory/seda/trunk/src/java/org/apache/seda/output/OutputManager.java
==============================================================================
---
incubator/directory/seda/trunk/src/java/org/apache/seda/output/OutputManager.java
(original)
+++
incubator/directory/seda/trunk/src/java/org/apache/seda/output/OutputManager.java
Fri Nov 12 06:36:57 2004
@@ -40,4 +40,6 @@
* @throws IOException if there is a failure while sending the data
*/
void write(ClientKey key, ByteBuffer buf) throws IOException;
+
+ void write(ClientKey key, ByteBuffer[] buffers) throws IOException;
}
Modified:
incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java
==============================================================================
---
incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java
(original)
+++
incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java
Fri Nov 12 06:36:57 2004
@@ -14,12 +14,13 @@
* limitations under the License.
*
*/
-
package org.apache.seda.output;
import java.io.IOException;
+
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
+
import java.util.EventObject;
import java.util.HashMap;
import java.util.Map;
@@ -31,6 +32,7 @@
import org.apache.seda.event.DisconnectSubscriber;
import org.apache.seda.event.EventFilter;
import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.GatheringOutputEvent;
import org.apache.seda.event.OutputEvent;
import org.apache.seda.event.OutputSubscriber;
import org.apache.seda.event.filter.ClientKeyTypeFilter;
@@ -79,24 +81,18 @@
{
super(config);
this.router = router;
- this.router.subscribe(
- new CompositeEventFilter(
- new EventFilter[] {
- new
EventTypeFilter(ConnectEvent.class),
- new
ClientKeyTypeFilter(TCPClientKey.class)
- }, CompositeEventFilter.AND), this);
- this.router.subscribe(
- new CompositeEventFilter(
- new EventFilter[] {
- new
EventTypeFilter(OutputEvent.class),
- new
ClientKeyTypeFilter(TCPClientKey.class)
- }, CompositeEventFilter.AND), this);
- this.router.subscribe(
- new CompositeEventFilter(
- new EventFilter[] {
- new
EventTypeFilter(DisconnectEvent.class),
- new
ClientKeyTypeFilter(TCPClientKey.class)
- }, CompositeEventFilter.AND), this);
+ this.router.subscribe(new CompositeEventFilter(new EventFilter[] { new
EventTypeFilter(ConnectEvent.class), new
ClientKeyTypeFilter(TCPClientKey.class) },
+
CompositeEventFilter.AND),
+ this);
+ this.router.subscribe(new CompositeEventFilter(new EventFilter[] { new
EventTypeFilter(OutputEvent.class), new ClientKeyTypeFilter(TCPClientKey.class)
},
+
CompositeEventFilter.AND),
+ this);
+ this.router.subscribe(new CompositeEventFilter(new EventFilter[] { new
EventTypeFilter(GatheringOutputEvent.class), new
ClientKeyTypeFilter(TCPClientKey.class) },
+
CompositeEventFilter.AND),
+ this);
+ this.router.subscribe(new CompositeEventFilter(new EventFilter[] { new
EventTypeFilter(DisconnectEvent.class), new
ClientKeyTypeFilter(TCPClientKey.class) },
+
CompositeEventFilter.AND),
+ this);
config.setHandler(new OutputStageHandler());
this.setStageMonitor(new LoggingStageMonitor());
this.setOutputMonitor(new LoggingOutputMonitor());
@@ -130,6 +126,11 @@
enqueue(event);
}
+ public void inform(GatheringOutputEvent event)
+ {
+ enqueue(event);
+ }
+
/* (non-Javadoc)
* @see org.apache.seda.event.ConnectSubscriber#inform(
* org.apache.seda.event.ConnectEvent)
@@ -184,6 +185,21 @@
monitor.writeOccurred(this, key);
}
+ public void write(ClientKey key, ByteBuffer[] buffers)
+ throws IOException
+ {
+ SocketChannel channel = (SocketChannel) channels.get(key);
+
+ if (null == channel)
+ {
+ monitor.channelMissing(this, key);
+ return;
+ }
+
+ channel.write(buffers);
+ monitor.writeOccurred(this, key);
+ }
+
/**
* Sets the output manager's monitor.
*
@@ -208,6 +224,20 @@
try
{
write(event.getClientKey(), event.getBuffer());
+ }
+ catch (IOException e)
+ {
+ monitor.failedOnWrite(TCPOutputManager.this,
+ event.getClientKey(), e);
+ }
+ }
+ else if (generic instanceof GatheringOutputEvent)
+ {
+ GatheringOutputEvent event = (GatheringOutputEvent) generic;
+
+ try
+ {
+ write(event.getClientKey(), event.getBuffers());
}
catch (IOException e)
{
Modified:
incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java
==============================================================================
---
incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java
(original)
+++
incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java
Fri Nov 12 06:36:57 2004
@@ -14,16 +14,18 @@
* limitations under the License.
*
*/
-
package org.apache.seda.output;
import java.io.IOException;
+
import java.nio.ByteBuffer;
+
import java.util.EventObject;
import org.apache.seda.event.AbstractSubscriber;
import org.apache.seda.event.EventFilter;
import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.GatheringOutputEvent;
import org.apache.seda.event.OutputEvent;
import org.apache.seda.event.OutputSubscriber;
import org.apache.seda.event.filter.ClientKeyTypeFilter;
@@ -66,12 +68,12 @@
{
super(config);
this.router = router;
- this.router.subscribe(
- new CompositeEventFilter(
- new EventFilter[] {
- new
EventTypeFilter(OutputEvent.class),
- new
ClientKeyTypeFilter(UDPClientKey.class)
- }, CompositeEventFilter.AND), this);
+ this.router.subscribe(new CompositeEventFilter(new EventFilter[] { new
EventTypeFilter(OutputEvent.class), new ClientKeyTypeFilter(UDPClientKey.class)
},
+
CompositeEventFilter.AND),
+ this);
+ this.router.subscribe(new CompositeEventFilter(new EventFilter[] { new
EventTypeFilter(GatheringOutputEvent.class), new
ClientKeyTypeFilter(UDPClientKey.class) },
+
CompositeEventFilter.AND),
+ this);
config.setHandler(new OutputStageHandler());
this.setStageMonitor(new LoggingStageMonitor());
this.setOutputMonitor(new LoggingOutputMonitor());
@@ -105,6 +107,11 @@
enqueue(event);
}
+ public void inform(GatheringOutputEvent event)
+ {
+ enqueue(event);
+ }
+
// ------------------------------------------------------------------------
// OutputManager method
// ------------------------------------------------------------------------
@@ -117,11 +124,33 @@
public void write(ClientKey key, ByteBuffer buf)
throws IOException
{
- UDPClientKey udpKey = (UDPClientKey) key;
+ UDPClientKey udpKey = (UDPClientKey) key;
udpKey.getSocket().getChannel().send(buf, key.getRemoteAddress());
monitor.writeOccurred(this, key);
}
+ public void write(ClientKey key, ByteBuffer[] buffers)
+ throws IOException
+ {
+ UDPClientKey udpKey = (UDPClientKey) key;
+ int size = 0;
+
+ for (int i = buffers.length - 1; i >= 0; i--)
+ {
+ size += buffers[i].remaining();
+ }
+
+ ByteBuffer mergedBuf = ByteBuffer.allocate(size);
+
+ for (int i = 0; i < buffers.length; i++)
+ {
+ mergedBuf.put(buffers[i]);
+ }
+
+ udpKey.getSocket().getChannel().send(mergedBuf,
key.getRemoteAddress());
+ monitor.writeOccurred(this, key);
+ }
+
/**
* Sets the output manager's monitor.
*
@@ -149,8 +178,21 @@
}
catch (IOException e)
{
- monitor.failedOnWrite(
- UDPOutputManager.this,
+ monitor.failedOnWrite(UDPOutputManager.this,
+ event.getClientKey(), e);
+ }
+ }
+ else if (generic instanceof GatheringOutputEvent)
+ {
+ GatheringOutputEvent event = (GatheringOutputEvent) generic;
+
+ try
+ {
+ write(event.getClientKey(), event.getBuffers());
+ }
+ catch (IOException e)
+ {
+ monitor.failedOnWrite(UDPOutputManager.this,
event.getClientKey(), e);
}
}