Author: jstrachan
Date: Fri Mar 10 08:12:47 2006
New Revision: 384837
URL: http://svn.apache.org/viewcvs?rev=384837&view=rev
Log:
a little bit of spring cleaning to remove some of the cruft
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=384837&r1=384836&r2=384837&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
Fri Mar 10 08:12:47 2006
@@ -16,15 +16,12 @@
*/
package org.apache.activemq.transport.udp;
-import org.activeio.ByteSequence;
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.PartialCommand;
-import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.transport.TransportListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -57,7 +54,6 @@
// reading
private Object readLock = new Object();
private ByteBuffer readBuffer;
- private SocketAddress lastReadDatagramAddress;
// writing
private Object writeLock = new Object();
@@ -80,10 +76,6 @@
}
public void start() throws Exception {
- // wireFormat.setPrefixPacketSize(false);
- wireFormat.setCacheEnabled(false);
- wireFormat.setTightEncodingEnabled(true);
-
bufferPool.setDefaultSize(datagramSize);
bufferPool.start();
readBuffer = bufferPool.borrowBuffer();
@@ -96,22 +88,19 @@
public Command read() throws IOException {
Command answer = null;
- lastReadDatagramAddress = null;
synchronized (readLock) {
readBuffer.clear();
- lastReadDatagramAddress = channel.receive(readBuffer);
+ SocketAddress address = channel.receive(readBuffer);
readBuffer.flip();
- Endpoint from = headerMarshaller.createEndpoint(readBuffer,
lastReadDatagramAddress);
+ Endpoint from = headerMarshaller.createEndpoint(readBuffer,
address);
int remaining = readBuffer.remaining();
-
byte[] data = new byte[remaining];
readBuffer.get(data);
// TODO could use a DataInput implementation that talks direct to
- // the
- // ByteBuffer
+ // the ByteBuffer to avoid object allocation and unnecessary
buffering?
DataInputStream dataIn = new DataInputStream(new
ByteArrayInputStream(data));
answer = (Command) wireFormat.unmarshal(dataIn);
answer.setFrom(from);
@@ -124,15 +113,6 @@
return answer;
}
- /**
- * Called if a packet is received on a different channel from a remote
- * client
- *
- * @throws IOException
- */
- public void setWireFormatInfoEndpoint(DatagramEndpoint endpoint) throws
IOException {
- }
-
public void write(Command command) throws IOException {
write(command, targetAddress);
}
@@ -236,11 +216,6 @@
this.headerMarshaller = headerMarshaller;
}
- public SocketAddress getLastReadDatagramAddress() {
- synchronized (readLock) {
- return lastReadDatagramAddress;
- }
- }
// Implementation methods
//
-------------------------------------------------------------------------
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=384837&r1=384836&r2=384837&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
Fri Mar 10 08:12:47 2006
@@ -18,11 +18,12 @@
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Endpoint;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportThreadSupport;
-import org.apache.activemq.transport.replay.ReplayStrategy;
import org.apache.activemq.transport.replay.ExceptionIfDroppedReplayStrategy;
+import org.apache.activemq.transport.replay.ReplayStrategy;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -60,7 +61,6 @@
private int port;
private int minmumWireFormatVersion;
private String description = null;
- private DatagramEndpoint wireFormatHeader;
protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
this.wireFormat = wireFormat;
@@ -105,11 +105,7 @@
checkStarted(command);
commandChannel.write(command, address);
}
-
- public void receivedHeader(DatagramEndpoint endpoint) {
- wireFormatHeader = endpoint;
- }
-
+
/**
* @return pretty print of 'this'
*/
@@ -132,10 +128,6 @@
Command command = commandChannel.read();
doConsume(command);
}
- /*
- * catch (SocketTimeoutException e) { } catch
- * (InterruptedIOException e) { }
- */
catch (AsynchronousCloseException e) {
try {
stop();
@@ -168,13 +160,16 @@
* the target to be the actual channel that the server has chosen for us to
* talk on.
*/
- public void useLastInboundDatagramAsNewTarget() {
- if (originalTargetAddress == null) {
- originalTargetAddress = targetAddress;
- }
- SocketAddress lastAddress =
commandChannel.getLastReadDatagramAddress();
- if (lastAddress != null) {
- targetAddress = lastAddress;
+ public void setTargetEndpoint(Endpoint newTarget) {
+ if (newTarget instanceof DatagramEndpoint) {
+ DatagramEndpoint endpoint = (DatagramEndpoint) newTarget;
+ SocketAddress address = endpoint.getAddress();
+ if (address != null) {
+ if (originalTargetAddress == null) {
+ originalTargetAddress = targetAddress;
+ }
+ targetAddress = address;
+ }
}
}
@@ -317,12 +312,6 @@
}
commandChannel = new CommandChannel(toString(), channel, wireFormat,
bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller());
commandChannel.start();
-
- // lets pass the header & address into the channel so it avoids a
- // re-request
- if (wireFormatHeader != null) {
- commandChannel.setWireFormatInfoEndpoint(wireFormatHeader);
- }
super.doStart();
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java?rev=384837&r1=384836&r2=384837&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
Fri Mar 10 08:12:47 2006
@@ -120,9 +120,10 @@
protected Transport configureClientSideNegotiator(Transport transport,
WireFormat format, final UdpTransport udpTransport) {
transport = new WireFormatNegotiator(transport,
asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion()) {
protected void onWireFormatNegotiated(WireFormatInfo info) {
- // lets switch to the targetAddress that the last packet was
- // received as so that all future requests go to the newly
created UDP channel
- udpTransport.useLastInboundDatagramAsNewTarget();
+ // lets switch to the target endpoint
+ // based on the last packet that was received
+ // so that all future requests go to the newly created UDP
channel
+ udpTransport.setTargetEndpoint(info.getFrom());
}
};
return transport;
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java?rev=384837&r1=384836&r2=384837&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
Fri Mar 10 08:12:47 2006
@@ -18,7 +18,6 @@
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
-import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.CommandJoiner;
@@ -144,9 +143,6 @@
final SocketAddress address = endpoint.getAddress();
final OpenWireFormat connectionWireFormat =
serverTransport.getWireFormat().copy();
final UdpTransport transport = new UdpTransport(connectionWireFormat,
address);
-
- // TODO - is this still required?
- transport.receivedHeader(endpoint);
Transport configuredTransport = new CommandJoiner(transport,
connectionWireFormat);