Author: jstrachan
Date: Thu Mar 9 12:26:47 2006
New Revision: 384603
URL: http://svn.apache.org/viewcvs?rev=384603&view=rev
Log:
added a working UDP server with test cases
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java
(with props)
Modified:
incubator/activemq/trunk/activemq-core/project.xml
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java
Modified: incubator/activemq/trunk/activemq-core/project.xml
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/project.xml?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/project.xml (original)
+++ incubator/activemq/trunk/activemq-core/project.xml Thu Mar 9 12:26:47 2006
@@ -360,10 +360,6 @@
<exclude>**/MultipleTestsWithSpringFactoryBeanTest.*</exclude>
<exclude>**/MultipleTestsWithXBeanFactoryBeanTest.*</exclude>
<exclude>**/MultipleTestsWithSpringXBeanFactoryBeanTest.*</exclude>
-
- <!-- TODO FIXME -->
- <exclude>**/UdpTransportUsingServerTest.*</exclude>
- <exclude>**/UdpSendReceiveWithTwoConnectionsTest.*</exclude>
</excludes>
</unitTest>
<resources>
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
Thu Mar 9 12:26:47 2006
@@ -81,11 +81,13 @@
public OpenWireFormat copy() {
OpenWireFormat answer = new OpenWireFormat();
+ answer.version = version;
answer.stackTraceEnabled = stackTraceEnabled;
answer.tcpNoDelayEnabled = tcpNoDelayEnabled;
answer.cacheEnabled = cacheEnabled;
answer.tightEncodingEnabled = tightEncodingEnabled;
answer.sizePrefixDisabled = sizePrefixDisabled;
+ answer.preferedWireFormatInfo = preferedWireFormatInfo;
return answer;
}
@@ -104,8 +106,8 @@
static IdGenerator g = new IdGenerator();
String id = g.generateId();
public String toString() {
- //return "OpenWireFormat{version="+version+",
cacheEnabled="+cacheEnabled+", stackTraceEnabled="+stackTraceEnabled+",
tightEncodingEnabled="+tightEncodingEnabled+",
sizePrefixDisabled="+sizePrefixDisabled+"}";
- return "OpenWireFormat{id="+id+",
tightEncodingEnabled="+tightEncodingEnabled+"}";
+ return "OpenWireFormat{version="+version+",
cacheEnabled="+cacheEnabled+", stackTraceEnabled="+stackTraceEnabled+",
tightEncodingEnabled="+tightEncodingEnabled+",
sizePrefixDisabled="+sizePrefixDisabled+"}";
+ //return "OpenWireFormat{id="+id+",
tightEncodingEnabled="+tightEncodingEnabled+"}";
}
public int getVersion() {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
Thu Mar 9 12:26:47 2006
@@ -94,7 +94,14 @@
onException(new IOException("Remote wire format
("+info.getVersion()+") is lower the minimum version required
("+minimumVersion+")"));
}
+ if (log.isDebugEnabled()) {
+ log.debug(this + " before negotiation: " + wireFormat);
+ }
wireFormat.renegociatWireFormat(info);
+
+ if (log.isDebugEnabled()) {
+ log.debug(this + " after negotiation: " + wireFormat);
+ }
} catch (IOException e) {
onException(e);
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
Thu Mar 9 12:26:47 2006
@@ -43,6 +43,7 @@
private static final Log log = LogFactory.getLog(CommandChannel.class);
+ private final String name;
private DatagramChannel channel;
private OpenWireFormat wireFormat;
private ByteBufferPool bufferPool;
@@ -50,11 +51,12 @@
private DatagramReplayStrategy replayStrategy;
private SocketAddress targetAddress;
private DatagramHeaderMarshaller headerMarshaller = new
DatagramHeaderMarshaller();
+ private final boolean checkSequenceNumbers;
// reading
private Object readLock = new Object();
private ByteBuffer readBuffer;
- private CommandReadBuffer readStack;
+ private DatagramReadBuffer readStack;
private SocketAddress lastReadDatagramAddress;
// writing
@@ -64,14 +66,20 @@
private int largeMessageBufferSize = 128 * 1024;
private DatagramHeader header = new DatagramHeader();
- public CommandChannel(DatagramChannel channel, OpenWireFormat wireFormat,
ByteBufferPool bufferPool, int datagramSize,
- DatagramReplayStrategy replayStrategy, SocketAddress
targetAddress) {
+ public CommandChannel(String name, DatagramChannel channel, OpenWireFormat
wireFormat, ByteBufferPool bufferPool, int datagramSize,
+ DatagramReplayStrategy replayStrategy, SocketAddress
targetAddress, boolean checkSequenceNumbers) {
+ this.name = name;
this.channel = channel;
this.wireFormat = wireFormat;
this.bufferPool = bufferPool;
this.datagramSize = datagramSize;
this.replayStrategy = replayStrategy;
this.targetAddress = targetAddress;
+ this.checkSequenceNumbers = checkSequenceNumbers;
+ }
+
+ public String toString() {
+ return "CommandChannel#" + name;
}
public void start() throws Exception {
@@ -79,7 +87,9 @@
wireFormat.setCacheEnabled(false);
wireFormat.setTightEncodingEnabled(true);
- readStack = new CommandReadBuffer(wireFormat, replayStrategy);
+ if (checkSequenceNumbers) {
+ readStack = new CommandReadBuffer(name, wireFormat,
replayStrategy);
+ }
bufferPool.setDefaultSize(datagramSize);
bufferPool.start();
readBuffer = bufferPool.borrowBuffer();
@@ -98,26 +108,21 @@
readBuffer.clear();
lastReadDatagramAddress = channel.receive(readBuffer);
readBuffer.flip();
-
- if (log.isDebugEnabled()) {
- log.debug("Read a datagram from: " + lastReadDatagramAddress);
- }
+
header = headerMarshaller.readHeader(readBuffer);
header.setFromAddress(lastReadDatagramAddress);
if (log.isDebugEnabled()) {
- log.debug("Received datagram from: " + lastReadDatagramAddress
+ " header: " + header);
+ log.debug("Received datagram on: " + name + " from: " +
lastReadDatagramAddress + " header: " + header);
}
int remaining = readBuffer.remaining();
int size = header.getDataSize();
/*
- if (size > remaining) {
- throw new IOException("Invalid command size: " + size + " when
there are only: " + remaining + " byte(s) remaining");
- }
- else if (size < remaining) {
- log.warn("Extra bytes in buffer. Expecting: " + size + " but
has: " + remaining);
- }
- */
+ * if (size > remaining) { throw new IOException("Invalid command
+ * size: " + size + " when there are only: " + remaining + "
byte(s)
+ * remaining"); } else if (size < remaining) { log.warn("Extra
bytes
+ * in buffer. Expecting: " + size + " but has: " + remaining); }
+ */
if (size != remaining) {
log.warn("Expecting: " + size + " but has: " + remaining);
}
@@ -133,23 +138,39 @@
// TODO use a DataInput implementation that talks direct to the
// ByteBuffer
DataInputStream dataIn = new DataInputStream(new
ByteArrayInputStream(data));
- Command command = (Command) wireFormat.doUnmarshal(dataIn);
+ Command command = (Command) wireFormat.unmarshal(dataIn);
+ // Command command = (Command) wireFormat.doUnmarshal(dataIn);
header.setCommand(command);
}
- answer = readStack.read(header);
+ if (readStack != null) {
+ answer = readStack.read(header);
+ }
+ else {
+ answer = header.getCommand();
+ }
}
if (answer != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Channel: " + name + " about to process: " + answer);
+ }
processor.process(answer, header);
}
}
/**
- * Called if a packet is received on a different channel from a remote
client
- * @throws IOException
+ * Called if a packet is received on a different channel from a remote
+ * client
+ *
+ * @throws IOException
*/
public Command onDatagramReceived(DatagramHeader header) throws
IOException {
- return readStack.read(header);
+ if (readStack != null) {
+ return readStack.read(header);
+ }
+ else {
+ return header.getCommand();
+ }
}
public void write(Command command) throws IOException {
@@ -159,10 +180,12 @@
public void write(Command command, SocketAddress address) throws
IOException {
synchronized (writeLock) {
header.incrementCounter();
- bs = new BooleanStream();
- // TODO
- //bs.clear();
- int size = wireFormat.tightMarshal1(command, bs);
+
+ ByteArrayOutputStream largeBuffer = new
ByteArrayOutputStream(largeMessageBufferSize);
+ wireFormat.marshal(command, new DataOutputStream(largeBuffer));
+ byte[] data = largeBuffer.toByteArray();
+ int size = data.length;
+
if (size < datagramSize) {
header.setPartial(false);
header.setComplete(true);
@@ -170,13 +193,6 @@
writeBuffer.clear();
headerMarshaller.writeHeader(header, writeBuffer);
- // TODO use a DataOutput implementation that talks direct to
the
- // ByteBuffer
- ByteArrayOutputStream buffer = new ByteArrayOutputStream();
- DataOutputStream dataOut = new DataOutputStream(buffer);
- wireFormat.tightMarshal2(command, dataOut, bs);
- dataOut.close();
- byte[] data = buffer.toByteArray();
writeBuffer.put(data);
sendWriteBuffer(address);
@@ -186,10 +202,7 @@
header.setComplete(false);
// lets split the command up into chunks
- ByteArrayOutputStream largeBuffer = new
ByteArrayOutputStream(largeMessageBufferSize);
- wireFormat.marshal(command, new DataOutputStream(largeBuffer));
- byte[] data = largeBuffer.toByteArray();
int offset = 0;
boolean lastFragment = false;
for (int fragment = 0, length = data.length; !lastFragment;
fragment++) {
@@ -248,15 +261,14 @@
return lastReadDatagramAddress;
}
}
-
// Implementation methods
//
-------------------------------------------------------------------------
protected void sendWriteBuffer(SocketAddress address) throws IOException {
writeBuffer.flip();
-
+
if (log.isDebugEnabled()) {
- log.debug("Sending datagram to: " + address + " header: " +
header);
+ log.debug("Channel: " + name + " sending datagram to: " + address
+ " header: " + header);
}
channel.send(writeBuffer, address);
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
Thu Mar 9 12:26:47 2006
@@ -35,7 +35,7 @@
*
* @version $Revision$
*/
-public class CommandReadBuffer {
+public class CommandReadBuffer implements DatagramReadBuffer {
private static final Log log = LogFactory.getLog(CommandReadBuffer.class);
private OpenWireFormat wireFormat;
@@ -43,8 +43,10 @@
private SortedSet headers = new TreeSet();
private long expectedCounter = 1;
private ByteArrayOutputStream out = new ByteArrayOutputStream();
+ private final String name;
- public CommandReadBuffer(OpenWireFormat wireFormat, DatagramReplayStrategy
replayStrategy) {
+ public CommandReadBuffer(String name, OpenWireFormat wireFormat,
DatagramReplayStrategy replayStrategy) {
+ this.name = name;
this.wireFormat = wireFormat;
this.replayStrategy = replayStrategy;
}
@@ -57,13 +59,16 @@
log.warn("Ignoring out of step packet: " + header);
}
else {
- replayStrategy.onDroppedPackets(expectedCounter,
actualCounter);
+ replayStrategy.onDroppedPackets(name, expectedCounter,
actualCounter);
// lets add it to the list for later on
headers.add(header);
}
// lets see if the first item in the set is the next header
+ if (headers.isEmpty()) {
+ return null;
+ }
header = (DatagramHeader) headers.first();
if (expectedCounter != header.getCounter()) {
return null;
@@ -71,7 +76,7 @@
}
// we've got a valid header so increment counter
- replayStrategy.onReceivedPacket(expectedCounter);
+ replayStrategy.onReceivedPacket(name, expectedCounter);
expectedCounter++;
Command answer = null;
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java?rev=384603&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java
Thu Mar 9 12:26:47 2006
@@ -0,0 +1,33 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.command.Command;
+
+import java.io.IOException;
+
+/**
+ * Represents an inbound buffer of datagrams for dealing with out of order
+ * or fragmented commands.
+ *
+ * @version $Revision$
+ */
+public interface DatagramReadBuffer {
+
+ Command read(DatagramHeader header) throws IOException;
+
+}
\ No newline at end of file
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
Thu Mar 9 12:26:47 2006
@@ -56,6 +56,7 @@
private DatagramChannel channel;
private boolean trace = false;
private boolean useLocalHost = true;
+ private boolean checkSequenceNumbers = true;
private int port;
private int minmumWireFormatVersion;
private String description = null;
@@ -112,7 +113,8 @@
commandChannel.write(command, address);
}
- public void doConsume(Command command, DatagramHeader header) throws
IOException {
+
+ public void receivedHeader(DatagramHeader header) {
wireFormatHeader = header;
}
@@ -253,6 +255,14 @@
public OpenWireFormat getWireFormat() {
return wireFormat;
}
+
+ public boolean isCheckSequenceNumbers() {
+ return checkSequenceNumbers;
+ }
+
+ public void setCheckSequenceNumbers(boolean checkSequenceNumbers) {
+ this.checkSequenceNumbers = checkSequenceNumbers;
+ }
// Implementation methods
//
-------------------------------------------------------------------------
@@ -303,7 +313,7 @@
if (bufferPool == null) {
bufferPool = new DefaultBufferPool();
}
- commandChannel = new CommandChannel(channel, wireFormat, bufferPool,
datagramSize, replayStrategy, targetAddress);
+ commandChannel = new CommandChannel(toString(), channel, wireFormat,
bufferPool, datagramSize, replayStrategy, targetAddress,
isCheckSequenceNumbers());
commandChannel.start();
// lets pass the header & address into the channel so it avoids a
@@ -336,5 +346,6 @@
targetAddress = lastAddress;
}
}
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
Thu Mar 9 12:26:47 2006
@@ -85,7 +85,6 @@
protected Transport createTransport(URI location, WireFormat wf) throws
UnknownHostException, IOException {
OpenWireFormat wireFormat = asOpenWireFormat(wf);
- wireFormat.setSizePrefixDisabled(true);
return new UdpTransport(wireFormat, location);
}
@@ -113,7 +112,7 @@
transport = new WireFormatNegotiator(transport,
asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion()) {
protected void onWireFormatNegotiated(WireFormatInfo info) {
// lets switch to the targetAddress that the last packet was
- // received as
+ // received as so that all future requests go to the newly
created UDP channel
udpTransport.useLastInboundDatagramAsNewTarget();
}
};
@@ -122,8 +121,6 @@
protected OpenWireFormat asOpenWireFormat(WireFormat wf) {
OpenWireFormat answer = (OpenWireFormat) wf;
- answer.setSizePrefixDisabled(true);
- answer.setCacheEnabled(false);
return answer;
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
Thu Mar 9 12:26:47 2006
@@ -24,7 +24,6 @@
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.transport.TransportLogger;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerSupport;
import org.apache.activemq.transport.WireFormatNegotiator;
@@ -55,6 +54,10 @@
super(connectURI);
this.serverTransport = serverTransport;
this.configuredTransport = configuredTransport;
+
+ // lets disable the incremental checking of the sequence numbers
+ // as we are getting messages from many different clients
+ serverTransport.setCheckSequenceNumbers(false);
}
public String toString() {
@@ -96,12 +99,16 @@
public void process(Command command, DatagramHeader header) throws
IOException {
SocketAddress address = header.getFromAddress();
- System.out.println(toString() + " received command: " + command + "
from address: " + address);
+ if (log.isDebugEnabled()) {
+ log.debug("Received command on: " + this + " from address: " +
address + " command: " + command);
+ }
Transport transport = null;
synchronized (transports) {
transport = (Transport) transports.get(address);
if (transport == null) {
- System.out.println("### creating new server connector");
+ if (log.isDebugEnabled()) {
+ log.debug("Creating a new UDP server connection");
+ }
transport = createTransport(command, header);
transport = configureTransport(transport);
transports.put(address, transport);
@@ -114,23 +121,30 @@
protected Transport configureTransport(Transport transport) {
transport = new ResponseCorrelator(transport);
-
- // TODO
- //transport = new InactivityMonitor(transport,
serverTransport.getMaxInactivityDuration());
+ if (serverTransport.getMaxInactivityDuration() > 0) {
+ transport = new InactivityMonitor(transport,
serverTransport.getMaxInactivityDuration());
+ }
+
getAcceptListener().onAccept(transport);
return transport;
}
- protected Transport createTransport(Command command, DatagramHeader
header) throws IOException {
+ protected Transport createTransport(final Command command, DatagramHeader
header) throws IOException {
final SocketAddress address = header.getFromAddress();
- // TODO lets copy the wireformat...
- final UdpTransport transport = new
UdpTransport(serverTransport.getWireFormat(), address);
-
- // lets send the packet into the transport so it can track packets
- transport.doConsume(command, header);
+ final OpenWireFormat connectionWireFormat =
serverTransport.getWireFormat().copy();
+ final UdpTransport transport = new UdpTransport(connectionWireFormat,
address);
+
+ transport.receivedHeader(header);
+
+ return new WireFormatNegotiator(transport, transport.getWireFormat(),
serverTransport.getMinmumWireFormatVersion()) {
- return new WireFormatNegotiator(transport,
serverTransport.getWireFormat(), serverTransport.getMinmumWireFormatVersion()) {
+ public void start() throws Exception {
+ super.start();
+
+ // process the inbound wireformat
+ onCommand(command);
+ }
// lets use the specific addressing of wire format
protected void sendWireFormat(WireFormatInfo info) throws
IOException {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java
Thu Mar 9 12:26:47 2006
@@ -25,8 +25,9 @@
*/
public interface DatagramReplayStrategy {
- void onDroppedPackets(long expectedCounter, long actualCounter) throws
IOException;
+ void onDroppedPackets(String name, long expectedCounter, long
actualCounter) throws IOException;
- void onReceivedPacket(long expectedCounter);
+ void onReceivedPacket(String name, long expectedCounter);
}
+
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java
Thu Mar 9 12:26:47 2006
@@ -25,12 +25,12 @@
*/
public class ExceptionIfDroppedPacketStrategy implements
DatagramReplayStrategy {
- public void onDroppedPackets(long expectedCounter, long actualCounter)
throws IOException {
+ public void onDroppedPackets(String name, long expectedCounter, long
actualCounter) throws IOException {
long count = actualCounter - expectedCounter;
- throw new IOException("" + count + " packet(s) dropped. Expected: " +
expectedCounter + " but was: " + actualCounter);
+ throw new IOException(name + count + " packet(s) dropped. Expected: "
+ expectedCounter + " but was: " + actualCounter);
}
- public void onReceivedPacket(long expectedCounter) {
+ public void onReceivedPacket(String name, long expectedCounter) {
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
Thu Mar 9 12:26:47 2006
@@ -157,7 +157,7 @@
protected Command assertCommandReceived() throws InterruptedException {
Command answer = null;
synchronized (lock) {
- lock.wait(5000);
+ lock.wait(1000);
answer = receivedCommand;
}
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java?rev=384603&r1=384602&r2=384603&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java
Thu Mar 9 12:26:47 2006
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.transport.udp;
-import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
@@ -35,7 +34,8 @@
protected Transport createProducer() throws Exception {
System.out.println("Producer using URI: " + producerURI);
- return TransportFactory.connect(new URI(producerURI));
+ URI uri = new URI(producerURI);
+ return TransportFactory.connect(uri);
}
protected TransportServer createServer() throws Exception {
@@ -45,12 +45,4 @@
protected Transport createConsumer() throws Exception {
return null;
}
-
- protected OpenWireFormat createWireFormat() {
- OpenWireFormat answer = new OpenWireFormat();
- answer.setCacheEnabled(false);
- answer.setSizePrefixDisabled(true);
- return answer;
- }
-
}