Author: azeez
Date: Fri Nov 18 18:00:55 2011
New Revision: 1203771
URL: http://svn.apache.org/viewvc?rev=1203771&view=rev
Log:
Introduced messging functionality at the Cluster level. Messages can be sent in
RPC or non-RPC mode. This functionality is needed since there are usecases
where one member needs to send custom messages to other members in the cluster.
Modified:
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
axis/axis2/java/core/trunk/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java
Modified:
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java
URL:
http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java?rev=1203771&r1=1203770&r2=1203771&view=diff
==============================================================================
---
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java
(original)
+++
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java
Fri Nov 18 18:00:55 2011
@@ -22,12 +22,7 @@ package org.apache.axis2.clustering.trib
import org.apache.axiom.om.OMAttribute;
import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
-import org.apache.axis2.clustering.ClusteringAgent;
-import org.apache.axis2.clustering.ClusteringConstants;
-import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.MembershipListener;
-import org.apache.axis2.clustering.MembershipScheme;
-import org.apache.axis2.clustering.RequestBlockingHandler;
+import org.apache.axis2.clustering.*;
import org.apache.axis2.clustering.control.ControlCommand;
import org.apache.axis2.clustering.control.GetConfigurationCommand;
import org.apache.axis2.clustering.control.GetStateCommand;
@@ -48,8 +43,10 @@ import org.apache.axis2.engine.DispatchP
import org.apache.axis2.engine.Phase;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ErrorHandler;
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.UniqueId;
import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.tribes.group.Response;
import org.apache.catalina.tribes.group.RpcChannel;
@@ -83,7 +80,14 @@ public class TribesClusteringAgent imple
private final HashMap<String, Parameter> parameters;
private ManagedChannel channel;
+ /**
+ * RpcChannel used for cluster initialization interactions
+ */
private RpcChannel rpcInitChannel;
+ /**
+ * RpcChannel used for RPC messaging interactions
+ */
+ private RpcChannel rpcMessagingChannel;
private ConfigurationContext configurationContext;
private Axis2ChannelListener axis2ChannelListener;
private ChannelSender channelSender;
@@ -104,6 +108,7 @@ public class TribesClusteringAgent imple
private final Map<String, GroupManagementAgent> groupManagementAgents =
new HashMap<String, GroupManagementAgent>();
private boolean clusterManagementMode;
+ private RpcMessagingHandler rpcMessagingHandler;
public TribesClusteringAgent() {
parameters = new HashMap<String, Parameter>();
@@ -166,10 +171,19 @@ public class TribesClusteringAgent imple
// picks it up. Each RPC is given a UUID, hence can correlate the
request-response pair
rpcInitRequestHandler = new
RpcInitializationRequestHandler(configurationContext);
rpcInitChannel =
- new RpcChannel(TribesUtil.getRpcInitChannelId(domain),
- channel, rpcInitRequestHandler);
+ new RpcChannel(TribesUtil.getRpcInitChannelId(domain), channel,
+ rpcInitRequestHandler);
if (log.isDebugEnabled()) {
- log.debug("Created RPC Channel for domain " + new String(domain));
+ log.debug("Created RPC Init Channel for domain " + new
String(domain));
+ }
+
+ // Initialize RpcChannel used for messaging
+ rpcMessagingHandler = new RpcMessagingHandler(configurationContext);
+ rpcMessagingChannel =
+ new RpcChannel(TribesUtil.getRpcMessagingChannelId(domain),
channel,
+ rpcMessagingHandler);
+ if (log.isDebugEnabled()) {
+ log.debug("Created RPC Messaging Channel for domain " + new
String(domain));
}
setMaximumRetries();
@@ -231,6 +245,48 @@ public class TribesClusteringAgent imple
}
}
+ public List<ClusteringCommand> sendMessage(ClusteringMessage message,
+ boolean isRpcMessage) throws
ClusteringFault {
+ List<ClusteringCommand> responseList = new
ArrayList<ClusteringCommand>();
+ Member[] members = primaryMembershipManager.getMembers();
+ if (members.length == 0) {
+ return responseList;
+ }
+ if (isRpcMessage) {
+ try {
+ Response[] responses = rpcMessagingChannel.send(members,
message, RpcChannel.ALL_REPLY,
+
Channel.SEND_OPTIONS_SYNCHRONIZED_ACK,
+ 10000);
+ for (Response response : responses) {
+ responseList.add((ClusteringCommand)response.getMessage());
+ }
+ } catch (ChannelException e) {
+ String msg = "Error occurred while sending RPC message to
cluster.";
+ log.error(msg, e);
+ throw new ClusteringFault(msg, e);
+ }
+ } else {
+ try {
+ channel.send(members, message, 10000, new ErrorHandler(){
+ public void handleError(ChannelException e, UniqueId
uniqueId) {
+ log.error("Sending failed " + uniqueId, e );
+ }
+
+ public void handleCompletion(UniqueId uniqueId) {
+ if(log.isDebugEnabled()){
+ log.debug("Sending successful " + uniqueId);
+ }
+ }
+ });
+ } catch (ChannelException e) {
+ String msg = "Error occurred while sending message to
cluster.";
+ log.error(msg, e);
+ throw new ClusteringFault(msg, e);
+ }
+ }
+ return responseList;
+ }
+
private void setMemberInfo() throws ClusteringFault {
Properties memberInfo = new Properties();
AxisConfiguration axisConfig =
configurationContext.getAxisConfiguration();
@@ -602,7 +658,7 @@ public class TribesClusteringAgent imple
* Get some information from a neighbour. This information will be used by
this node to
* initialize itself
* <p/>
- * rpcChannel is The utility for sending RPC style messages to the channel
+ * rpcInitChannel is The utility for sending RPC style messages to the
channel
*
* @param command The control command to send
* @throws ClusteringFault If initialization code failed on this node
@@ -717,6 +773,7 @@ public class TribesClusteringAgent imple
if (channel != null) {
try {
channel.removeChannelListener(rpcInitChannel);
+ channel.removeChannelListener(rpcMessagingChannel);
channel.removeChannelListener(axis2ChannelListener);
channel.stop(Channel.DEFAULT);
} catch (ChannelException e) {
@@ -736,6 +793,9 @@ public class TribesClusteringAgent imple
if (rpcInitRequestHandler != null) {
rpcInitRequestHandler.setConfigurationContext(configurationContext);
}
+ if (rpcMessagingHandler!= null) {
+ rpcMessagingHandler.setConfigurationContext(configurationContext);
+ }
if (axis2ChannelListener != null) {
axis2ChannelListener.setConfigurationContext(configurationContext);
}
Modified:
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
URL:
http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java?rev=1203771&r1=1203770&r2=1203771&view=diff
==============================================================================
---
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
(original)
+++
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
Fri Nov 18 18:00:55 2011
@@ -26,6 +26,11 @@ public final class TribesConstants {
public static final String RPC_INIT_CHANNEL = "rpc.init.channel";
/**
+ * The ID of the RPC messaging channel
+ */
+ public static final String RPC_MESSAGING_CHANNEL = "rpc.msg.channel";
+
+ /**
* The ID of the RPC membership message channel. This channel is only used
when WKA
* membership discovery mechanism is used
*/
Modified:
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
URL:
http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=1203771&r1=1203770&r2=1203771&view=diff
==============================================================================
---
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
(original)
+++
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
Fri Nov 18 18:00:55 2011
@@ -101,6 +101,10 @@ public class TribesUtil {
return (new String(domain) + ":" +
TribesConstants.RPC_INIT_CHANNEL).getBytes();
}
+ public static byte[] getRpcMessagingChannelId(byte[] domain) {
+ return (new String(domain) + ":" +
TribesConstants.RPC_MESSAGING_CHANNEL).getBytes();
+ }
+
public static boolean isInDomain(Member member, byte[] domain) {
return Arrays.equals(domain, member.getDomain());
}
Modified:
axis/axis2/java/core/trunk/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java
URL:
http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java?rev=1203771&r1=1203770&r2=1203771&view=diff
==============================================================================
---
axis/axis2/java/core/trunk/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java
(original)
+++
axis/axis2/java/core/trunk/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java
Fri Nov 18 18:00:55 2011
@@ -184,4 +184,15 @@ public interface ClusteringAgent extends
* @return the domains of this ClusteringAgent
*/
Set<String> getDomains();
+
+
+ /**
+ * Send a message to all members in this member's primary cluster
+ *
+ * @param msg The message to be sent
+ * @param isRpcMessage Indicates whether the message has to be sent in RPC
mode
+ * @return A list of responses if the message is sent in RPC mode
+ * @throws ClusteringFault If an error occurs while sending the message
+ */
+ List<ClusteringCommand> sendMessage(ClusteringMessage msg, boolean
isRpcMessage) throws ClusteringFault;
}
\ No newline at end of file