Author: trustin
Date: Fri Nov 12 06:36:57 2004
New Revision: 57525

Added:
   
incubator/directory/seda/trunk/src/java/org/apache/seda/event/GatheringOutputEvent.java
   (contents, props changed)
Modified:
   
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
   
incubator/directory/seda/trunk/src/java/org/apache/seda/output/OutputManager.java
   
incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java
   
incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java
Log:
Added: GatheringOutputEvent which contains multiple ByteBuffers.
Added: OutputManager.write(ClientKey, ByteBuffer[]) and its implementors

Modified: TCPOutputManager and UDPOutputManager now subscribe to and process 
GatheringOutputEvent.
Modified: Encoders can now pass either a ByteBuffer or a ByteBuffer[] as a 
result.  DefaultEncoderManager generates an OutputEvent for a ByteBuffer and a 
GatheringOutputEvent for a ByteBuffer[].

Please note that this code is not tested, although I believe it should work 
fine. :)

Modified: 
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
==============================================================================
--- 
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
  (original)
+++ 
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
  Fri Nov 12 06:36:57 2004
@@ -14,10 +14,10 @@
  *   limitations under the License.
  *
  */
-
 package org.apache.seda.encoder;
 
 import java.nio.ByteBuffer;
+
 import java.util.EventObject;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -30,6 +30,7 @@
 import org.apache.seda.event.DisconnectEvent;
 import org.apache.seda.event.DisconnectSubscriber;
 import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.GatheringOutputEvent;
 import org.apache.seda.event.OutputEvent;
 import org.apache.seda.event.ResponseEvent;
 import org.apache.seda.event.ResponseSubscriber;
@@ -54,8 +55,7 @@
  * @version $Rev$
  */
 public class DefaultEncoderManager extends DefaultStage
-    implements EncoderManager, ResponseSubscriber,
-               DisconnectSubscriber
+    implements EncoderManager, ResponseSubscriber, DisconnectSubscriber
 {
     /** the event router used to publish and subscribe to events on */
     private final EventRouter router;
@@ -169,22 +169,31 @@
 
     StatefulEncoder getEncoder(ClientKey key)
     {
-       StatefulEncoder encoder;
-       if (key.isStateful()) {
-               encoder = (StatefulEncoder) encoders.get(key);
-               if (encoder == null) {
-                       synchronized (encoders) {
-                               encoder = (StatefulEncoder) encoders.get(key);
-                               if (encoder == null) {
-                                       encoder = createClientEncoder(key);
-                                       encoders.put(key, encoder);
-                               }
-                       }
-               }
-       } else {
-                       encoder = createClientEncoder(key);
-       }
-       
+        StatefulEncoder encoder;
+
+        if (key.isStateful())
+        {
+            encoder = (StatefulEncoder) encoders.get(key);
+
+            if (encoder == null)
+            {
+                synchronized (encoders)
+                {
+                    encoder = (StatefulEncoder) encoders.get(key);
+
+                    if (encoder == null)
+                    {
+                        encoder = createClientEncoder(key);
+                        encoders.put(key, encoder);
+                    }
+                }
+            }
+        }
+        else
+        {
+            encoder = createClientEncoder(key);
+        }
+
         return encoder;
     }
 
@@ -198,25 +207,33 @@
     private StatefulEncoder createEncoder(ClientKey key)
     {
         TransportTypeEnum transportType;
-        if (key instanceof UDPClientKey) {
-               transportType = TransportTypeEnum.UDP;
-        } else {
-               transportType = TransportTypeEnum.TCP;
+
+        if (key instanceof UDPClientKey)
+        {
+            transportType = TransportTypeEnum.UDP;
+        }
+        else
+        {
+            transportType = TransportTypeEnum.TCP;
         }
-        
+
         Iterator it = inetdb.getByPort(key.getLocalAddress().getPort());
         ProtocolProvider provider = null;
-        while (it.hasNext()) {
-               InetServiceEntry entry = (InetServiceEntry) it.next();
-               if (entry.getTransport() == transportType) {
-                       provider = entry.getProtocolProvider();
-                       break;
-               }
+
+        while (it.hasNext())
+        {
+            InetServiceEntry entry = (InetServiceEntry) it.next();
+
+            if (entry.getTransport() == transportType)
+            {
+                provider = entry.getProtocolProvider();
+                break;
+            }
         }
 
         // TODO replace RuntimeException with ProtocolProviderNotFoundException
         if (provider == null)
-               throw new RuntimeException("No protocol provider available");
+            throw new RuntimeException("No protocol provider available");
 
         return provider.getEncoderFactory().createEncoder();
     }
@@ -241,12 +258,31 @@
                                            Object encoded)
                 {
                     ClientKey key = ((ClientEncoder) encoder).getClientKey();
-                    OutputEvent event =
-                            new OutputEvent(this, key, (ByteBuffer) encoded);
+                    EventObject event;
+
+                    if (encoded instanceof ByteBuffer)
+                    {
+                        event =
+                                                        new OutputEvent(this,
+                                                                        key,
+                                                                        
(ByteBuffer) encoded);
+                    }
+                    else if (encoded instanceof ByteBuffer[])
+                    {
+                        event =
+                                                        new 
GatheringOutputEvent(this,
+                                                                               
  key,
+                                                                               
  (ByteBuffer[]) encoded);
+                    }
+                    else
+                    {
+                        throw new IllegalArgumentException("Encoded data must 
be ByteBuffer or ByteBuffer[].");
+                    }
+
                     router.publish(event);
                 }
             });
-        
+
         return encoder;
     }
 }

Added: 
incubator/directory/seda/trunk/src/java/org/apache/seda/event/GatheringOutputEvent.java
==============================================================================
--- (empty file)
+++ 
incubator/directory/seda/trunk/src/java/org/apache/seda/event/GatheringOutputEvent.java
     Fri Nov 12 06:36:57 2004
@@ -0,0 +1,60 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+
+package org.apache.seda.event;
+
+import java.nio.ByteBuffer;
+
+import org.apache.seda.listener.ClientKey;
+
+
+/**
+ * An event used to denote output to send to a client as several data chunks
+ * (an array of ByteBuffers).  The event only indicates that data is
+ * available for output; it has not yet been delivered.
+ *
+ * @author <a href="mailto:[EMAIL PROTECTED]">Apache Directory Project</a>
+ * @version $Rev$
+ */
+public class GatheringOutputEvent extends ClientEvent
+{
+    /** the data chunks to send (the array given to the constructor, not a copy) */
+    private final ByteBuffer[] buffers;
+
+    /**
+     * Creates a gathering output event using a clientKey, the buffers of data
+     * and the source for this event.  The array is stored by reference; no
+     * copy of the array or of the buffers is made.
+     *
+     * @param source the source which created this event
+     * @param clientKey the key of the client
+     * @param buffers the buffers containing the chunks to output
+     */
+    public GatheringOutputEvent(Object source, ClientKey clientKey, 
ByteBuffer[] buffers)
+    {
+        super(source, clientKey);
+        this.buffers = buffers;
+    }
+
+    /**
+     * Gets the array of buffers containing the chunks to output.
+     *
+     * @return the array of chunks to flush back to the client; this is the
+     *         same array instance that was passed to the constructor
+     */
+    public ByteBuffer[] getBuffers()
+    {
+        return buffers;
+    }
+}

Modified: 
incubator/directory/seda/trunk/src/java/org/apache/seda/output/OutputManager.java
==============================================================================
--- 
incubator/directory/seda/trunk/src/java/org/apache/seda/output/OutputManager.java
   (original)
+++ 
incubator/directory/seda/trunk/src/java/org/apache/seda/output/OutputManager.java
   Fri Nov 12 06:36:57 2004
@@ -40,4 +40,6 @@
      * @throws IOException if there is a failure while sending the data
      */
     void write(ClientKey key, ByteBuffer buf) throws IOException;
+    
+    void write(ClientKey key, ByteBuffer[] buffers) throws IOException;
 }

Modified: 
incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java
==============================================================================
--- 
incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java
        (original)
+++ 
incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java
        Fri Nov 12 06:36:57 2004
@@ -14,12 +14,13 @@
  *   limitations under the License.
  *
  */
-
 package org.apache.seda.output;
 
 import java.io.IOException;
+
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
+
 import java.util.EventObject;
 import java.util.HashMap;
 import java.util.Map;
@@ -31,6 +32,7 @@
 import org.apache.seda.event.DisconnectSubscriber;
 import org.apache.seda.event.EventFilter;
 import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.GatheringOutputEvent;
 import org.apache.seda.event.OutputEvent;
 import org.apache.seda.event.OutputSubscriber;
 import org.apache.seda.event.filter.ClientKeyTypeFilter;
@@ -79,24 +81,18 @@
     {
         super(config);
         this.router = router;
-        this.router.subscribe(
-                       new CompositeEventFilter(
-                                       new EventFilter[] {
-                                                       new 
EventTypeFilter(ConnectEvent.class),
-                                                               new 
ClientKeyTypeFilter(TCPClientKey.class)
-                                       }, CompositeEventFilter.AND), this);
-        this.router.subscribe(
-                       new CompositeEventFilter(
-                                       new EventFilter[] {
-                                                       new 
EventTypeFilter(OutputEvent.class),
-                                                               new 
ClientKeyTypeFilter(TCPClientKey.class)
-                                       }, CompositeEventFilter.AND), this);
-        this.router.subscribe(
-                       new CompositeEventFilter(
-                                       new EventFilter[] {
-                                                       new 
EventTypeFilter(DisconnectEvent.class),
-                                                               new 
ClientKeyTypeFilter(TCPClientKey.class)
-                                       }, CompositeEventFilter.AND), this);
+        this.router.subscribe(new CompositeEventFilter(new EventFilter[] { new 
EventTypeFilter(ConnectEvent.class), new 
ClientKeyTypeFilter(TCPClientKey.class) },
+                                                       
CompositeEventFilter.AND),
+                              this);
+        this.router.subscribe(new CompositeEventFilter(new EventFilter[] { new 
EventTypeFilter(OutputEvent.class), new ClientKeyTypeFilter(TCPClientKey.class) 
},
+                                                       
CompositeEventFilter.AND),
+                              this);
+        this.router.subscribe(new CompositeEventFilter(new EventFilter[] { new 
EventTypeFilter(GatheringOutputEvent.class), new 
ClientKeyTypeFilter(TCPClientKey.class) },
+                                                       
CompositeEventFilter.AND),
+                              this);
+        this.router.subscribe(new CompositeEventFilter(new EventFilter[] { new 
EventTypeFilter(DisconnectEvent.class), new 
ClientKeyTypeFilter(TCPClientKey.class) },
+                                                       
CompositeEventFilter.AND),
+                              this);
         config.setHandler(new OutputStageHandler());
         this.setStageMonitor(new LoggingStageMonitor());
         this.setOutputMonitor(new LoggingOutputMonitor());
@@ -130,6 +126,11 @@
         enqueue(event);
     }
 
+    public void inform(GatheringOutputEvent event) // receives multi-buffer output events; presumably invoked by the router subscribed to in the constructor
+    {
+        enqueue(event); // no write happens here: the event is only queued (the handler branch for GatheringOutputEvent later calls write(key, buffers))
+    }
+
     /* (non-Javadoc)
      * @see org.apache.seda.event.ConnectSubscriber#inform(
      * org.apache.seda.event.ConnectEvent)
@@ -184,6 +185,21 @@
         monitor.writeOccurred(this, key);
     }
 
+    /**
+     * Writes the given buffers, in array order, to the socket channel
+     * registered for the client.  If no channel is registered for the key,
+     * the monitor is told that the channel is missing and nothing is written.
+     *
+     * NOTE(review): the byte count returned by the gathering write is not
+     * checked, so the buffers may still hold unwritten data on return -
+     * confirm this is acceptable for the channel's blocking mode.
+     *
+     * @param key the key of the client to write to
+     * @param buffers the data chunks to send
+     * @throws IOException if there is a failure while sending the data
+     */
+    public void write(ClientKey key, ByteBuffer[] buffers)
+        throws IOException
+    {
+        final SocketChannel target = (SocketChannel) channels.get(key);
+
+        if (target != null)
+        {
+            target.write(buffers);
+            monitor.writeOccurred(this, key);
+        }
+        else
+        {
+            monitor.channelMissing(this, key);
+        }
+    }
+
     /**
      * Sets the output manager's monitor.
      *
@@ -208,6 +224,20 @@
                 try
                 {
                     write(event.getClientKey(), event.getBuffer());
+                }
+                catch (IOException e)
+                {
+                    monitor.failedOnWrite(TCPOutputManager.this,
+                                          event.getClientKey(), e);
+                }
+            }
+            else if (generic instanceof GatheringOutputEvent)
+            {
+                GatheringOutputEvent event = (GatheringOutputEvent) generic;
+
+                try
+                {
+                    write(event.getClientKey(), event.getBuffers());
                 }
                 catch (IOException e)
                 {

Modified: 
incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java
==============================================================================
--- 
incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java
        (original)
+++ 
incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java
        Fri Nov 12 06:36:57 2004
@@ -14,16 +14,18 @@
  *   limitations under the License.
  *
  */
-
 package org.apache.seda.output;
 
 import java.io.IOException;
+
 import java.nio.ByteBuffer;
+
 import java.util.EventObject;
 
 import org.apache.seda.event.AbstractSubscriber;
 import org.apache.seda.event.EventFilter;
 import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.GatheringOutputEvent;
 import org.apache.seda.event.OutputEvent;
 import org.apache.seda.event.OutputSubscriber;
 import org.apache.seda.event.filter.ClientKeyTypeFilter;
@@ -66,12 +68,12 @@
     {
         super(config);
         this.router = router;
-        this.router.subscribe(
-                       new CompositeEventFilter(
-                                       new EventFilter[] {
-                                                       new 
EventTypeFilter(OutputEvent.class),
-                                                               new 
ClientKeyTypeFilter(UDPClientKey.class)
-                                       }, CompositeEventFilter.AND), this);
+        this.router.subscribe(new CompositeEventFilter(new EventFilter[] { new 
EventTypeFilter(OutputEvent.class), new ClientKeyTypeFilter(UDPClientKey.class) 
},
+                                                       
CompositeEventFilter.AND),
+                              this);
+        this.router.subscribe(new CompositeEventFilter(new EventFilter[] { new 
EventTypeFilter(GatheringOutputEvent.class), new 
ClientKeyTypeFilter(UDPClientKey.class) },
+                                                       
CompositeEventFilter.AND),
+                              this);
         config.setHandler(new OutputStageHandler());
         this.setStageMonitor(new LoggingStageMonitor());
         this.setOutputMonitor(new LoggingOutputMonitor());
@@ -105,6 +107,11 @@
         enqueue(event);
     }
 
+    public void inform(GatheringOutputEvent event) // receives multi-buffer output events; presumably invoked by the router subscribed to in the constructor
+    {
+        enqueue(event); // no send happens here: the event is only queued (the handler branch for GatheringOutputEvent later calls write(key, buffers))
+    }
+
     // ------------------------------------------------------------------------
     // OutputManager method
     // ------------------------------------------------------------------------
@@ -117,11 +124,33 @@
     public void write(ClientKey key, ByteBuffer buf)
                throws IOException
     {
-       UDPClientKey udpKey = (UDPClientKey) key;
+        UDPClientKey udpKey = (UDPClientKey) key;
         udpKey.getSocket().getChannel().send(buf, key.getRemoteAddress());
         monitor.writeOccurred(this, key);
     }
 
+    /**
+     * Sends the given buffers to the client as a single datagram.  The
+     * remaining bytes of every buffer are copied, in array order, into one
+     * merged buffer which is then sent to the client's remote address.
+     *
+     * @param key the key of the client to send to; it is cast to UDPClientKey
+     * @param buffers the data chunks that make up the datagram payload
+     * @throws IOException if there is a failure while sending the data
+     */
+    public void write(ClientKey key, ByteBuffer[] buffers)
+               throws IOException
+    {
+        UDPClientKey udpKey = (UDPClientKey) key;
+        int size = 0;
+
+        for (int i = 0; i < buffers.length; i++)
+        {
+            size += buffers[i].remaining();
+        }
+
+        ByteBuffer mergedBuf = ByteBuffer.allocate(size);
+
+        for (int i = 0; i < buffers.length; i++)
+        {
+            mergedBuf.put(buffers[i]);
+        }
+
+        // put() leaves the position at the end of the copied data; flip so
+        // that send() transmits the merged bytes instead of an empty datagram
+        mergedBuf.flip();
+
+        udpKey.getSocket().getChannel().send(mergedBuf,
+                                             key.getRemoteAddress());
+        monitor.writeOccurred(this, key);
+    }
+
     /**
      * Sets the output manager's monitor.
      *
@@ -149,8 +178,21 @@
                 }
                 catch (IOException e)
                 {
-                    monitor.failedOnWrite(
-                                          UDPOutputManager.this,
+                    monitor.failedOnWrite(UDPOutputManager.this,
+                                          event.getClientKey(), e);
+                }
+            }
+            else if (generic instanceof GatheringOutputEvent)
+            {
+                GatheringOutputEvent event = (GatheringOutputEvent) generic;
+
+                try
+                {
+                    write(event.getClientKey(), event.getBuffers());
+                }
+                catch (IOException e)
+                {
+                    monitor.failedOnWrite(UDPOutputManager.this,
                                           event.getClientKey(), e);
                 }
             }

Reply via email to