Author: jstrachan
Date: Thu Mar  9 12:26:47 2006
New Revision: 384603

URL: http://svn.apache.org/viewcvs?rev=384603&view=rev
Log:
added a working UDP server with test cases

Added:
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java
   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/project.xml
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java
    
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
    
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java

Modified: incubator/activemq/trunk/activemq-core/project.xml
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/project.xml?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/project.xml (original)
+++ incubator/activemq/trunk/activemq-core/project.xml Thu Mar  9 12:26:47 2006
@@ -360,10 +360,6 @@
                 <exclude>**/MultipleTestsWithSpringFactoryBeanTest.*</exclude>
                 <exclude>**/MultipleTestsWithXBeanFactoryBeanTest.*</exclude>
                 
<exclude>**/MultipleTestsWithSpringXBeanFactoryBeanTest.*</exclude>
-                
-                <!-- TODO FIXME -->
-                <exclude>**/UdpTransportUsingServerTest.*</exclude>
-                <exclude>**/UdpSendReceiveWithTwoConnectionsTest.*</exclude>
             </excludes>
         </unitTest>
         <resources>

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
 Thu Mar  9 12:26:47 2006
@@ -81,11 +81,13 @@
     
     public OpenWireFormat copy() {
         OpenWireFormat answer = new OpenWireFormat();
+        answer.version = version;
         answer.stackTraceEnabled = stackTraceEnabled;
         answer.tcpNoDelayEnabled = tcpNoDelayEnabled;
         answer.cacheEnabled = cacheEnabled;
         answer.tightEncodingEnabled = tightEncodingEnabled;
         answer.sizePrefixDisabled = sizePrefixDisabled;
+        answer.preferedWireFormatInfo = preferedWireFormatInfo;
         return answer;
     }
     
@@ -104,8 +106,8 @@
     static IdGenerator g = new IdGenerator();
     String id = g.generateId();
     public String toString() {
-        //return "OpenWireFormat{version="+version+", 
cacheEnabled="+cacheEnabled+", stackTraceEnabled="+stackTraceEnabled+", 
tightEncodingEnabled="+tightEncodingEnabled+", 
sizePrefixDisabled="+sizePrefixDisabled+"}";
-        return "OpenWireFormat{id="+id+", 
tightEncodingEnabled="+tightEncodingEnabled+"}";
+        return "OpenWireFormat{version="+version+", 
cacheEnabled="+cacheEnabled+", stackTraceEnabled="+stackTraceEnabled+", 
tightEncodingEnabled="+tightEncodingEnabled+", 
sizePrefixDisabled="+sizePrefixDisabled+"}";
+        //return "OpenWireFormat{id="+id+", 
tightEncodingEnabled="+tightEncodingEnabled+"}";
     }
     
     public int getVersion() {

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
 Thu Mar  9 12:26:47 2006
@@ -94,7 +94,14 @@
                     onException(new IOException("Remote wire format 
("+info.getVersion()+") is lower the minimum version required 
("+minimumVersion+")"));
                 }
                 
+                if (log.isDebugEnabled()) {
+                    log.debug(this + " before negotiation: " + wireFormat);
+                }
                 wireFormat.renegociatWireFormat(info);
+                
+                if (log.isDebugEnabled()) {
+                    log.debug(this + " after negotiation: " + wireFormat);
+                }
        
             } catch (IOException e) {
                 onException(e);

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
 Thu Mar  9 12:26:47 2006
@@ -43,6 +43,7 @@
 
     private static final Log log = LogFactory.getLog(CommandChannel.class);
 
+    private final String name;
     private DatagramChannel channel;
     private OpenWireFormat wireFormat;
     private ByteBufferPool bufferPool;
@@ -50,11 +51,12 @@
     private DatagramReplayStrategy replayStrategy;
     private SocketAddress targetAddress;
     private DatagramHeaderMarshaller headerMarshaller = new 
DatagramHeaderMarshaller();
+    private final boolean checkSequenceNumbers;
 
     // reading
     private Object readLock = new Object();
     private ByteBuffer readBuffer;
-    private CommandReadBuffer readStack;
+    private DatagramReadBuffer readStack;
     private SocketAddress lastReadDatagramAddress;
 
     // writing
@@ -64,14 +66,20 @@
     private int largeMessageBufferSize = 128 * 1024;
     private DatagramHeader header = new DatagramHeader();
 
-    public CommandChannel(DatagramChannel channel, OpenWireFormat wireFormat, 
ByteBufferPool bufferPool, int datagramSize,
-            DatagramReplayStrategy replayStrategy, SocketAddress 
targetAddress) {
+    public CommandChannel(String name, DatagramChannel channel, OpenWireFormat 
wireFormat, ByteBufferPool bufferPool, int datagramSize,
+            DatagramReplayStrategy replayStrategy, SocketAddress 
targetAddress, boolean checkSequenceNumbers) {
+        this.name = name;
         this.channel = channel;
         this.wireFormat = wireFormat;
         this.bufferPool = bufferPool;
         this.datagramSize = datagramSize;
         this.replayStrategy = replayStrategy;
         this.targetAddress = targetAddress;
+        this.checkSequenceNumbers = checkSequenceNumbers;
+    }
+
+    public String toString() {
+        return "CommandChannel#" + name;
     }
 
     public void start() throws Exception {
@@ -79,7 +87,9 @@
         wireFormat.setCacheEnabled(false);
         wireFormat.setTightEncodingEnabled(true);
 
-        readStack = new CommandReadBuffer(wireFormat, replayStrategy);
+        if (checkSequenceNumbers) {
+            readStack = new CommandReadBuffer(name, wireFormat, 
replayStrategy);
+        }
         bufferPool.setDefaultSize(datagramSize);
         bufferPool.start();
         readBuffer = bufferPool.borrowBuffer();
@@ -98,26 +108,21 @@
             readBuffer.clear();
             lastReadDatagramAddress = channel.receive(readBuffer);
             readBuffer.flip();
-            
-            if (log.isDebugEnabled()) {
-                log.debug("Read a datagram from: " + lastReadDatagramAddress);
-            }
+
             header = headerMarshaller.readHeader(readBuffer);
             header.setFromAddress(lastReadDatagramAddress);
 
             if (log.isDebugEnabled()) {
-                log.debug("Received datagram from: " + lastReadDatagramAddress 
+ " header: " + header);
+                log.debug("Received datagram on: " + name + " from: " + 
lastReadDatagramAddress + " header: " + header);
             }
             int remaining = readBuffer.remaining();
             int size = header.getDataSize();
             /*
-            if (size > remaining) {
-                throw new IOException("Invalid command size: " + size + " when 
there are only: " + remaining + " byte(s) remaining");
-            }
-            else if (size < remaining) {
-                log.warn("Extra bytes in buffer. Expecting: " + size + " but 
has: " + remaining);
-            }
-            */
+             * if (size > remaining) { throw new IOException("Invalid command
+             * size: " + size + " when there are only: " + remaining + " 
byte(s)
+             * remaining"); } else if (size < remaining) { log.warn("Extra 
bytes
+             * in buffer. Expecting: " + size + " but has: " + remaining); }
+             */
             if (size != remaining) {
                 log.warn("Expecting: " + size + " but has: " + remaining);
             }
@@ -133,23 +138,39 @@
                 // TODO use a DataInput implementation that talks direct to the
                 // ByteBuffer
                 DataInputStream dataIn = new DataInputStream(new 
ByteArrayInputStream(data));
-                Command command = (Command) wireFormat.doUnmarshal(dataIn);
+                Command command = (Command) wireFormat.unmarshal(dataIn);
+                // Command command = (Command) wireFormat.doUnmarshal(dataIn);
                 header.setCommand(command);
             }
 
-            answer = readStack.read(header);
+            if (readStack != null) {
+                answer = readStack.read(header);
+            }
+            else {
+                answer = header.getCommand();
+            }
         }
         if (answer != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Channel: " + name + " about to process: " + answer);
+            }
             processor.process(answer, header);
         }
     }
 
     /**
-     * Called if a packet is received on a different channel from a remote 
client
-     * @throws IOException 
+     * Called if a packet is received on a different channel from a remote
+     * client
+     * 
+     * @throws IOException
      */
     public Command onDatagramReceived(DatagramHeader header) throws 
IOException {
-        return readStack.read(header);
+        if (readStack != null) {
+            return readStack.read(header);
+        }
+        else {
+            return header.getCommand();
+        }
     }
 
     public void write(Command command) throws IOException {
@@ -159,10 +180,12 @@
     public void write(Command command, SocketAddress address) throws 
IOException {
         synchronized (writeLock) {
             header.incrementCounter();
-            bs = new BooleanStream();
-            // TODO
-            //bs.clear();
-            int size = wireFormat.tightMarshal1(command, bs);
+
+            ByteArrayOutputStream largeBuffer = new 
ByteArrayOutputStream(largeMessageBufferSize);
+            wireFormat.marshal(command, new DataOutputStream(largeBuffer));
+            byte[] data = largeBuffer.toByteArray();
+            int size = data.length;
+
             if (size < datagramSize) {
                 header.setPartial(false);
                 header.setComplete(true);
@@ -170,13 +193,6 @@
                 writeBuffer.clear();
                 headerMarshaller.writeHeader(header, writeBuffer);
 
-                // TODO use a DataOutput implementation that talks direct to 
the
-                // ByteBuffer
-                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-                DataOutputStream dataOut = new DataOutputStream(buffer);
-                wireFormat.tightMarshal2(command, dataOut, bs);
-                dataOut.close();
-                byte[] data = buffer.toByteArray();
                 writeBuffer.put(data);
 
                 sendWriteBuffer(address);
@@ -186,10 +202,7 @@
                 header.setComplete(false);
 
                 // lets split the command up into chunks
-                ByteArrayOutputStream largeBuffer = new 
ByteArrayOutputStream(largeMessageBufferSize);
-                wireFormat.marshal(command, new DataOutputStream(largeBuffer));
 
-                byte[] data = largeBuffer.toByteArray();
                 int offset = 0;
                 boolean lastFragment = false;
                 for (int fragment = 0, length = data.length; !lastFragment; 
fragment++) {
@@ -248,15 +261,14 @@
             return lastReadDatagramAddress;
         }
     }
-    
 
     // Implementation methods
     // 
-------------------------------------------------------------------------
     protected void sendWriteBuffer(SocketAddress address) throws IOException {
         writeBuffer.flip();
-        
+
         if (log.isDebugEnabled()) {
-            log.debug("Sending datagram to: " + address + " header: " + 
header);
+            log.debug("Channel: " + name + " sending datagram to: " + address 
+ " header: " + header);
         }
         channel.send(writeBuffer, address);
     }

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
 Thu Mar  9 12:26:47 2006
@@ -35,7 +35,7 @@
  * 
  * @version $Revision$
  */
-public class CommandReadBuffer {
+public class CommandReadBuffer implements DatagramReadBuffer {
     private static final Log log = LogFactory.getLog(CommandReadBuffer.class);
 
     private OpenWireFormat wireFormat;
@@ -43,8 +43,10 @@
     private SortedSet headers = new TreeSet();
     private long expectedCounter = 1;
     private ByteArrayOutputStream out = new ByteArrayOutputStream();
+    private final String name;
 
-    public CommandReadBuffer(OpenWireFormat wireFormat, DatagramReplayStrategy 
replayStrategy) {
+    public CommandReadBuffer(String name, OpenWireFormat wireFormat, 
DatagramReplayStrategy replayStrategy) {
+        this.name = name;
         this.wireFormat = wireFormat;
         this.replayStrategy = replayStrategy;
     }
@@ -57,13 +59,16 @@
                 log.warn("Ignoring out of step packet: " + header);
             }
             else {
-                replayStrategy.onDroppedPackets(expectedCounter, 
actualCounter);
+                replayStrategy.onDroppedPackets(name, expectedCounter, 
actualCounter);
                 
                 // lets add it to the list for later on
                 headers.add(header);
             }
 
             // lets see if the first item in the set is the next header
+            if (headers.isEmpty()) {
+                return null;
+            }
             header = (DatagramHeader) headers.first();
             if (expectedCounter != header.getCounter()) {
                 return null;
@@ -71,7 +76,7 @@
         }
 
         // we've got a valid header so increment counter
-        replayStrategy.onReceivedPacket(expectedCounter);
+        replayStrategy.onReceivedPacket(name, expectedCounter);
         expectedCounter++;
 
         Command answer = null;

Added: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java?rev=384603&view=auto
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java
 Thu Mar  9 12:26:47 2006
@@ -0,0 +1,33 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.command.Command;
+
+import java.io.IOException;
+
+/**
+ * Represents an inbound buffer of datagrams for dealing with out of order
+ * or fragmented commands.
+ * 
+ * @version $Revision$
+ */
+public interface DatagramReadBuffer {
+
+    Command read(DatagramHeader header) throws IOException;
+
+}
\ No newline at end of file

Propchange: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
 Thu Mar  9 12:26:47 2006
@@ -56,6 +56,7 @@
     private DatagramChannel channel;
     private boolean trace = false;
     private boolean useLocalHost = true;
+    private boolean checkSequenceNumbers = true;
     private int port;
     private int minmumWireFormatVersion;
     private String description = null;
@@ -112,7 +113,8 @@
         commandChannel.write(command, address);
     }
 
-    public void doConsume(Command command, DatagramHeader header) throws 
IOException {
+
+    public void receivedHeader(DatagramHeader header) {
         wireFormatHeader = header;
     }
 
@@ -253,6 +255,14 @@
     public OpenWireFormat getWireFormat() {
         return wireFormat;
     }
+    
+    public boolean isCheckSequenceNumbers() {
+        return checkSequenceNumbers;
+    }
+
+    public void setCheckSequenceNumbers(boolean checkSequenceNumbers) {
+        this.checkSequenceNumbers = checkSequenceNumbers;
+    }
 
     // Implementation methods
     // 
-------------------------------------------------------------------------
@@ -303,7 +313,7 @@
         if (bufferPool == null) {
             bufferPool = new DefaultBufferPool();
         }
-        commandChannel = new CommandChannel(channel, wireFormat, bufferPool, 
datagramSize, replayStrategy, targetAddress);
+        commandChannel = new CommandChannel(toString(), channel, wireFormat, 
bufferPool, datagramSize, replayStrategy, targetAddress, 
isCheckSequenceNumbers());
         commandChannel.start();
 
         // lets pass the header & address into the channel so it avoids a
@@ -336,5 +346,6 @@
             targetAddress = lastAddress;
         }
     }
+
 
 }

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
 Thu Mar  9 12:26:47 2006
@@ -85,7 +85,6 @@
 
     protected Transport createTransport(URI location, WireFormat wf) throws 
UnknownHostException, IOException {
         OpenWireFormat wireFormat = asOpenWireFormat(wf);
-        wireFormat.setSizePrefixDisabled(true);
         return new UdpTransport(wireFormat, location);
     }
 
@@ -113,7 +112,7 @@
         transport = new WireFormatNegotiator(transport, 
asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion()) {
             protected void onWireFormatNegotiated(WireFormatInfo info) {
                 // lets switch to the targetAddress that the last packet was
-                // received as
+                // received as so that all future requests go to the newly 
created UDP channel
                 udpTransport.useLastInboundDatagramAsNewTarget();
             }
         };
@@ -122,8 +121,6 @@
 
     protected OpenWireFormat asOpenWireFormat(WireFormat wf) {
         OpenWireFormat answer = (OpenWireFormat) wf;
-        answer.setSizePrefixDisabled(true);
-        answer.setCacheEnabled(false);
         return answer;
     }
 }

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
 Thu Mar  9 12:26:47 2006
@@ -24,7 +24,6 @@
 import org.apache.activemq.transport.ResponseCorrelator;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.transport.TransportLogger;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.TransportServerSupport;
 import org.apache.activemq.transport.WireFormatNegotiator;
@@ -55,6 +54,10 @@
         super(connectURI);
         this.serverTransport = serverTransport;
         this.configuredTransport = configuredTransport;
+
+        // lets disable the incremental checking of the sequence numbers
+        // as we are getting messages from many different clients
+        serverTransport.setCheckSequenceNumbers(false);
     }
 
     public String toString() {
@@ -96,12 +99,16 @@
 
     public void process(Command command, DatagramHeader header) throws 
IOException {
         SocketAddress address = header.getFromAddress();
-        System.out.println(toString() + " received command: " + command + " 
from address: " + address);
+        if (log.isDebugEnabled()) {
+            log.debug("Received command on: " + this + " from address: " + 
address + " command: " + command);
+        }
         Transport transport = null;
         synchronized (transports) {
             transport = (Transport) transports.get(address);
             if (transport == null) {
-                System.out.println("###Êcreating new server connector");
+                if (log.isDebugEnabled()) {
+                    log.debug("Creating a new UDP server connection");
+                }
                 transport = createTransport(command, header);
                 transport = configureTransport(transport);
                 transports.put(address, transport);
@@ -114,23 +121,30 @@
 
     protected Transport configureTransport(Transport transport) {
         transport = new ResponseCorrelator(transport);
-        
-        // TODO
-        //transport = new InactivityMonitor(transport, 
serverTransport.getMaxInactivityDuration());
 
+        if (serverTransport.getMaxInactivityDuration() > 0) {
+            transport = new InactivityMonitor(transport, 
serverTransport.getMaxInactivityDuration());
+        }
+        
         getAcceptListener().onAccept(transport);
         return transport;
     }
 
-    protected Transport createTransport(Command command, DatagramHeader 
header) throws IOException {
+    protected Transport createTransport(final Command command, DatagramHeader 
header) throws IOException {
         final SocketAddress address = header.getFromAddress();
-        // TODO lets copy the wireformat...
-        final UdpTransport transport = new 
UdpTransport(serverTransport.getWireFormat(), address);
-        
-        // lets send the packet into the transport so it can track packets
-        transport.doConsume(command, header);
+        final OpenWireFormat connectionWireFormat = 
serverTransport.getWireFormat().copy();
+        final UdpTransport transport = new UdpTransport(connectionWireFormat, 
address);
+
+        transport.receivedHeader(header);
+
+        return new WireFormatNegotiator(transport, transport.getWireFormat(), 
serverTransport.getMinmumWireFormatVersion()) {
 
-        return new WireFormatNegotiator(transport, 
serverTransport.getWireFormat(), serverTransport.getMinmumWireFormatVersion()) {
+            public void start() throws Exception {
+                super.start();
+
+                // process the inbound wireformat
+                onCommand(command);
+            }
 
             // lets use the specific addressing of wire format
             protected void sendWireFormat(WireFormatInfo info) throws 
IOException {

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java
 Thu Mar  9 12:26:47 2006
@@ -25,8 +25,9 @@
  */
 public interface DatagramReplayStrategy {
 
-    void onDroppedPackets(long expectedCounter, long actualCounter) throws 
IOException;
+    void onDroppedPackets(String name, long expectedCounter, long 
actualCounter) throws IOException;
 
-    void onReceivedPacket(long expectedCounter);
+    void onReceivedPacket(String name, long expectedCounter);
 
 }
+

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java
 Thu Mar  9 12:26:47 2006
@@ -25,12 +25,12 @@
  */
 public class ExceptionIfDroppedPacketStrategy implements 
DatagramReplayStrategy {
 
-    public void onDroppedPackets(long expectedCounter, long actualCounter) 
throws IOException {
+    public void onDroppedPackets(String name, long expectedCounter, long 
actualCounter) throws IOException {
         long count = actualCounter - expectedCounter;
-        throw new IOException("" + count +  " packet(s) dropped. Expected: " + 
expectedCounter + " but was: " + actualCounter);
+        throw new IOException(name + count +  " packet(s) dropped. Expected: " 
+ expectedCounter + " but was: " + actualCounter);
     }
 
-    public void onReceivedPacket(long expectedCounter) {
+    public void onReceivedPacket(String name, long expectedCounter) {
     }
 
 }

Modified: 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
 Thu Mar  9 12:26:47 2006
@@ -157,7 +157,7 @@
     protected Command assertCommandReceived() throws InterruptedException {
         Command answer = null;
         synchronized (lock) {
-            lock.wait(5000);
+            lock.wait(1000);
             answer = receivedCommand;
         }
 

Modified: 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java
 Thu Mar  9 12:26:47 2006
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.transport.udp;
 
-import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportServer;
@@ -35,7 +34,8 @@
 
     protected Transport createProducer() throws Exception {
         System.out.println("Producer using URI: " + producerURI);
-        return TransportFactory.connect(new URI(producerURI));
+        URI uri = new URI(producerURI);
+        return TransportFactory.connect(uri);
     }
 
     protected TransportServer createServer() throws Exception {
@@ -45,12 +45,4 @@
     protected Transport createConsumer() throws Exception {
         return null;
     }
-
-    protected OpenWireFormat createWireFormat() {
-        OpenWireFormat answer = new OpenWireFormat();
-        answer.setCacheEnabled(false);
-        answer.setSizePrefixDisabled(true);
-        return answer;
-    }
-
 }


Reply via email to