Author: rajdavies
Date: Thu Jan 15 15:48:02 2009
New Revision: 734856
URL: http://svn.apache.org/viewvc?rev=734856&view=rev
Log:
Use a Network instead of transport for broadcast
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Thu Jan 15 15:48:02 2009
@@ -22,13 +22,14 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activeblaze.impl.destination.DestinationMatch;
-import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
+import org.apache.activeblaze.impl.network.Network;
+import org.apache.activeblaze.impl.network.NetworkFactory;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
import org.apache.activeblaze.impl.processor.CompressionProcessor;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
import org.apache.activeblaze.impl.processor.FragmentationProcessor;
import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.impl.reliable.ReliableFactory;
-import org.apache.activeblaze.impl.transport.BaseTransport;
-import org.apache.activeblaze.impl.transport.TransportFactory;
import org.apache.activeblaze.util.IdGenerator;
import org.apache.activeblaze.util.PropertyUtil;
import org.apache.activeblaze.wire.BlazeData;
@@ -50,10 +51,10 @@
protected Buffer producerId;
protected AtomicLong sequence = new AtomicLong();
protected AtomicLong session = new AtomicLong(1);
- private Processor broadcast;
+ protected Processor broadcast;
+ protected Buffer managementURIBuffer;
protected BlazeConfiguration configuration = new BlazeConfiguration();
private String id;
- private Buffer managementURI;
private InetSocketAddress toAddress;
/**
@@ -85,11 +86,10 @@
/**
* @param destination
- * @param l
- * @return
+ *
+ * @return the TopicListener
* @throws Exception
- * @see
org.apache.activeblaze.BlazeChannel#removeBlazeMessageListener(java.lang.String,
- * org.apache.activeblaze.BlazeTopicListener)
+ * @see
org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(String
destination)
*/
public BlazeTopicListener removeBlazeTopicMessageListener(String
destination) throws Exception {
Buffer key = new Buffer(destination);
@@ -102,26 +102,28 @@
String broadcastURIStr = getConfiguration().getBroadcastURI();
broadcastURIStr =
PropertyUtil.addPropertiesToURIFromBean(broadcastURIStr, getConfiguration());
URI broadcastURI = new URI(broadcastURIStr);
+ URI managementURI = null;
+ if (getConfiguration().getManagementURI() != null) {
+ managementURI = new URI(getConfiguration().getManagementURI());
+ this.managementURIBuffer = new
Buffer(managementURI.toString());
+ } else {
+ this.managementURIBuffer = new Buffer();
+ }
this.toAddress = new InetSocketAddress(broadcastURI.getHost(),
broadcastURI.getPort());
- this.managementURI = new Buffer(new
URI(getConfiguration().getManagementURI()).toString());
- BaseTransport transport = TransportFactory.get(broadcastURI);
- transport.setName(getId() + "-Broadcast");
+ Network transport = NetworkFactory.get(broadcastURI,
managementURI);
+ transport.setName(getId());
this.broadcast = configureProcess(transport);
this.broadcast.init();
}
return result;
}
- protected final void configureTransport(BaseTransport transport) throws
Exception {
- transport.setMaxPacketSize(getConfiguration().getMaxPacketSize());
- }
-
- protected Processor configureProcess(BaseTransport transport) throws
Exception {
+ protected final Processor configureProcess(ChainedProcessor transport)
throws Exception {
int maxPacketSize = getConfiguration().getMaxPacketSize();
- configureTransport(transport);
CompressionProcessor result = new CompressionProcessor();
result.setPrev(this);
result.setExceptionListener(this);
+ result.setMaxPacketSize(maxPacketSize);
FragmentationProcessor fp = new FragmentationProcessor();
fp.setMaxPacketSize(maxPacketSize);
result.setEnd(fp);
@@ -162,7 +164,7 @@
blazeData.setDestination(new Buffer(destination));
PacketData packetData = getPacketData(MessageType.BLAZE_DATA,
blazeData);
packetData.setReliable(true);
- packetData.setFromAddress(this.managementURI);
+ packetData.setFromAddress(this.managementURIBuffer);
Packet packet = new Packet(packetData);
packet.setTo(this.toAddress);
this.broadcast.downStream(packet);
@@ -170,7 +172,7 @@
protected synchronized PacketData getPacketData(MessageType type,
Message<?> message) {
PacketData packetData = new PacketData();
- packetData.setFromAddress(this.managementURI);
+ packetData.setFromAddress(this.managementURIBuffer);
packetData.setType(type.getNumber());
packetData.setProducerId(this.producerId);
packetData.setSessionId(this.session.get());
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Thu Jan 15 15:48:02 2009
@@ -100,7 +100,7 @@
URI groupManagementURI = new URI(groupManagementURIStr);
this.toManagementAddress = new
InetSocketAddress(groupManagementURI.getHost(), groupManagementURI.getPort());
this.groupManagementTransport =
TransportFactory.get(groupManagementURI);
- configureTransport(this.groupManagementTransport);
+
this.groupManagementTransport.setMaxPacketSize(getConfiguration().getMaxPacketSize());
this.groupManagementTransport.setPrev(this);
this.groupManagementTransport.setName(getId() +
"-HeartbeatTransport");
this.groupManagementTransport.init();
@@ -615,31 +615,9 @@
* @param message
* @throws Exception
*/
- public void broadcastMessage(MessageType messageType, Message<?> message)
throws Exception {
- PacketData data = getPacketData(messageType, message);
- data.setReliable(false);
- data.setFromAddress(this.inboxURI);
- Packet packet = new Packet(data);
- packet.setTo(this.toManagementAddress);
- this.groupManagementTransport.downStream(packet);
- }
- /**
- * broadcast a general message
- *
- * @param asyncRequest
- * @param messageType
- * @param message
- * @throws Exception
- */
- public void broadcastMessage(AsyncGroupRequest asyncRequest, MessageType
messageType, Message<?> message)
- throws Exception {
- SendRequest request = new SendRequest();
+ public void broadcastMessage(MessageType messageType, Message<?> message)
throws Exception {
PacketData data = getPacketData(messageType, message);
- asyncRequest.add(data.getMessageId(), request);
- synchronized (this.messageRequests) {
- this.messageRequests.put(data.getMessageId(), request);
- }
data.setReliable(false);
data.setFromAddress(this.inboxURI);
Packet packet = new Packet(data);
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java Thu Jan 15 15:48:02 2009
@@ -18,6 +18,7 @@
import java.net.URI;
import org.apache.activeblaze.ExceptionListener;
+import org.apache.activeblaze.Processor;
import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.impl.transport.BaseTransport;
@@ -80,11 +81,15 @@
this.broadcast = TransportFactory.get(this.uri);
this.broadcast.setName(getName() + "-Broadcast");
this.broadcast.setExceptionListener(this);
+ this.broadcast.setPrev(getPrev());
+ this.broadcast.setNext(getNext());
this.broadcast.init();
if(this.managementURI != null &&
!this.managementURI.equals(this.uri)){
this.management = TransportFactory.get(this.managementURI);
this.management.setName(getName() + "-Management");
this.management.setExceptionListener(this);
+ this.management.setPrev(getPrev());
+ this.management.setNext(getNext());
this.management.init();
}
@@ -184,7 +189,35 @@
}
}
+
+ public void setNext(Processor next) {
+ super.setNext(next);
+ if (this.management != null) {
+ this.management.setNext(next);
+ }
+ if(this.broadcast!=null){
+ this.broadcast.setNext(next);
+ }
+ }
+
+ public void setPrev(Processor prev) {
+ super.setPrev(prev);
+ if (this.management != null) {
+ this.management.setPrev(prev);;
+ }
+ if(this.broadcast!=null){
+ this.broadcast.setPrev(prev);
+ }
+ }
+ public void setMaxPacketSize(int maxPacketSize) {
+ if (this.management != null) {
+ this.management.setMaxPacketSize(maxPacketSize);
+ }
+ if(this.broadcast!=null){
+ this.broadcast.setMaxPacketSize(maxPacketSize);
+ }
+ }
}
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java Thu Jan 15 15:48:02 2009
@@ -16,8 +16,7 @@
*/
package org.apache.activeblaze.impl.network;
import java.net.URI;
-import org.apache.activeblaze.Service;
-import org.apache.activeblaze.Processor;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
import org.apache.activeblaze.impl.processor.Packet;
/**
@@ -26,7 +25,7 @@
* channel instances
*
*/
-public interface Network extends Processor, Service {
+public interface Network extends ChainedProcessor{
/**
* @return the name
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java Thu Jan 15 15:48:02 2009
@@ -25,17 +25,24 @@
public class NetworkFactory {
/**
- * @param location
+ * @param broadcast
+ * @param management
* @return the network associated with the URI
* @throws Exception
*/
- public static Network get(URI location) throws Exception {
+ public static Network get(URI broadcast, URI management) throws Exception {
Network result = null;
- String scheme = location.getScheme();
+ String scheme = broadcast.getScheme();
scheme = scheme.trim();
if (scheme.equalsIgnoreCase("mcast") ||
scheme.equalsIgnoreCase("multicast")){
result = new MulticastNetwork();
}
+ if(result != null) {
+ result.setURI(broadcast);
+ if(management!=null) {
+ result.setManagementURI(management);
+ }
+ }
return result;
}
}
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java Thu Jan 15 15:48:02 2009
@@ -17,6 +17,7 @@
package org.apache.activeblaze.impl.processor;
import org.apache.activeblaze.BaseService;
+import org.apache.activeblaze.BlazeConfiguration;
import org.apache.activeblaze.BlazeException;
import org.apache.activeblaze.ExceptionListener;
import org.apache.activeblaze.Processor;
@@ -24,89 +25,88 @@
import org.apache.commons.logging.LogFactory;
/**
- * Chains Processors together
+ * Default implementation of a ChainedProcessor
*
*/
-public class DefaultChainedProcessor extends BaseService implements Processor {
+public class DefaultChainedProcessor extends BaseService implements
ChainedProcessor {
private static final Log LOG =
LogFactory.getLog(DefaultChainedProcessor.class);
private Processor next;
private Processor prev;
protected ExceptionListener exceptionListener;
+ private int maxPacketSize = BlazeConfiguration.DEFAULT_MAX_PACKET_SIZE;
protected DefaultChainedProcessor() {
}
/**
- * @return the next
+ * @return the next Processor
+ * @see org.apache.activeblaze.impl.processor.ChainedProcessor#getNext()
*/
public Processor getNext() {
return this.next;
}
/**
- * Set Next at the end of the chain
- *
* @param next
- *
+ * @see
org.apache.activeblaze.impl.processor.ChainedProcessor#setEnd(org.apache.activeblaze.Processor)
*/
public void setEnd(Processor next) {
- DefaultChainedProcessor target = this;
+ ChainedProcessor target = this;
Processor n = getNext();
while (n != null) {
- if (n instanceof DefaultChainedProcessor) {
- DefaultChainedProcessor cn = (DefaultChainedProcessor) n;
+ if (n instanceof ChainedProcessor) {
+ ChainedProcessor cn = (ChainedProcessor) n;
target = cn;
n = cn.getNext();
}
}
- if (next instanceof DefaultChainedProcessor) {
- target.setNextChain((DefaultChainedProcessor) next);
+ if (next instanceof ChainedProcessor) {
+ target.setNextChain((ChainedProcessor) next);
} else {
- target.next = next;
+ target.setNext(next);
}
}
/**
- * Set the next
- *
* @param next
+ * @see
org.apache.activeblaze.impl.processor.ChainedProcessor#setNext(org.apache.activeblaze.Processor)
*/
public void setNext(Processor next) {
this.next = next;
}
/**
- * @return the prev
+ * @return previous processor
+ * @see org.apache.activeblaze.impl.processor.ChainedProcessor#getPrev()
*/
public Processor getPrev() {
return this.prev;
}
/**
- * Set the next chain
- *
* @param p
+ * @see
org.apache.activeblaze.impl.processor.ChainedProcessor#setNextChain(org.apache.activeblaze.impl.processor.ChainedProcessor)
*/
- public void setNextChain(DefaultChainedProcessor p) {
- DefaultChainedProcessor target = this;
+ public void setNextChain(ChainedProcessor p) {
+ ChainedProcessor target = this;
Processor n = getNext();
while (n != null) {
- if (n instanceof DefaultChainedProcessor) {
- DefaultChainedProcessor cn = (DefaultChainedProcessor) n;
+ if (n instanceof ChainedProcessor) {
+ ChainedProcessor cn = (ChainedProcessor) n;
target = cn;
n = cn.getNext();
}
}
- target.next = p;
+ target.setNext(p);
p.setPrev(target);
- if (this.exceptionListener != null && p.exceptionListener == null) {
- p.exceptionListener = this.exceptionListener;
+ if (this.exceptionListener != null && p.getExceptionListener() ==
null) {
+ p.setExceptionListener(this.exceptionListener);
}
}
/**
* @param prev
- * the prev to set
+ * @see
org.apache.activeblaze.impl.processor.ChainedProcessor#setPrev(org.apache.activeblaze.Processor)
*/
public void setPrev(Processor prev) {
this.prev = prev;
@@ -191,4 +191,26 @@
LOG.error("Caught an exception stopping", e);
}
}
+
+ /**
+ * @return the exceptionListener
+ * @see
org.apache.activeblaze.impl.processor.ChainedProcessor#getExceptionListener()
+ */
+ public ExceptionListener getExceptionListener() {
+ return this.exceptionListener;
+ }
+
+ /**
+ * @return the maxPacketSize
+ */
+ public int getMaxPacketSize() {
+ return this.maxPacketSize;
+ }
+
+ /**
+ * @param maxPacketSize the maxPacketSize to set
+ */
+ public void setMaxPacketSize(int maxPacketSize) {
+ this.maxPacketSize = maxPacketSize;
+ }
}
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java Thu Jan 15 15:48:02 2009
@@ -20,7 +20,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import org.apache.activeblaze.BlazeConfiguration;
import org.apache.activemq.protobuf.Buffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,7 +31,6 @@
@SuppressWarnings("serial")
public class FragmentationProcessor extends DefaultChainedProcessor {
private static final Log LOG =
LogFactory.getLog(FragmentationProcessor.class);
- private int maxPacketSize = BlazeConfiguration.DEFAULT_MAX_PACKET_SIZE;
private int maxCacheSize = 16 * 1024;
private Map<String,List<Packet>> cache = new
LinkedHashMap<String,List<Packet>>() {
protected boolean removeEldestEntry(Map.Entry<String,List<Packet>>
eldest) {
@@ -110,19 +108,7 @@
}
}
- /**
- * @return the maxPacketSize
- */
- public int getMaxPacketSize() {
- return this.maxPacketSize;
- }
-
- /**
- * @param maxPacketSize the maxPacketSize to set
- */
- public void setMaxPacketSize(int maxPacketSize) {
- this.maxPacketSize = maxPacketSize;
- }
+
/**
* @return the maxCacheSize
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java Thu Jan 15 15:48:02 2009
@@ -17,6 +17,7 @@
package org.apache.activeblaze.impl.reliable;
import java.util.Map;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
import org.apache.activeblaze.util.ObjectFinder;
import org.apache.activeblaze.util.PropertyUtil;
@@ -40,7 +41,7 @@
return result;
}
- static void configure(DefaultChainedProcessor transport, String location)
throws Exception {
+ static void configure(ChainedProcessor transport, String location) throws
Exception {
Map<String, String> options = PropertyUtil.parseParameters(location);
PropertyUtil.setProperties(transport, options);
}
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java Thu Jan 15 15:48:02 2009
@@ -31,11 +31,9 @@
*/
public abstract class BaseTransport extends ThreadChainedProcessor {
private static final Log LOG = LogFactory.getLog(BaseTransport.class);
- static final int DEFAULT_MAX_PACKET_SIZE =
BlazeConfiguration.DEFAULT_MAX_PACKET_SIZE;
static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
private URI localURI;
private Buffer bufferOfLocalURI;
- private int maxPacketSize = DEFAULT_MAX_PACKET_SIZE;
private int bufferSize = DEFAULT_BUFFER_SIZE;
private int soTimeout = 2000;
private int timeToLive = 1;
@@ -117,22 +115,6 @@
this.bufferOfLocalURI = new Buffer(this.localURI.toString());
}
}
-
- /**
- * @return the maxPacketSize
- */
- public int getMaxPacketSize() {
- return this.maxPacketSize;
- }
-
- /**
- * @param maxPacketSize
- * the maxPacketSize to set
- */
- public void setMaxPacketSize(int maxPacketSize) {
- this.maxPacketSize = maxPacketSize;
- }
-
/**
* @return the bufferSize
*/
Modified:
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java Thu Jan 15 15:48:02 2009
@@ -53,7 +53,7 @@
latch.await(10, TimeUnit.SECONDS);
receiver.stop();
sender.stop();
- assertEquals("Not enough messages", 0, latch.getCount());
+ assertEquals("Too many messages not sent ", 0, latch.getCount());
}
public void testGroupBroadcast() throws Exception {
Modified:
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java Thu Jan 15 15:48:02 2009
@@ -79,7 +79,7 @@
public void testUpStream() throws Exception {
final AtomicBoolean test = new AtomicBoolean();
- DefaultChainedProcessor target = new DefaultChainedProcessor() {
+ ChainedProcessor target = new DefaultChainedProcessor() {
public void upStream(Packet p) {
test.set(true);
}