Author: rajdavies
Date: Tue Jan 27 13:51:37 2009
New Revision: 738092
URL: http://svn.apache.org/viewvc?rev=738092&view=rev
Log:
Moved reliability choice down to the network layer
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=738092&r1=738091&r2=738092&view=diff
==============================================================================
---
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
(original)
+++
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
Tue Jan 27 13:51:37 2009
@@ -100,13 +100,13 @@
if (managementURIStr != null && managementURIStr.length() > 0) {
managementURI = new URI(getConfiguration().getManagementURI());
}
- Network transport = NetworkFactory.get(broadcastURI, managementURI);
+ Network transport = NetworkFactory.get(broadcastURI, managementURI,
getConfiguration().getReliableBroadcast());
transport.setName(getId());
- this.broadcast = configureProcess(transport,
getConfiguration().getReliableBroadcast());
+ this.broadcast = configureProcess(transport);
this.broadcast.init();
}
- protected final Processor configureProcess(ChainedProcessor transport,
String reliability) throws Exception {
+ protected final Processor configureProcess(ChainedProcessor transport)
throws Exception {
int maxPacketSize = getConfiguration().getMaxPacketSize();
CompressionProcessor result = new CompressionProcessor();
result.setPrev(this);
@@ -115,8 +115,6 @@
FragmentationProcessor fp = new FragmentationProcessor();
fp.setMaxPacketSize(maxPacketSize);
result.setEnd(fp);
- ChainedProcessor reliable = getReliability(reliability);
- result.setEnd(reliable);
result.setEnd(transport);
return result;
}
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=738092&r1=738091&r2=738092&view=diff
==============================================================================
---
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
(original)
+++
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
Tue Jan 27 13:51:37 2009
@@ -30,7 +30,9 @@
import org.apache.activeblaze.Processor;
import org.apache.activeblaze.impl.destination.DestinationMatch;
import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.CompressionProcessor;
import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
+import org.apache.activeblaze.impl.processor.FragmentationProcessor;
import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.impl.reliable.ReliableFactory;
import org.apache.activeblaze.impl.transport.BaseTransport;
@@ -99,6 +101,21 @@
this.local = createLocal(unicastURI);
this.group = createGroup();
}
+
+ protected final Processor configureProcess(ChainedProcessor transport,
String reliability) throws Exception {
+ int maxPacketSize = getConfiguration().getMaxPacketSize();
+ CompressionProcessor result = new CompressionProcessor();
+ result.setPrev(this);
+ result.setExceptionListener(this);
+ result.setMaxPacketSize(maxPacketSize);
+ FragmentationProcessor fp = new FragmentationProcessor();
+ fp.setMaxPacketSize(maxPacketSize);
+ result.setEnd(fp);
+ ChainedProcessor reliable = getReliability(reliability);
+ result.setEnd(reliable);
+ result.setEnd(transport);
+ return result;
+ }
protected ChainedProcessor getReliability(String reliability) throws
Exception {
DefaultChainedProcessor reliable = ReliableFactory.get(reliability);
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java?rev=738092&r1=738091&r2=738092&view=diff
==============================================================================
---
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
(original)
+++
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
Tue Jan 27 13:51:37 2009
@@ -20,8 +20,10 @@
import java.net.URI;
import org.apache.activeblaze.ExceptionListener;
import org.apache.activeblaze.Processor;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.reliable.ReliableFactory;
import org.apache.activeblaze.impl.transport.BaseTransport;
import org.apache.activeblaze.impl.transport.TransportFactory;
@@ -32,11 +34,12 @@
public class MulticastNetwork extends DefaultChainedProcessor implements
Network, ExceptionListener {
private URI uri;
private URI managementURI;
- private BaseTransport broadcast;
- private BaseTransport management;
+ private ChainedProcessor broadcast;
+ private ChainedProcessor management;
private String name = "";
private InetSocketAddress broadcastAddress;
private InetSocketAddress managementAddress;
+ private String reliability = "simple";
/**
* @return the name
@@ -70,25 +73,46 @@
}
/**
- * @return true if initialized
+ * @return the reliable protocol used
+ * @see org.apache.activeblaze.impl.network.Network#getReliability()
+ */
+ public String getReliability() {
+ return this.reliability;
+ }
+
+ /**
+ * @param reliability the name of the reliability protocol to use for this network
+ * @see
org.apache.activeblaze.impl.network.Network#setReliability(java.lang.String)
+ */
+ public void setReliability(String reliability) {
+ this.reliability = reliability;
+ }
+
+ /**
+ * initialize the network
+ *
* @throws Exception
* @see org.apache.activeblaze.Service#init()
*/
public void doInit() throws Exception {
super.doInit();
- this.broadcast = TransportFactory.get(this.uri);
- this.broadcast.setName(getName() + "-Broadcast");
- this.broadcast.setExceptionListener(this);
+ this.broadcast = ReliableFactory.get(getReliability());
+ BaseTransport transport = TransportFactory.get(this.uri);
+ transport.setName(getName() + "-Broadcast");
+ transport.setExceptionListener(this);
this.broadcast.setPrev(getPrev());
- this.broadcast.setNext(getNext());
+ this.broadcast.setNext(transport);
+ transport.setPrev(this.broadcast);
this.broadcastAddress = new InetSocketAddress(this.uri.getHost(),
this.uri.getPort());
this.broadcast.init();
if (this.managementURI != null &&
!this.managementURI.equals(this.uri)) {
- this.management = TransportFactory.get(this.managementURI);
- this.management.setName(getName() + "-Management");
- this.management.setExceptionListener(this);
+ this.management = ReliableFactory.get(getReliability());
+ BaseTransport managementTransport =
TransportFactory.get(this.managementURI);
+ managementTransport.setName(getName() + "-Management");
+ managementTransport.setExceptionListener(this);
this.management.setPrev(getPrev());
- this.management.setNext(getNext());
+ this.management.setNext(managementTransport);
+ managementTransport.setPrev(this.management);
this.managementAddress = new
InetSocketAddress(this.managementURI.getHost(), this.managementURI.getPort());
this.management.init();
}
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java?rev=738092&r1=738091&r2=738092&view=diff
==============================================================================
---
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
(original)
+++
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
Tue Jan 27 13:51:37 2009
@@ -17,7 +17,6 @@
package org.apache.activeblaze.impl.network;
import java.net.URI;
import org.apache.activeblaze.impl.processor.ChainedProcessor;
-import org.apache.activeblaze.impl.processor.Packet;
/**
* <P>
@@ -50,6 +49,17 @@
*/
public void setManagementURI(URI uri) throws Exception;
+ /**
+ * Set the reliable protocol to use for this network
+ * @param reliability the name of the reliability protocol to use for this network
+ */
+ public void setReliability(String reliability);
+
+ /**
+ * @return the reliability protocol used for this network
+ */
+ public String getReliability();
+
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java?rev=738092&r1=738091&r2=738092&view=diff
==============================================================================
---
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
(original)
+++
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
Tue Jan 27 13:51:37 2009
@@ -35,7 +35,7 @@
* @return the network associated with the URI
* @throws Exception
*/
- public static Network get(URI broadcast, URI management) throws Exception {
+ public static Network get(URI broadcast, URI management,String
reliability) throws Exception {
Network result = findNetwork(broadcast);
result.setURI(broadcast);
result.setManagementURI(management);
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java?rev=738092&r1=738091&r2=738092&view=diff
==============================================================================
---
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java
(original)
+++
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java
Tue Jan 27 13:51:37 2009
@@ -24,8 +24,10 @@
import org.apache.activeblaze.BlazeException;
import org.apache.activeblaze.ExceptionListener;
import org.apache.activeblaze.Processor;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.reliable.ReliableFactory;
import org.apache.activeblaze.impl.transport.BaseTransport;
import org.apache.activeblaze.impl.transport.TransportFactory;
import org.apache.activeblaze.util.URISupport;
@@ -43,10 +45,11 @@
private List<URI> managementURIs = new ArrayList<URI>();
private List<InetSocketAddress> broadcastAddresses = new
ArrayList<InetSocketAddress>();
private List<InetSocketAddress> managementAddresses = new
ArrayList<InetSocketAddress>();
- private BaseTransport broadcast;
- private BaseTransport management;
+ private ChainedProcessor broadcast;
+ private ChainedProcessor management;
private String name = "";
private boolean useFirstFreeAddress;
+ private String reliability = "swp";
/**
* @return the name
@@ -64,6 +67,22 @@
}
/**
+ * @return the reliability protocol used
+ * @see org.apache.activeblaze.impl.network.Network#getReliability()
+ */
+ public String getReliability() {
+ return this.reliability;
+ }
+
+ /**
+ * @param reliability the name of the reliability protocol to use for this network
+ * @see
org.apache.activeblaze.impl.network.Network#setReliability(java.lang.String)
+ */
+ public void setReliability(String reliability) {
+ this.reliability = reliability;
+ }
+
+ /**
* @param location
* @throws Exception
* @see
org.apache.activeblaze.impl.network.Network#setManagementURI(java.net.URI)
@@ -110,19 +129,25 @@
*/
public void doInit() throws Exception {
super.doInit();
- this.broadcast = createTransport(this.broadcastURIs);
- this.broadcast.setName(getName() + "-Broadcast");
- this.broadcast.setExceptionListener(this);
+ this.broadcast = ReliableFactory.get(getReliability());
+ BaseTransport transport = createTransport(this.broadcastURIs);
+ transport.setName(getName() + "-Broadcast");
+ transport.setExceptionListener(this);
this.broadcast.setPrev(getPrev());
- this.broadcast.setNext(getNext());
+ this.broadcast.setNext(transport);
+ transport.setPrev(this.broadcast);
+ this.broadcast.init();
if (!this.managementURIs.isEmpty() &&
!this.managementURIs.equals(this.broadcastURIs)) {
- this.management = createTransport(this.managementURIs);
- this.management.setName(getName() + "-Management");
- this.management.setExceptionListener(this);
+ this.management = ReliableFactory.get(getReliability());
+ BaseTransport managementTransport
=createTransport(this.managementURIs);
+ managementTransport.setName(getName() + "-Management");
+ managementTransport.setExceptionListener(this);
this.management.setPrev(getPrev());
- this.management.setNext(getNext());
+ this.management.setNext(managementTransport);
+ managementTransport.setPrev(this.management);
this.management.init();
}
+
}
/**
Modified:
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java?rev=738092&r1=738091&r2=738092&view=diff
==============================================================================
---
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java
(original)
+++
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java
Tue Jan 27 13:51:37 2009
@@ -31,5 +31,6 @@
uri += ")";
fac.getConfiguration().setManagementURI("");
fac.getConfiguration().setBroadcastURI(uri);
+ fac.getConfiguration().setReliableBroadcast("swp");
}
}
Modified:
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java?rev=738092&r1=738091&r2=738092&view=diff
==============================================================================
---
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java
(original)
+++
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java
Tue Jan 27 13:51:37 2009
@@ -30,5 +30,6 @@
uri += ")";
fac.getConfiguration().setManagementURI("");
fac.getConfiguration().setBroadcastURI(uri);
+ fac.getConfiguration().setReliableBroadcast("swp");
}
}