Author: bloritsch Date: Thu Dec 9 08:43:24 2004 New Revision: 111404 URL: http://svn.apache.org/viewcvs?view=rev&rev=111404 Log: Add some meat to the Writer stage. Unfortunately, I can't seem to get it to emit what I expect... Modified: incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/bufferpool/BufferPool.java incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/input/ReaderSource.java incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/output/Writer.java incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/bufferpool/test/TestBufferPool.java incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/output/test/TestWriter.java
Modified: incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/bufferpool/BufferPool.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/bufferpool/BufferPool.java?view=diff&rev=111404&p1=incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/bufferpool/BufferPool.java&r1=111403&p2=incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/bufferpool/BufferPool.java&r2=111404 ============================================================================== --- incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/bufferpool/BufferPool.java (original) +++ incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/bufferpool/BufferPool.java Thu Dec 9 08:43:24 2004 @@ -59,8 +59,8 @@ public static void putBuffer( final ByteBuffer buffer ) { m_usedBuffers--; - assert buffer.isDirect(); - assert KILOBYTE == buffer.capacity(); + assert buffer.isDirect() && KILOBYTE == buffer.capacity() : "The provided buffer could not have been from this pool."; + buffer.clear(); m_reserveBuffers.add( new WeakReference(buffer) ); } Modified: incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/input/ReaderSource.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/input/ReaderSource.java?view=diff&rev=111404&p1=incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/input/ReaderSource.java&r1=111403&p2=incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/input/ReaderSource.java&r2=111404 ============================================================================== --- incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/input/ReaderSource.java (original) +++ incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/input/ReaderSource.java Thu Dec 9 08:43:24 2004 @@ -67,6 +67,7 @@ final ByteBuffer buffer = BufferPool.getBuffer(); event.channel().read( buffer ); + buffer.flip(); event.setBuffer( buffer ); } } Modified: incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/output/Writer.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/output/Writer.java?view=diff&rev=111404&p1=incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/output/Writer.java&r1=111403&p2=incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/output/Writer.java&r2=111404 ============================================================================== --- incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/output/Writer.java (original) +++ incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/output/Writer.java Thu Dec 9 08:43:24 2004 @@ -16,18 +16,19 @@ */ package org.apache.directory.seda.output; -import org.d_haven.event.Source; -import org.d_haven.event.Sink; -import org.d_haven.event.SinkException; -import org.d_haven.event.PreparedEnqueue; import org.apache.directory.seda.Stage; +import org.apache.directory.seda.NetworkEvent; +import org.apache.directory.seda.bufferpool.BufferPool; import java.nio.channels.SocketChannel; import java.nio.channels.Selector; import java.nio.channels.SelectionKey; import java.io.IOException; -import java.util.ArrayList; import java.util.Iterator; +import java.util.Collection; +import java.util.Map; +import java.util.HashMap; +import java.util.LinkedList; /** * Created by IntelliJ IDEA. User: berin Date: Dec 8, 2004 Time: 8:29:38 @@ -36,10 +37,91 @@ public class Writer extends Stage { private Selector m_selector; + private Map m_cachedEvents; public Writer() throws IOException { m_selector = Selector.open(); + m_cachedEvents = new HashMap(); + } + + public void handleEvent(final Object event) + { + if ( event instanceof NetworkEvent ) + { + final NetworkEvent netEvent = (NetworkEvent) event; + + try + { + if ( m_selector.selectNow() > 0 ) + { + final Iterator it = m_selector.selectedKeys().iterator(); + + while (it.hasNext()) + { + final SelectionKey key = (SelectionKey)it.next(); + final SocketChannel channel = (SocketChannel) key.channel(); + sendCachedEvents( key, channel ); + sendOrCache( netEvent, channel ); + } + } + } + catch ( IOException e ) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + else + { + super.handleEvent( event ); + } + } + + private void sendOrCache( final NetworkEvent netEvent, + final SocketChannel channel ) + throws IOException + { + if ( netEvent.channel().equals(channel) ) + { + sendEvent( channel, netEvent ); + } + else + { + Collection cached = (Collection)m_cachedEvents.get( channel ); + if ( cached == null ) + { + cached = new LinkedList(); + m_cachedEvents.put( channel, cached ); + } + + cached.add( netEvent ); + } + } + + private void sendEvent( final SocketChannel channel, + final NetworkEvent netEvent ) + throws IOException + { + channel.write( netEvent.getBuffer() ); + BufferPool.putBuffer( netEvent.getBuffer() ); + netEvent.setBuffer(null); + } + + private void sendCachedEvents( final SelectionKey key, + final SocketChannel channel ) + throws IOException + { + final Collection cached = (Collection)m_cachedEvents.get(key.channel()); + + if (cached != null) + { + final Iterator it = cached.iterator(); + while(it.hasNext()) + { + final NetworkEvent netEvent = (NetworkEvent)it.next(); + sendEvent( channel, netEvent ); + } + } } public void connect( final SocketChannel client ) throws IOException Modified: incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/bufferpool/test/TestBufferPool.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/bufferpool/test/TestBufferPool.java?view=diff&rev=111404&p1=incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/bufferpool/test/TestBufferPool.java&r1=111403&p2=incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/bufferpool/test/TestBufferPool.java&r2=111404 ============================================================================== --- incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/bufferpool/test/TestBufferPool.java (original) +++ incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/bufferpool/test/TestBufferPool.java Thu Dec 9 08:43:24 2004 @@ -29,6 +29,7 @@ public class TestBufferPool extends TestCase { private static final int KB = 1024; + private static final int TOO_SMALL = 256; public TestBufferPool(final String name) { @@ -81,5 +82,45 @@ BufferPool.putBuffer( buffer2 ); assertEquals( 1, BufferPool.buffersInReserve() ); + } + + public void testPutBuffer_notDirect() + { + boolean assertionsEnabled = false; + assert assertionsEnabled = true; + + if (assertionsEnabled) + { + try + { + BufferPool.putBuffer( ByteBuffer.allocate( KB ) ); + fail("Did not throw AssertionError as expected."); + } + catch (Error e) + { + assertTrue("Caught error, but not the expected type" + e, e instanceof AssertionError); + if ( ! (e instanceof AssertionError ) ) throw e; + } + } + } + + public void testPutBuffer_tooSmall() + { + boolean assertionsEnabled = false; + assert assertionsEnabled = true; + + if (assertionsEnabled) + { + try + { + BufferPool.putBuffer( ByteBuffer.allocateDirect( TOO_SMALL ) ); + fail("Did not throw AssertionError as expected."); + } + catch (Error e) + { + assertTrue("Caught error, but not the expected type" + e, e instanceof AssertionError); + if ( ! (e instanceof AssertionError ) ) throw e; + } + } } } Modified: incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/output/test/TestWriter.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/output/test/TestWriter.java?view=diff&rev=111404&p1=incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/output/test/TestWriter.java&r1=111403&p2=incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/output/test/TestWriter.java&r2=111404 ============================================================================== --- incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/output/test/TestWriter.java (original) +++ incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/output/test/TestWriter.java Thu Dec 9 08:43:24 2004 @@ -18,8 +18,15 @@ import junit.framework.TestCase; import org.apache.directory.seda.output.Writer; +import org.apache.directory.seda.NetworkEvent; +import org.apache.directory.seda.bufferpool.BufferPool; import java.io.IOException; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.ByteBuffer; +import java.net.InetSocketAddress; +import java.net.InetAddress; /** * Created by IntelliJ IDEA. User: berin Date: Dec 9, 2004 Time: @@ -28,15 +35,73 @@ */ public class TestWriter extends TestCase { + private ServerSocketChannel m_serverChannel; + private static final int PORT = 6666; + private SocketChannel m_channel; + private SocketChannel m_clientChannel; + private Writer m_writer; + public TestWriter(final String name) { super(name); } + + public void setUp() throws Exception + { + super.setUp(); + + m_serverChannel = ServerSocketChannel.open(); + m_serverChannel.socket().bind( new InetSocketAddress(PORT) ); + + m_channel = SocketChannel.open(); + m_channel.configureBlocking( false ); + m_channel.connect( new InetSocketAddress(InetAddress.getLocalHost(), PORT) ); + + m_clientChannel = m_serverChannel.accept(); + m_channel.finishConnect(); + m_channel.configureBlocking( true ); + + m_writer = new Writer(); + m_writer.connect( m_clientChannel ); + } + + public void tearDown() throws Exception + { + super.tearDown(); + + m_writer.disconnect(m_clientChannel); + m_writer.close(); + + m_clientChannel.close(); + m_serverChannel.close(); + m_channel.close(); + } + public void testCreate() throws IOException { final Writer writer = new Writer(); assertNotNull(writer); + } + + public void testSendNetworkEvent() throws IOException + { + final NetworkEvent event = new NetworkEvent(m_clientChannel); + event.setBuffer( BufferPool.getBuffer() ); + event.getBuffer().put( "test".getBytes() ).flip(); + m_writer.handleEvent( event ); + + final ByteBuffer buffer = ByteBuffer.allocate( 20 ); + m_channel.read( buffer ); + buffer.flip(); + + assertEquals("test", buffer.asCharBuffer().toString()); + } + + public void testSendIncorrectEvent() + { + m_writer.handleEvent( "test" ); + assertEquals("test", m_writer.getDefaultPipe().dequeue() ); } }
