Author: jstrachan
Date: Wed Mar 15 14:43:40 2006
New Revision: 386198
URL: http://svn.apache.org/viewcvs?rev=386198&view=rev
Log:
Added a replay buffer to the ReliableTransport so that nodes which have
missed messages can request that those messages be re-sent.
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java
(with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java
(with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java
(with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java
(with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/Replayer.java
(with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java
(with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java?rev=386198&r1=386197&r2=386198&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
Wed Mar 15 14:43:40 2006
@@ -18,10 +18,8 @@
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.udp.CommandChannel;
-import org.apache.activemq.transport.udp.CommandDatagramChannel;
import org.apache.activemq.transport.udp.CommandDatagramSocket;
import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
-import org.apache.activemq.transport.udp.DefaultBufferPool;
import org.apache.activemq.transport.udp.UdpTransport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
@@ -36,7 +34,6 @@
import java.net.SocketException;
import java.net.URI;
import java.net.UnknownHostException;
-import java.nio.channels.DatagramChannel;
/**
* A multicast based transport.
@@ -75,7 +72,7 @@
super.doStop(stopper);
if (socket != null) {
try {
- socket.leaveGroup(mcastAddress);
+ socket.leaveGroup(getMulticastAddress());
}
catch (IOException e) {
stopper.onException(this, e);
@@ -89,11 +86,15 @@
socket.setLoopbackMode(loopBackMode);
socket.setTimeToLive(timeToLive);
- log.debug("Joining multicast address: " + mcastAddress);
- socket.joinGroup(mcastAddress);
+ log.debug("Joining multicast address: " + getMulticastAddress());
+ socket.joinGroup(getMulticastAddress());
socket.setSoTimeout((int) keepAliveInterval);
- return new CommandDatagramSocket(this, socket, getWireFormat(),
getDatagramSize(), mcastAddress, mcastPort, createDatagramHeaderMarshaller());
+ return new CommandDatagramSocket( this, getWireFormat(),
getDatagramSize(), getTargetAddress(), createDatagramHeaderMarshaller(),
socket);
+ }
+
+ protected InetAddress getMulticastAddress() {
+ return mcastAddress;
}
protected InetSocketAddress createAddress(URI remoteLocation) throws
UnknownHostException, IOException {
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java?rev=386198&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java
Wed Mar 15 14:43:40 2006
@@ -0,0 +1,76 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link ReplayBuffer} which keeps recently sent buffers in a map keyed by
+ * command ID and evicts the oldest entries, in ascending command ID order,
+ * once the map gets close to the configured size.
+ * <p>
+ * All access to the map is guarded by an internal lock. Note that the
+ * {@link ReplayBufferListener} is invoked while that lock is held.
+ *
+ * @version $Revision$
+ */
+public class DefaultReplayBuffer implements ReplayBuffer {
+
+    private final int size;
+    private final Map map;
+    private final Object lock = new Object();
+    private ReplayBufferListener listener;
+    private int lowestCommandId = 1;
+
+    /**
+     * @param size the configured buffer size; eviction starts once the map
+     *            holds <code>size - 1</code> entries
+     */
+    public DefaultReplayBuffer(int size) {
+        this.size = size;
+        map = createMap(size);
+    }
+
+    /**
+     * Stores the buffer under the given command ID, first evicting the
+     * oldest entries while the map holds <code>size - 1</code> entries or more.
+     * Eviction walks upwards from the lowest tracked command ID, so it
+     * assumes command IDs are added in ascending order.
+     *
+     * @param commandId the ID of the command the buffer belongs to
+     * @param buffer the buffer to keep around for replay
+     */
+    public void addBuffer(int commandId, Object buffer) {
+        synchronized (lock) {
+            if (map.isEmpty()) {
+                // nothing older is left, so eviction can start from this ID
+                lowestCommandId = commandId;
+            }
+            int max = size - 1;
+            // the isEmpty() guard stops the loop spinning forever when size <= 1
+            while (!map.isEmpty() && map.size() >= max) {
+                // lets find things to evict; post-increment so that the
+                // lowest ID itself is evicted rather than skipped
+                int evictedId = lowestCommandId++;
+                Object evictedBuffer = map.remove(new Integer(evictedId));
+                if (evictedBuffer != null) {
+                    // only report buffers which really were in the map
+                    onEvictedBuffer(evictedId, evictedBuffer);
+                }
+            }
+            map.put(new Integer(commandId), buffer);
+        }
+    }
+
+    /**
+     * Sets the listener which is told about every evicted buffer.
+     */
+    public void setReplayBufferListener(ReplayBufferListener bufferPoolAdapter) {
+        this.listener = bufferPoolAdapter;
+    }
+
+    /**
+     * Passes every command ID in the range to the replayer together with its
+     * buffer. Both bounds are inclusive. The buffer is null when the command
+     * is not (or no longer) in this replay buffer.
+     * <p>
+     * The lock is only held for the map lookup, not while the replayer runs.
+     *
+     * @param fromCommandId the first command ID to replay
+     * @param toCommandId the last command ID to replay
+     * @param replayer the object used to send each buffer
+     * @throws IOException if the replayer fails to send a buffer
+     */
+    public void replayMessages(int fromCommandId, int toCommandId, Replayer replayer) throws IOException {
+        for (int i = fromCommandId; i <= toCommandId; i++) {
+            Object buffer = null;
+            synchronized (lock) {
+                buffer = map.get(new Integer(i));
+            }
+            replayer.sendBuffer(i, buffer);
+        }
+    }
+
+    /**
+     * Factory method for the underlying map so that derived classes can
+     * provide a different implementation.
+     *
+     * @param maximumSize the configured buffer size, used as the initial capacity
+     */
+    protected Map createMap(int maximumSize) {
+        return new HashMap(maximumSize);
+    }
+
+    /**
+     * Notifies the listener, if there is one, that the buffer has been evicted.
+     */
+    protected void onEvictedBuffer(int commandId, Object buffer) {
+        if (listener != null) {
+            listener.onBufferDiscarded(commandId, buffer);
+        }
+    }
+
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java?rev=386198&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java
Wed Mar 15 14:43:40 2006
@@ -0,0 +1,69 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+import java.io.IOException;
+
+/**
+ * A {@link ReplayStrategy} which asks the transport to replay the missing
+ * commands once the gap between the expected and the actual command counter
+ * is larger than {@link #getMaximumDifference()}.
+ *
+ * @version $Revision$
+ */
+public class DefaultReplayStrategy implements ReplayStrategy {
+
+    private int maximumDifference = 5;
+
+    /**
+     * Creates a strategy using the default maximum difference of 5.
+     */
+    public DefaultReplayStrategy() {
+    }
+
+    /**
+     * @param maximumDifference the gap which must be exceeded before a replay is requested
+     */
+    public DefaultReplayStrategy(int maximumDifference) {
+        this.maximumDifference = maximumDifference;
+    }
+
+    /**
+     * Requests a replay from the transport when the absolute gap between the
+     * two counters exceeds the maximum difference. The requested range runs
+     * from the expected counter up to whichever of the two counters is larger.
+     *
+     * @param transport the transport which is asked to replay commands
+     * @param expectedCounter the command counter which was expected next
+     * @param actualCounter the command counter which actually arrived
+     * @return true if the actual counter is ahead of the expected counter
+     * @throws IOException declared by the {@link ReplayStrategy} contract
+     */
+    public boolean onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException {
+        final int gap = actualCounter - expectedCounter;
+        if (Math.abs(gap) > maximumDifference) {
+            transport.requestReplay(expectedCounter, Math.max(actualCounter, expectedCounter));
+        }
+
+        // lets discard old commands
+        return gap > 0;
+    }
+
+    /**
+     * Currently does nothing.
+     */
+    public void onReceivedPacket(ReliableTransport transport, long expectedCounter) {
+        // TODO we could pro-actively evict stuff from the buffer if we knew there was only one client
+    }
+
+    public int getMaximumDifference() {
+        return maximumDifference;
+    }
+
+    /**
+     * Sets the maximum allowed difference between an expected packet and an
+     * actual packet before a replay of the missing commands is requested
+     */
+    public void setMaximumDifference(int maximumDifference) {
+        this.maximumDifference = maximumDifference;
+    }
+
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java?rev=386198&r1=386197&r2=386198&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
Wed Mar 15 14:43:40 2006
@@ -17,6 +17,7 @@
package org.apache.activemq.transport.reliable;
import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ReplayCommand;
import org.apache.activemq.command.Response;
import org.apache.activemq.openwire.CommandIdComparator;
import org.apache.activemq.transport.FutureResponse;
@@ -42,7 +43,10 @@
private ReplayStrategy replayStrategy;
private SortedSet commands = new TreeSet(new CommandIdComparator());
private int expectedCounter = 1;
+ private int replayBufferCommandCount = 50;
private int requestTimeout = 2000;
+ private ReplayBuffer replayBuffer;
+ private Replayer replayer;
public ReliableTransport(Transport next, ReplayStrategy replayStrategy) {
super(next);
@@ -54,6 +58,21 @@
this.replayStrategy = replayStrategy;
}
+ /**
+  * Requests that a range of commands be replayed by sending a
+  * {@link ReplayCommand} which carries the two command IDs as its first
+  * and last NAK numbers. If the send fails the exception is handed to the
+  * transport listener rather than thrown.
+  *
+  * @param fromCommandId the first command ID to be replayed
+  * @param toCommandId the last command ID to be replayed
+  */
+ public void requestReplay(int fromCommandId, int toCommandId) {
+     final ReplayCommand nak = new ReplayCommand();
+     nak.setFirstNakNumber(fromCommandId);
+     nak.setLastNakNumber(toCommandId);
+     try {
+         oneway(nak);
+     }
+     catch (IOException sendFailure) {
+         getTransportListener().onException(sendFailure);
+     }
+ }
+
public Response request(Command command) throws IOException {
FutureResponse response = asyncRequest(command);
@@ -62,7 +81,7 @@
if (result != null) {
return result;
}
- replayRequest(command, response);
+ onMissingResponse(command, response);
}
}
@@ -77,7 +96,7 @@
if (result != null) {
return result;
}
- replayRequest(command, response);
+ onMissingResponse(command, response);
timeout -= time;
}
return response.getResult(0);
@@ -89,6 +108,10 @@
super.onCommand(command);
return;
}
+ else if (command.getDataStructureType() ==
ReplayCommand.DATA_STRUCTURE_TYPE) {
+ replayCommands((ReplayCommand) command);
+ return;
+ }
int actualCounter = command.getCommandId();
boolean valid = expectedCounter == actualCounter;
@@ -107,7 +130,7 @@
}
}
catch (IOException e) {
- getTransportListener().onException(e);
+ onException(e);
}
if (!commands.isEmpty()) {
@@ -180,13 +203,61 @@
}
+ public ReplayBuffer getReplayBuffer() {
+ return replayBuffer;
+ }
+
+ public void setReplayBuffer(ReplayBuffer replayBuffer) {
+ this.replayBuffer = replayBuffer;
+ }
+
+ public int getReplayBufferCommandCount() {
+ return replayBufferCommandCount;
+ }
+
+ /**
+ * Sets the default number of commands which are buffered
+ */
+ public void setReplayBufferCommandCount(int replayBufferSize) {
+ this.replayBufferCommandCount = replayBufferSize;
+ }
+
public String toString() {
return next.toString();
}
+
+
+ /**
+  * Starts this transport, making sure a replay buffer exists first. If
+  * none has been set, one is created via createReplayBuffer().
+  *
+  * @throws Exception if the underlying start fails
+  */
+ public void start() throws Exception {
+     // create the buffer before starting: once super.start() has run, an
+     // incoming ReplayCommand can reach replayCommands(), which uses the buffer
+     if (replayBuffer == null) {
+         replayBuffer = createReplayBuffer();
+     }
+     super.start();
+ }
+
/**
* Lets attempt to replay the request as a command may have disappeared
*/
- protected void replayRequest(Command command, FutureResponse response) {
- log.debug("Still waiting for response on: " + this + " to command: " +
command);
+ protected void onMissingResponse(Command command, FutureResponse response)
{
+ log.debug("Still waiting for response on: " + this + " to command: " +
command + " sending replay message");
+
+ int commandId = command.getCommandId();
+ requestReplay(commandId, commandId);
}
+
+ protected ReplayBuffer createReplayBuffer() {
+ return new DefaultReplayBuffer(getReplayBufferCommandCount());
+ }
+
+ /**
+  * Handles a {@link ReplayCommand} by asking the replay buffer to re-send
+  * the commands from the first to the last NAK number through the replayer.
+  * Failures are reported via onException() rather than thrown.
+  *
+  * @param command the replay request which was received
+  */
+ protected void replayCommands(ReplayCommand command) {
+     // either collaborator may be unset (the buffer is only created in
+     // start()), so report that instead of failing with a NullPointerException
+     if (replayBuffer == null || replayer == null) {
+         onException(new IOException("Cannot replay commands " + command.getFirstNakNumber() + " to "
+                 + command.getLastNakNumber() + ": no replay buffer or replayer is configured"));
+         return;
+     }
+     try {
+         replayBuffer.replayMessages(command.getFirstNakNumber(), command.getLastNakNumber(), replayer);
+
+         // TODO we could proactively remove ack'd stuff from the replay buffer
+         // if we only have a single client talking to us
+     }
+     catch (IOException e) {
+         onException(e);
+     }
+ }
+
}
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java?rev=386198&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java
Wed Mar 15 14:43:40 2006
@@ -0,0 +1,39 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+import java.io.IOException;
+
+/**
+ * Keeps around a buffer of old commands which have been sent on an
+ * unreliable transport, so that they can be sent again on request. The
+ * buffers are of type Object as they could be datagrams, byte[] or
+ * ByteBuffer - depending on the underlying transport implementation.
+ *
+ * @version $Revision$
+ */
+public interface ReplayBuffer {
+
+    /**
+     * Submits a buffer to be cached for a period of time, during which it
+     * can be replayed to users interested in it.
+     *
+     * @param commandId the ID of the command the buffer belongs to
+     * @param buffer the buffer to cache
+     */
+    void addBuffer(int commandId, Object buffer);
+
+    /**
+     * Sets the listener which is notified when a buffer is discarded.
+     */
+    void setReplayBufferListener(ReplayBufferListener bufferPoolAdapter);
+
+    /**
+     * Replays the buffers for a range of command IDs using the given replayer.
+     *
+     * @param fromCommandId the first command ID to replay
+     * @param toCommandId the last command ID to replay
+     * @param replayer the object used to send each buffer
+     * @throws IOException if a buffer could not be sent
+     */
+    void replayMessages(int fromCommandId, int toCommandId, Replayer replayer) throws IOException;
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java?rev=386198&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java
Wed Mar 15 14:43:40 2006
@@ -0,0 +1,31 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+/**
+ * Listens to events on a {@link ReplayBuffer}
+ *
+ * @version $Revision$
+ */
+public interface ReplayBufferListener {
+
+ /**
+ * Indicates that the buffer has been discarded from the replay buffer
+ * and so could be re-introduced into some pool
+ *
+ * @param commandId the ID of the command the buffer was stored under
+ * @param buffer the buffer which has been discarded
+ */
+ public void onBufferDiscarded(int commandId, Object buffer);
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/Replayer.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/Replayer.java?rev=386198&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/Replayer.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/Replayer.java
Wed Mar 15 14:43:40 2006
@@ -0,0 +1,37 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+import java.io.IOException;
+
+/**
+ * Used by a {@link ReplayBuffer} to replay buffers back over an
+ * unreliable transport
+ *
+ * @version $Revision$
+ */
+public interface Replayer {
+
+ /**
+ * Sends the given buffer back to the transport
+ * if the buffer could be found - otherwise maybe send some kind
+ * of exception
+ *
+ * @param commandId the command ID
+ * @param buffer the buffer to be sent - or null if the buffer no longer
+ * exists in the replay buffer
+ * @throws IOException if the buffer could not be sent
+ */
+ void sendBuffer(int commandId, Object buffer) throws IOException;
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/Replayer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/Replayer.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/Replayer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=386198&r1=386197&r2=386198&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
Wed Mar 15 14:43:40 2006
@@ -18,6 +18,7 @@
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.reliable.Replayer;
import java.io.IOException;
import java.net.SocketAddress;
@@ -26,7 +27,7 @@
*
* @version $Revision$
*/
-public interface CommandChannel extends Service {
+public interface CommandChannel extends Replayer, Service {
public abstract Command read() throws IOException;
@@ -45,4 +46,5 @@
public abstract void setTargetAddress(SocketAddress address);
+ public abstract void setReplayAddress(SocketAddress address);
}
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java?rev=386198&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java
Wed Mar 15 14:43:40 2006
@@ -0,0 +1,101 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.IntSequenceGenerator;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+/**
+ * A base class for {@link CommandChannel} implementations which holds the
+ * state they have in common: the wire format, the datagram size, the target
+ * and replay addresses, the header marshaller, and the name and sequence
+ * generator taken from the owning transport.
+ *
+ * @version $Revision$
+ */
+public abstract class CommandChannelSupport implements CommandChannel {
+
+    protected OpenWireFormat wireFormat;
+    protected int datagramSize = 4 * 1024;
+    protected SocketAddress targetAddress;
+    protected SocketAddress replayAddress;
+    protected final String name;
+    protected final IntSequenceGenerator sequenceGenerator;
+    protected DatagramHeaderMarshaller headerMarshaller;
+
+    /**
+     * Creates the channel state. The replay address starts out the same as
+     * the target address.
+     *
+     * @param transport the owning transport, which supplies the channel name
+     *            and the sequence generator
+     * @param wireFormat the wire format of this channel
+     * @param datagramSize the default size of a datagram
+     * @param targetAddress the address commands are written to by default
+     * @param headerMarshaller the marshaller for the datagram headers
+     * @throws IllegalArgumentException if the transport has no sequence generator
+     */
+    public CommandChannelSupport(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress,
+            DatagramHeaderMarshaller headerMarshaller) {
+        final String transportName = transport.toString();
+        final IntSequenceGenerator generator = transport.getSequenceGenerator();
+        if (generator == null) {
+            throw new IllegalArgumentException("No sequenceGenerator on the given transport: " + transport);
+        }
+        this.name = transportName;
+        this.sequenceGenerator = generator;
+        this.wireFormat = wireFormat;
+        this.datagramSize = datagramSize;
+        this.headerMarshaller = headerMarshaller;
+        this.targetAddress = targetAddress;
+        this.replayAddress = targetAddress;
+    }
+
+    /**
+     * Writes the command to the current target address.
+     */
+    public void write(Command command) throws IOException {
+        write(command, targetAddress);
+    }
+
+    public String toString() {
+        return "CommandChannel#" + name;
+    }
+
+    // Properties
+    // -------------------------------------------------------------------------
+
+    public int getDatagramSize() {
+        return datagramSize;
+    }
+
+    /**
+     * Sets the default size of a datagram on the network.
+     */
+    public void setDatagramSize(int datagramSize) {
+        this.datagramSize = datagramSize;
+    }
+
+    public SocketAddress getTargetAddress() {
+        return targetAddress;
+    }
+
+    /**
+     * Sets the address which {@link #write(Command)} sends commands to.
+     */
+    public void setTargetAddress(SocketAddress targetAddress) {
+        this.targetAddress = targetAddress;
+    }
+
+    public SocketAddress getReplayAddress() {
+        return replayAddress;
+    }
+
+    public void setReplayAddress(SocketAddress replayAddress) {
+        this.replayAddress = replayAddress;
+    }
+
+    public DatagramHeaderMarshaller getHeaderMarshaller() {
+        return headerMarshaller;
+    }
+
+    public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
+        this.headerMarshaller = headerMarshaller;
+    }
+
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java?rev=386198&r1=386197&r2=386198&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
Wed Mar 15 14:43:40 2006
@@ -24,7 +24,6 @@
import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.util.IntSequenceGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -40,54 +39,30 @@
*
* @version $Revision$
*/
-public class CommandDatagramChannel implements CommandChannel {
+public class CommandDatagramChannel extends CommandChannelSupport {
private static final Log log =
LogFactory.getLog(CommandDatagramChannel.class);
- private final String name;
- private final IntSequenceGenerator sequenceGenerator;
private DatagramChannel channel;
- private OpenWireFormat wireFormat;
private ByteBufferPool bufferPool;
- private int datagramSize = 4 * 1024;
- private SocketAddress targetAddress;
- private DatagramHeaderMarshaller headerMarshaller;
-
// reading
private Object readLock = new Object();
private ByteBuffer readBuffer;
// writing
private Object writeLock = new Object();
- private ByteBuffer writeBuffer;
private int defaultMarshalBufferSize = 64 * 1024;
-
-
- public CommandDatagramChannel(UdpTransport transport, DatagramChannel
channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
- SocketAddress targetAddress, DatagramHeaderMarshaller
headerMarshaller) {
+ public CommandDatagramChannel(UdpTransport transport, OpenWireFormat
wireFormat, int datagramSize, SocketAddress targetAddress,
DatagramHeaderMarshaller headerMarshaller, DatagramChannel channel,
ByteBufferPool bufferPool) {
+ super(transport, wireFormat, datagramSize, targetAddress,
headerMarshaller);
this.channel = channel;
- this.wireFormat = wireFormat;
this.bufferPool = bufferPool;
- this.datagramSize = datagramSize;
- this.targetAddress = targetAddress;
- this.headerMarshaller = headerMarshaller;
- this.name = transport.toString();
- this.sequenceGenerator = transport.getSequenceGenerator();
- if (sequenceGenerator == null) {
- throw new IllegalArgumentException("No sequenceGenerator on the
given transport: " + transport);
- }
- }
-
- public String toString() {
- return "CommandChannel#" + name;
}
public void start() throws Exception {
bufferPool.setDefaultSize(datagramSize);
bufferPool.start();
readBuffer = bufferPool.borrowBuffer();
- writeBuffer = bufferPool.borrowBuffer();
}
public void stop() throws Exception {
@@ -132,6 +107,7 @@
return answer;
}
+
public void write(Command command, SocketAddress address) throws
IOException {
synchronized (writeLock) {
@@ -140,6 +116,7 @@
byte[] data = largeBuffer.toByteArray();
int size = data.length;
+ ByteBuffer writeBuffer = bufferPool.borrowBuffer();
writeBuffer.clear();
headerMarshaller.writeHeader(command, writeBuffer);
@@ -215,7 +192,7 @@
writeBuffer.put(data, offset, chunkSize);
offset += chunkSize;
- sendWriteBuffer(address, commandId);
+ sendWriteBuffer(address, writeBuffer, commandId);
}
// now lets write the last partial command
@@ -231,21 +208,13 @@
}
writeBuffer.put(data);
- sendWriteBuffer(address, command.getCommandId());
+ sendWriteBuffer(address, writeBuffer, command.getCommandId());
}
}
// Properties
//
-------------------------------------------------------------------------
- public int getDatagramSize() {
- return datagramSize;
- }
-
- public void setDatagramSize(int datagramSize) {
- this.datagramSize = datagramSize;
- }
-
public ByteBufferPool getBufferPool() {
return bufferPool;
}
@@ -257,32 +226,23 @@
this.bufferPool = bufferPool;
}
- public DatagramHeaderMarshaller getHeaderMarshaller() {
- return headerMarshaller;
- }
-
- public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller)
{
- this.headerMarshaller = headerMarshaller;
- }
-
-
- public SocketAddress getTargetAddress() {
- return targetAddress;
- }
-
- public void setTargetAddress(SocketAddress targetAddress) {
- this.targetAddress = targetAddress;
- }
-
// Implementation methods
//
-------------------------------------------------------------------------
- protected void sendWriteBuffer(SocketAddress address, int commandId)
throws IOException {
+ protected void sendWriteBuffer(SocketAddress address, ByteBuffer
writeBuffer, int commandId) throws IOException {
writeBuffer.flip(); // switch the buffer from being filled to being read by send()
if (log.isDebugEnabled()) {
log.debug("Channel: " + name + " sending datagram: " + commandId +
" to: " + address);
}
channel.send(writeBuffer, address);
+
+ // TODO: put the buffer back into the replay buffer; no statement here does that yet
}
+ /**
+  * Re-sends a previously written datagram to the replay address.
+  *
+  * @param commandId the id of the command being replayed; it is only used
+  *            in the debug log output of sendWriteBuffer
+  * @param buffer the {@link ByteBuffer} holding the datagram; when null
+  *            nothing is sent and a warning is logged
+  * @throws IOException if the datagram cannot be sent
+  */
+ public void sendBuffer(int commandId, Object buffer) throws IOException {
+     if (buffer == null) {
+         // without this guard the flip() in sendWriteBuffer fails with a NullPointerException
+         log.warn("No buffer to replay for command: " + commandId);
+         return;
+     }
+     ByteBuffer writeBuffer = (ByteBuffer) buffer;
+     sendWriteBuffer(getReplayAddress(), writeBuffer, commandId);
+ }
+
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java?rev=386198&r1=386197&r2=386198&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
Wed Mar 15 14:43:40 2006
@@ -24,7 +24,6 @@
import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.util.IntSequenceGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,8 +32,6 @@
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.net.SocketAddress;
/**
@@ -42,38 +39,18 @@
*
* @version $Revision$
*/
-public class CommandDatagramSocket implements CommandChannel {
+public class CommandDatagramSocket extends CommandChannelSupport {
private static final Log log =
LogFactory.getLog(CommandDatagramSocket.class);
- private final String name;
private DatagramSocket channel;
- private InetAddress targetAddress;
- private int targetPort;
- private OpenWireFormat wireFormat;
- private int datagramSize = 4 * 1024;
- private DatagramHeaderMarshaller headerMarshaller;
- private IntSequenceGenerator sequenceGenerator;
private Object readLock = new Object();
private Object writeLock = new Object();
- public CommandDatagramSocket(UdpTransport transport, DatagramSocket
channel, OpenWireFormat wireFormat, int datagramSize, InetAddress targetAddress,
- int targetPort, DatagramHeaderMarshaller headerMarshaller) {
+ public CommandDatagramSocket(UdpTransport transport, OpenWireFormat
wireFormat, int datagramSize, SocketAddress targetAddress,
+ DatagramHeaderMarshaller headerMarshaller, DatagramSocket channel)
{
+ super(transport, wireFormat, datagramSize, targetAddress,
headerMarshaller);
this.channel = channel;
- this.wireFormat = wireFormat;
- this.datagramSize = datagramSize;
- this.targetAddress = targetAddress;
- this.targetPort = targetPort;
- this.headerMarshaller = headerMarshaller;
- this.name = transport.toString();
- this.sequenceGenerator = transport.getSequenceGenerator();
- if (sequenceGenerator == null) {
- throw new IllegalArgumentException("No sequenceGenerator on the
given transport: " + transport);
- }
- }
-
- public String toString() {
- return "CommandChannel#" + name;
}
public void start() throws Exception {
@@ -110,11 +87,6 @@
}
public void write(Command command, SocketAddress address) throws
IOException {
- InetSocketAddress ia = (InetSocketAddress) address;
- write(command, ia.getAddress(), ia.getPort());
- }
-
- public void write(Command command, InetAddress address, int port) throws
IOException {
synchronized (writeLock) {
ByteArrayOutputStream writeBuffer = createByteArrayOutputStream();
@@ -126,7 +98,7 @@
wireFormat.marshal(command, dataOut);
if (remaining(writeBuffer) >= 0) {
- sendWriteBuffer(address, port, writeBuffer,
command.getCommandId());
+ sendWriteBuffer(address, writeBuffer, command.getCommandId());
}
else {
// lets split the command up into chunks
@@ -197,7 +169,7 @@
dataOut.write(data, offset, chunkSize);
offset += chunkSize;
- sendWriteBuffer(address, port, writeBuffer, commandId);
+ sendWriteBuffer(address, writeBuffer, commandId);
}
// now lets write the last partial command
@@ -208,58 +180,37 @@
headerMarshaller.writeHeader(command, dataOut);
wireFormat.marshal(command, dataOut);
- sendWriteBuffer(address, port, writeBuffer,
command.getCommandId());
+ sendWriteBuffer(address, writeBuffer, command.getCommandId());
}
}
}
- // Properties
- //
-------------------------------------------------------------------------
-
public int getDatagramSize() {
return datagramSize;
}
- /**
- * Sets the default size of a datagram on the network.
- */
public void setDatagramSize(int datagramSize) {
this.datagramSize = datagramSize;
}
- public DatagramHeaderMarshaller getHeaderMarshaller() {
- return headerMarshaller;
- }
-
- public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller)
{
- this.headerMarshaller = headerMarshaller;
- }
-
-
- public SocketAddress getTargetAddress() {
- return new InetSocketAddress(targetAddress, targetPort);
- }
-
- public void setTargetAddress(SocketAddress address) {
- if (address instanceof InetSocketAddress) {
- InetSocketAddress ia = (InetSocketAddress) address;
- targetAddress = ia.getAddress();
- targetPort = ia.getPort();
- }
- else {
- throw new IllegalArgumentException("Address must be instance of
InetSocketAddress");
- }
- }
-
// Implementation methods
//
-------------------------------------------------------------------------
- protected void sendWriteBuffer(InetAddress address, int port,
ByteArrayOutputStream writeBuffer, int commandId) throws IOException {
+ protected void sendWriteBuffer(SocketAddress address,
ByteArrayOutputStream writeBuffer, int commandId) throws IOException {
+ byte[] data = writeBuffer.toByteArray();
+ sendWriteBuffer(address, commandId, data);
+ }
+
+ protected void sendWriteBuffer(SocketAddress address, int commandId,
byte[] data) throws IOException {
if (log.isDebugEnabled()) {
log.debug("Channel: " + name + " sending datagram: " + commandId +
" to: " + address);
}
- byte[] data = writeBuffer.toByteArray();
- DatagramPacket packet = new DatagramPacket(data, 0, data.length,
address, port);
+ DatagramPacket packet = new DatagramPacket(data, 0, data.length,
address);
channel.send(packet);
+ }
+
+ public void sendBuffer(int commandId, Object buffer) throws IOException {
+ byte[] data = (byte[]) buffer;
+ sendWriteBuffer(replayAddress, commandId, data);
}
protected DatagramPacket createDatagramPacket() {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=386198&r1=386197&r2=386198&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
Wed Mar 15 14:43:40 2006
@@ -24,6 +24,7 @@
import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.transport.reliable.ReplayStrategy;
+import org.apache.activemq.transport.reliable.Replayer;
import org.apache.activemq.util.IntSequenceGenerator;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
@@ -66,6 +67,7 @@
private String description = null;
private Runnable runnable;
private IntSequenceGenerator sequenceGenerator;
+ private boolean replayEnabled = true;
protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
this.wireFormat = wireFormat;
@@ -93,6 +95,18 @@
this.description = getProtocolName() + "Server@";
}
+
+ /**
+  * Supplies the {@link Replayer} to be used with the reliable transport.
+  *
+  * @return the command channel of this transport when replay is enabled,
+  *         otherwise null
+  */
+ public Replayer createReplayer() {
+     return replayEnabled ? commandChannel : null;
+ }
+
/**
* A one way asynchronous send
*/
@@ -311,6 +325,19 @@
this.sequenceGenerator = sequenceGenerator;
}
+ /**
+  * Reports whether replay is currently enabled on this transport.
+  *
+  * @return the current value of the replayEnabled flag
+  */
+ public boolean isReplayEnabled() {
+     return this.replayEnabled;
+ }
+
+ /**
+ * Sets whether or not replay should be enabled when using the reliable
transport.
+ * i.e. should we maintain a buffer of messages that can be replayed?
+ * When this is false, createReplayer() returns null.
+ *
+ * @param replayEnabled true to enable replay, false to disable it
+ */
+ public void setReplayEnabled(boolean replayEnabled) {
+ this.replayEnabled = replayEnabled;
+ }
+
+
// Implementation methods
//
-------------------------------------------------------------------------
@@ -354,7 +381,7 @@
if (bufferPool == null) {
bufferPool = new DefaultBufferPool();
}
- return new CommandDatagramChannel(this, channel, wireFormat,
bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller());
+ return new CommandDatagramChannel(this, wireFormat, datagramSize,
targetAddress, createDatagramHeaderMarshaller(), channel, bufferPool);
}
protected void bind(DatagramSocket socket, SocketAddress localAddress)
throws IOException {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java?rev=386198&r1=386197&r2=386198&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
Wed Mar 15 14:43:40 2006
@@ -27,9 +27,11 @@
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportLogger;
import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.reliable.DefaultReplayStrategy;
import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.transport.reliable.ReliableTransport;
import org.apache.activemq.transport.reliable.ReplayStrategy;
+import org.apache.activemq.transport.reliable.Replayer;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntSequenceGenerator;
import org.apache.activemq.util.IntrospectionSupport;
@@ -56,15 +58,7 @@
UdpTransport transport = new UdpTransport(openWireFormat, port);
Transport configuredTransport = configure(transport, wf, options,
true);
- ReplayStrategy replayStrategy = null;
- if (configuredTransport instanceof ReliableTransport) {
- ReliableTransport rt = (ReliableTransport) configuredTransport;
- replayStrategy = rt.getReplayStrategy();
- }
- if (replayStrategy == null) {
- replayStrategy = createReplayStrategy();
- }
- UdpTransportServer server = new UdpTransportServer(location,
transport, configuredTransport, replayStrategy);
+ UdpTransportServer server = new UdpTransportServer(location,
transport, configuredTransport, createReplayStrategy());
return server;
}
catch (URISyntaxException e) {
@@ -106,16 +100,25 @@
return new UdpTransport(wireFormat, location);
}
- protected Transport configure(Transport transport, WireFormat format, Map
options, boolean server) {
+ /**
+ * Configures the transport
+ *
+ * @param acceptServer
+ * true if this transport is used purely as an 'accept'
transport
+ * for new connections which work like TCP SocketServers where
+ * new connections spin up a new separate UDP transport
+ */
+ protected Transport configure(Transport transport, WireFormat format, Map
options, boolean acceptServer) {
IntrospectionSupport.setProperties(transport, options);
UdpTransport udpTransport = (UdpTransport) transport;
+
OpenWireFormat openWireFormat = asOpenWireFormat(format);
if (udpTransport.isTrace()) {
transport = new TransportLogger(transport);
}
- if (!server && format instanceof OpenWireFormat) {
+ if (!acceptServer && format instanceof OpenWireFormat) {
transport = configureClientSideNegotiator(transport, format,
udpTransport);
}
@@ -125,7 +128,11 @@
// deal with fragmentation
- if (server) {
+ if (acceptServer) {
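+ // lets not keep a buffer of messages (used to enable reliable
+ // messaging) on the 'accept server' transport
+ // NOTE(review): the call below passes true, which enables the buffer - confirm false was intended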
+ udpTransport.setReplayEnabled(true);
+
// we don't want to do reliable checks on this transport as we
// delegate to one that does
transport = new CommandJoiner(transport, openWireFormat);
@@ -133,7 +140,8 @@
return transport;
}
else {
- ReliableTransport reliableTransport = new
ReliableTransport(transport, createReplayStrategy());
+ Replayer replayer = udpTransport.createReplayer();
+ ReliableTransport reliableTransport = new
ReliableTransport(transport, createReplayStrategy(replayer));
udpTransport.setSequenceGenerator(reliableTransport.getSequenceGenerator());
// Joiner must be on outside as the inbound messages must be
@@ -142,8 +150,15 @@
}
}
- protected ReplayStrategy createReplayStrategy() {
+ protected ReplayStrategy createReplayStrategy(Replayer replayer) {
+ if (replayer != null) {
+ return new DefaultReplayStrategy(5);
+ }
return new ExceptionIfDroppedReplayStrategy(1);
+ }
+
+ protected ReplayStrategy createReplayStrategy() {
+ return new DefaultReplayStrategy(5);
}
protected Transport configureClientSideNegotiator(Transport transport,
WireFormat format, final UdpTransport udpTransport) {
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java?rev=386198&r1=386197&r2=386198&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
Wed Mar 15 14:43:40 2006
@@ -25,14 +25,12 @@
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
-import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer;
import javax.jms.MessageNotWriteableException;
import java.io.IOException;
-import java.net.URI;
import junit.framework.TestCase;