Support cross-cluster messaging in the messaging service.

This feature supports a complex application scenario in which one application 
is deployed as two Helix clusters. The brain nodes that manage business logic 
are controlled in a CONTROLLER Helix cluster, while the workers are managed in 
a separate WORKER Helix cluster. The application's brain nodes rely on the 
Helix messaging service to send commands. Because these two Helix clusters must 
communicate with each other, Helix needs to support cross-cluster messaging.

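In this change, we introduce a target cluster name field into the message 
criteria (Criteria.setClusterName / getClusterName). Once it is specified, the 
messaging service sends the message to nodes in that cluster instead of the 
local one. Other features, such as replies, work the same way as before: the 
sender records its own cluster name in the message (SRC_CLUSTER attribute), so 
a sender node in cluster A receives the replies from the recipient nodes in 
cluster B.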

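Note that messaging across different ZooKeeper ensembles is not supported: the 
target cluster must be reachable through the sender's existing ZooKeeper 
connection. Also, relay messages are not forwarded across clusters.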


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b91d6eee
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b91d6eee
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b91d6eee

Branch: refs/heads/master
Commit: b91d6eee47598842707c109a8bc47bfb1390ec78
Parents: 07170dc
Author: Jiajun Wang <[email protected]>
Authored: Fri Aug 3 17:55:04 2018 -0700
Committer: Lei Xia <[email protected]>
Committed: Fri Sep 21 14:17:45 2018 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/Criteria.java    |  24 +-
 .../DefaultSchedulerMessageHandlerFactory.java  |  17 +-
 .../helix/messaging/CriteriaEvaluator.java      |  23 +-
 .../messaging/DefaultMessagingService.java      | 133 ++++----
 .../helix/messaging/handling/HelixTask.java     |  50 ++-
 .../java/org/apache/helix/model/Message.java    |  19 +-
 .../TestCrossClusterMessagingService.java       | 333 +++++++++++++++++++
 7 files changed, 512 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/b91d6eee/helix-core/src/main/java/org/apache/helix/Criteria.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/Criteria.java 
b/helix-core/src/main/java/org/apache/helix/Criteria.java
index 5750326..d3b958e 100644
--- a/helix-core/src/main/java/org/apache/helix/Criteria.java
+++ b/helix-core/src/main/java/org/apache/helix/Criteria.java
@@ -66,6 +66,10 @@ public class Criteria {
    * Determine if use external view or ideal state as source of truth
    */
   DataSource _dataSource = DataSource.EXTERNALVIEW;
+  /**
+   * The name of target cluster. If null, means sending to the local cluster
+   */
+  String _clusterName = null;
 
   /**
    * Get the current source of truth
@@ -196,6 +200,22 @@ public class Criteria {
     this.partitionState = partitionState;
   }
 
+  /**
+   * Get the target cluster name
+   * @return the target cluster name if set or null if not set
+   */
+  public String getClusterName() {
+    return _clusterName;
+  }
+
+  /**
+   * Set the target cluster name
+   * @param clusterName target cluster name to send message
+   */
+  public void setClusterName(String clusterName) {
+    _clusterName = clusterName;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
@@ -203,7 +223,9 @@ public class Criteria {
     sb.append("resourceName").append("=").append(resourceName);
     sb.append("partitionName").append("=").append(partitionName);
     sb.append("partitionState").append("=").append(partitionState);
+    if (_clusterName != null) {
+      sb.append("clusterName").append("=").append(_clusterName);
+    }
     return sb.toString();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b91d6eee/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
 
b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
index 0d3d86e..93819f9 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
@@ -28,29 +28,27 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.helix.Criteria;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.messaging.AsyncCallback;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.StatusUpdate;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StatusUpdate;
 import org.apache.helix.util.StatusUpdateUtil;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.google.common.collect.ImmutableList;
 
 /*
  * The current implementation supports throttling on STATE-TRANSITION type of 
message, transition SCHEDULED-COMPLETED.
@@ -169,6 +167,13 @@ public class DefaultSchedulerMessageHandlerFactory 
implements MultiTypeMessageHa
       HelixDataAccessor accessor = _manager.getHelixDataAccessor();
       Builder keyBuilder = accessor.keyBuilder();
 
+      String clusterName = recipientCriteria.getClusterName();
+      if (clusterName != null && 
!clusterName.equals(_manager.getClusterName())) {
+        throw new HelixException(String.format(
+            "ScheduledTaskQueue cannot send message to another cluster. Local 
cluster name %s, remote cluster name %s.",
+            _manager.getClusterName(), clusterName));
+      }
+
       Map<String, String> sendSummary = new HashMap<String, String>();
       sendSummary.put("MessageCount", "0");
       Map<InstanceType, List<Message>> messages =

http://git-wip-us.apache.org/repos/asf/helix/blob/b91d6eee/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java 
b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
index ec7b0fd..a2c3139 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
@@ -26,6 +26,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.helix.Criteria;
 import org.apache.helix.Criteria.DataSource;
 import org.apache.helix.HelixDataAccessor;
@@ -36,10 +39,6 @@ import org.apache.helix.PropertyKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 public class CriteriaEvaluator {
   private static Logger logger = 
LoggerFactory.getLogger(CriteriaEvaluator.class);
   public static final String MATCH_ALL_SYM = "%";
@@ -50,9 +49,21 @@ public class CriteriaEvaluator {
    * @param manager connection to the persisted data
    * @return map of evaluated criteria
    */
-  public List<Map<String, String>> evaluateCriteria(Criteria 
recipientCriteria, HelixManager manager) {
+  public List<Map<String, String>> evaluateCriteria(Criteria recipientCriteria,
+      HelixManager manager) {
+    return evaluateCriteria(recipientCriteria, manager.getHelixDataAccessor());
+  }
+
+  /**
+   * Examine persisted data to match wildcards in {@link Criteria}
+   *
+   * @param recipientCriteria Criteria specifying the message destinations
+   * @param accessor          connection to the persisted data
+   * @return map of evaluated criteria
+   */
+  public List<Map<String, String>> evaluateCriteria(Criteria recipientCriteria,
+      HelixDataAccessor accessor) {
     // get the data
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     List<HelixProperty> properties;

http://git-wip-us.apache.org/repos/asf/helix/blob/b91d6eee/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index e725a8e..09efadc 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -34,6 +34,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.messaging.handling.AsyncCallbackService;
 import org.apache.helix.messaging.handling.HelixTaskExecutor;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
@@ -112,6 +113,7 @@ public class DefaultMessagingService implements 
ClusterMessagingService {
       _asyncCallbackService.registerAsyncCallback(correlationId, 
callbackOnReply);
     }
 
+    HelixDataAccessor targetDataAccessor = 
getRecipientDataAccessor(recipientCriteria);
     for (InstanceType receiverType : generateMessage.keySet()) {
       List<Message> list = generateMessage.get(receiverType);
       for (Message tempMessage : list) {
@@ -121,29 +123,39 @@ public class DefaultMessagingService implements 
ClusterMessagingService {
         if (correlationId != null) {
           tempMessage.setCorrelationId(correlationId);
         }
+        tempMessage.setSrcClusterName(_manager.getClusterName());
 
-        HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-        Builder keyBuilder = accessor.keyBuilder();
-
+        Builder keyBuilder = targetDataAccessor.keyBuilder();
         if (receiverType == InstanceType.CONTROLLER) {
-          // 
_manager.getDataAccessor().setProperty(PropertyType.MESSAGES_CONTROLLER,
-          // tempMessage,
-          // tempMessage.getId());
-          
accessor.setProperty(keyBuilder.controllerMessage(tempMessage.getId()), 
tempMessage);
-        }
-
-        if (receiverType == InstanceType.PARTICIPANT) {
-          accessor.setProperty(keyBuilder.message(tempMessage.getTgtName(), 
tempMessage.getId()),
-              tempMessage);
+          targetDataAccessor
+              .setProperty(keyBuilder.controllerMessage(tempMessage.getId()), 
tempMessage);
+        } else if (receiverType == InstanceType.PARTICIPANT) {
+          targetDataAccessor
+              .setProperty(keyBuilder.message(tempMessage.getTgtName(), 
tempMessage.getId()),
+                  tempMessage);
         }
       }
     }
 
-    if (callbackOnReply != null) {
-      // start timer if timeout is set
-      callbackOnReply.startTimer();
+      if (callbackOnReply != null) {
+        // start timer if timeout is set
+        callbackOnReply.startTimer();
+      }
+      return totalMessageCount;
+  }
+
+  private HelixDataAccessor getRecipientDataAccessor(final Criteria 
recipientCriteria) {
+    HelixDataAccessor dataAccessor = _manager.getHelixDataAccessor();
+    String clusterName = recipientCriteria.getClusterName();
+    if (clusterName != null && !clusterName.equals(_manager.getClusterName())) 
{
+      // for cross cluster message, create new DataAccessor for sending 
message.
+      /*
+        TODO On frequent cross clsuter messaging request, keeping construct 
data accessor may cause
+        performance issue. We should consider adding cache in this service or 
HelixManager. --JJ
+       */
+      dataAccessor = new ZKHelixDataAccessor(clusterName, 
dataAccessor.getBaseDataAccessor());
     }
-    return totalMessageCount;
+    return dataAccessor;
   }
 
   public Map<InstanceType, List<Message>> generateMessage(final Criteria 
recipientCriteria,
@@ -151,51 +163,55 @@ public class DefaultMessagingService implements 
ClusterMessagingService {
     Map<InstanceType, List<Message>> messagesToSendMap = new 
HashMap<InstanceType, List<Message>>();
     InstanceType instanceType = recipientCriteria.getRecipientInstanceType();
 
-    if (instanceType == InstanceType.CONTROLLER) {
-      List<Message> messages = generateMessagesForController(message);
-      messagesToSendMap.put(InstanceType.CONTROLLER, messages);
-      // _dataAccessor.setControllerProperty(PropertyType.MESSAGES,
-      // newMessage.getRecord(), CreateMode.PERSISTENT);
-    } else if (instanceType == InstanceType.PARTICIPANT) {
-      List<Message> messages = new ArrayList<Message>();
-      List<Map<String, String>> matchedList =
-          _evaluator.evaluateCriteria(recipientCriteria, _manager);
-
-      if (!matchedList.isEmpty()) {
-        Map<String, String> sessionIdMap = new HashMap<String, String>();
-        if (recipientCriteria.isSessionSpecific()) {
-          HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-          Builder keyBuilder = accessor.keyBuilder();
+    HelixDataAccessor targetDataAccessor = 
getRecipientDataAccessor(recipientCriteria);
 
-          List<LiveInstance> liveInstances = 
accessor.getChildValues(keyBuilder.liveInstances());
+      List<Message> messages = Collections.EMPTY_LIST;
+      if (instanceType == InstanceType.CONTROLLER) {
+        messages = generateMessagesForController(message);
+      } else if (instanceType == InstanceType.PARTICIPANT) {
+        messages =
+            generateMessagesForParticipant(recipientCriteria, message, 
targetDataAccessor);
+      }
+      messagesToSendMap.put(instanceType, messages);
+      return messagesToSendMap;
+  }
 
-          for (LiveInstance liveInstance : liveInstances) {
-            sessionIdMap.put(liveInstance.getInstanceName(), 
liveInstance.getSessionId());
-          }
+  private List<Message> generateMessagesForParticipant(Criteria 
recipientCriteria, Message message,
+      HelixDataAccessor targetDataAccessor) {
+    List<Message> messages = new ArrayList<Message>();
+    List<Map<String, String>> matchedList =
+        _evaluator.evaluateCriteria(recipientCriteria, targetDataAccessor);
+
+    if (!matchedList.isEmpty()) {
+      Map<String, String> sessionIdMap = new HashMap<String, String>();
+      if (recipientCriteria.isSessionSpecific()) {
+        Builder keyBuilder = targetDataAccessor.keyBuilder();
+        List<LiveInstance> liveInstances = 
targetDataAccessor.getChildValues(keyBuilder.liveInstances());
+
+        for (LiveInstance liveInstance : liveInstances) {
+          sessionIdMap.put(liveInstance.getInstanceName(), 
liveInstance.getSessionId());
         }
-        for (Map<String, String> map : matchedList) {
-          String id = UUID.randomUUID().toString();
-          Message newMessage = new Message(message.getRecord(), id);
-          String srcInstanceName = _manager.getInstanceName();
-          String tgtInstanceName = map.get("instanceName");
-          // Don't send message to self
-          if (recipientCriteria.isSelfExcluded()
-              && srcInstanceName.equalsIgnoreCase(tgtInstanceName)) {
-            continue;
-          }
-          newMessage.setSrcName(srcInstanceName);
-          newMessage.setTgtName(tgtInstanceName);
-          newMessage.setResourceName(map.get("resourceName"));
-          newMessage.setPartitionName(map.get("partitionName"));
-          if (recipientCriteria.isSessionSpecific()) {
-            newMessage.setTgtSessionId(sessionIdMap.get(tgtInstanceName));
-          }
-          messages.add(newMessage);
+      }
+      for (Map<String, String> map : matchedList) {
+        String id = UUID.randomUUID().toString();
+        Message newMessage = new Message(message.getRecord(), id);
+        String srcInstanceName = _manager.getInstanceName();
+        String tgtInstanceName = map.get("instanceName");
+        // Don't send message to self
+        if (recipientCriteria.isSelfExcluded() && 
srcInstanceName.equalsIgnoreCase(tgtInstanceName)) {
+          continue;
+        }
+        newMessage.setSrcName(srcInstanceName);
+        newMessage.setTgtName(tgtInstanceName);
+        newMessage.setResourceName(map.get("resourceName"));
+        newMessage.setPartitionName(map.get("partitionName"));
+        if (recipientCriteria.isSessionSpecific()) {
+          newMessage.setTgtSessionId(sessionIdMap.get(tgtInstanceName));
         }
-        messagesToSendMap.put(InstanceType.PARTICIPANT, messages);
+        messages.add(newMessage);
       }
     }
-    return messagesToSendMap;
+    return messages;
   }
 
   private List<Message> generateMessagesForController(Message message) {
@@ -317,9 +333,10 @@ public class DefaultMessagingService implements 
ClusterMessagingService {
   }
 
   @Override
-  public int sendAndWait(Criteria receipientCriteria, Message message, 
AsyncCallback asyncCallback,
+  // TODO if the manager is not Participant or Controller, no reply, so should 
fail immediately
+  public int sendAndWait(Criteria recipientCriteria, Message message, 
AsyncCallback asyncCallback,
       int timeOut, int retryCount) {
-    int messagesSent = send(receipientCriteria, message, asyncCallback, 
timeOut, retryCount);
+    int messagesSent = send(recipientCriteria, message, asyncCallback, 
timeOut, retryCount);
     if (messagesSent > 0) {
       synchronized (asyncCallback) {
         while (!asyncCallback.isDone() && !asyncCallback.isTimedOut()) {
@@ -333,7 +350,7 @@ public class DefaultMessagingService implements 
ClusterMessagingService {
         }
       }
     } else {
-      _logger.warn("No messages sent. For Criteria:" + receipientCriteria);
+      _logger.warn("No messages sent. For Criteria:" + recipientCriteria);
     }
     return messagesSent;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/b91d6eee/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 0738ef9..2f3d805 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -22,6 +22,7 @@ package org.apache.helix.messaging.handling;
 import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -31,6 +32,7 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.NotificationContext.MapKey;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
 import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
 import org.apache.helix.model.Message;
@@ -77,7 +79,6 @@ public class HelixTask implements MessageTask {
 
     long start = System.currentTimeMillis();
     logger.info("handling task: " + getTaskId() + " begin, at: " + start);
-    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     _statusUpdateUtil.logInfo(_message, HelixTask.class, "Message handling 
task begin execute",
         _manager);
     _message.setExecuteStartTimeStamp(new Date().getTime());
@@ -168,6 +169,7 @@ public class HelixTask implements MessageTask {
         }
       }
 
+      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
       // forward relay messages attached to this message to other participants
       if (taskResult.isSuccess()) {
         try {
@@ -183,7 +185,7 @@ public class HelixTask implements MessageTask {
       if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
         removeMessageFromZk(accessor, _message);
         reportMessageStat(_manager, _message, taskResult);
-        sendReply(accessor, _message, taskResult);
+        sendReply(getSrcClusterDataAccessor(_message), _message, taskResult);
         _executor.finishTask(this);
       }
     } catch (Exception e) {
@@ -262,9 +264,24 @@ public class HelixTask implements MessageTask {
     }
   }
 
-  private void sendReply(HelixDataAccessor accessor, Message message, 
HelixTaskResult taskResult) {
-    if (_message.getCorrelationId() != null
-        && !message.getMsgType().equals(MessageType.TASK_REPLY.name())) {
+  private HelixDataAccessor getSrcClusterDataAccessor(final Message message) {
+    HelixDataAccessor helixDataAccessor = _manager.getHelixDataAccessor();
+    String clusterName = message.getSrcClusterName();
+    if (clusterName != null && !clusterName.equals(_manager.getClusterName())) 
{
+      // for cross cluster message, create different HelixDataAccessor for 
replying message.
+      /*
+        TODO On frequent cross clsuter messaging request, keeping construct 
data accessor may cause
+        performance issue. We should consider adding cache in this class or 
HelixManager. --JJ
+       */
+      helixDataAccessor = new ZKHelixDataAccessor(clusterName, 
helixDataAccessor.getBaseDataAccessor());
+    }
+    return helixDataAccessor;
+  }
+
+  private void sendReply(HelixDataAccessor replyDataAccessor, Message message,
+      HelixTaskResult taskResult) {
+    if (message.getCorrelationId() != null && !message.getMsgType()
+        .equals(MessageType.TASK_REPLY.name())) {
       logger.info("Sending reply for message " + message.getCorrelationId());
       _statusUpdateUtil.logInfo(message, HelixTask.class, "Sending reply", 
_manager);
 
@@ -273,21 +290,24 @@ public class HelixTask implements MessageTask {
       if (!taskResult.isSuccess()) {
         taskResult.getTaskResultMap().put("ERRORINFO", 
taskResult.getMessage());
       }
-      Message replyMessage =
-          Message.createReplyMessage(_message, _manager.getInstanceName(),
-              taskResult.getTaskResultMap());
+      Message replyMessage = Message
+          .createReplyMessage(message, _manager.getInstanceName(), 
taskResult.getTaskResultMap());
       replyMessage.setSrcInstanceType(_manager.getInstanceType());
 
+      Builder keyBuilder = replyDataAccessor.keyBuilder();
       if (message.getSrcInstanceType() == InstanceType.PARTICIPANT) {
-        Builder keyBuilder = accessor.keyBuilder();
-        accessor.setProperty(keyBuilder.message(message.getMsgSrc(), 
replyMessage.getMsgId()),
-            replyMessage);
+        replyDataAccessor
+            .setProperty(keyBuilder.message(message.getMsgSrc(), 
replyMessage.getMsgId()),
+                replyMessage);
       } else if (message.getSrcInstanceType() == InstanceType.CONTROLLER) {
-        Builder keyBuilder = accessor.keyBuilder();
-        
accessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId()), 
replyMessage);
+        replyDataAccessor
+            
.setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId()), 
replyMessage);
       }
-      _statusUpdateUtil.logInfo(message, HelixTask.class,
-          "1 msg replied to " + replyMessage.getTgtName(), _manager);
+      _statusUpdateUtil.logInfo(message, HelixTask.class, String
+          .format("1 msg replied to %s in cluster %s.", 
replyMessage.getTgtName(),
+              message.getSrcClusterName() == null ?
+                  _manager.getClusterName() :
+                  message.getSrcClusterName()), _manager);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/b91d6eee/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java 
b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 51d03cb..e619b2b 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -96,7 +96,8 @@ public class Message extends HelixProperty {
     RELAY_PARTICIPANTS,
     RELAY_TIME,
     RELAY_FROM,
-    EXPIRY_PERIOD
+    EXPIRY_PERIOD,
+    SRC_CLUSTER
   }
 
   /**
@@ -858,6 +859,22 @@ public class Message extends HelixProperty {
   }
 
   /**
+   * Get the source cluster name
+   * @return the source cluster from where the message was sent or null if the 
message was sent locally
+   */
+  public String getSrcClusterName() {
+    return _record.getStringField(Attributes.SRC_CLUSTER.name(), null);
+  }
+
+  /**
+   * Set the source cluster name
+   * @param clusterName source cluster name where message was sent from
+   */
+  public void setSrcClusterName(String clusterName) {
+    _record.setSimpleField(Attributes.SRC_CLUSTER.name(), clusterName);
+  }
+
+  /**
    * Check if this message is targetted for a controller
    * @return true if this is a controller message, false otherwise
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/b91d6eee/helix-core/src/test/java/org/apache/helix/integration/messaging/TestCrossClusterMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestCrossClusterMessagingService.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestCrossClusterMessagingService.java
new file mode 100644
index 0000000..fe38c6d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestCrossClusterMessagingService.java
@@ -0,0 +1,333 @@
+package org.apache.helix.integration.messaging;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.UUID;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.Criteria.DataSource;
+import org.apache.helix.InstanceType;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestCrossClusterMessagingService extends TestMessagingService {
+  private final String ADMIN_CLUSTER_NAME = "ADMIN_" + CLUSTER_NAME;
+  private ClusterControllerManager _adminController;
+  private String _hostSrc;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+
+    // setup the admin cluster for sending cross cluster messages
+    _gSetupTool.addCluster(ADMIN_CLUSTER_NAME, true);
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_1";
+    _hostSrc = controllerName;
+    _adminController = new ClusterControllerManager(ZK_ADDR, ADMIN_CLUSTER_NAME, controllerName);
+    _adminController.syncStart();
+
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(ADMIN_CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    if (_adminController != null && _adminController.isConnected()) {
+      _adminController.syncStop();
+    }
+
+    deleteCluster(ADMIN_CLUSTER_NAME);
+    super.afterClass();
+  }
+
+  @Test()
+  public void TestMessageSimpleSend() throws Exception {
+    String hostDest = "localhost_" + (START_PORT + 1);
+
+    TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+    _participants[1].getMessagingService()
+        .registerMessageHandlerFactory(factory.getMessageTypes(), factory);
+
+    String msgId = new UUID(123, 456).toString();
+    Message msg = new Message(factory.getMessageTypes().get(0), msgId);
+    msg.setMsgId(msgId);
+    msg.setSrcName(_hostSrc);
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+    String para = "Testing messaging para";
+    msg.getRecord().setSimpleField("TestMessagingPara", para);
+
+    Criteria cr = new Criteria();
+    cr.setInstanceName(hostDest);
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setClusterName(CLUSTER_NAME);
+
+    int nMsgs = _adminController.getMessagingService().send(cr, msg);
+    AssertJUnit.assertTrue(nMsgs == 1);
+    Thread.sleep(2500);
+    AssertJUnit.assertTrue(TestMessagingHandlerFactory._processedMsgIds.contains(para));
+
+    cr = new Criteria();
+    cr.setInstanceName(hostDest);
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setDataSource(DataSource.IDEALSTATES);
+    cr.setClusterName(CLUSTER_NAME);
+
+    // nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+    nMsgs = _adminController.getMessagingService().send(cr, msg);
+    AssertJUnit.assertTrue(nMsgs == 1);
+    Thread.sleep(2500);
+    AssertJUnit.assertTrue(TestMessagingHandlerFactory._processedMsgIds.contains(para));
+  }
+
+  @Test()
+  public void TestMessageSimpleSendReceiveAsync() throws Exception {
+    String hostDest = "localhost_" + (START_PORT + 1);
+
+    TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+    _participants[1].getMessagingService()
+        .registerMessageHandlerFactory(factory.getMessageTypes(), factory);
+    _participants[0].getMessagingService()
+        .registerMessageHandlerFactory(factory.getMessageTypes(), factory);
+
+    String msgId = new UUID(123, 456).toString();
+    Message msg = new Message(factory.getMessageTypes().get(0), msgId);
+    msg.setMsgId(msgId);
+    msg.setSrcName(_hostSrc);
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+    String para = "Testing messaging para";
+    msg.getRecord().setSimpleField("TestMessagingPara", para);
+
+    Criteria cr = new Criteria();
+    cr.setInstanceName(hostDest);
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setClusterName(CLUSTER_NAME);
+
+    TestAsyncCallback callback = new TestAsyncCallback(60000);
+
+    _adminController.getMessagingService().send(cr, msg, callback, 60000);
+
+    Thread.sleep(2000);
+    AssertJUnit.assertTrue(TestAsyncCallback._replyedMessageContents.contains("TestReplyMessage"));
+    AssertJUnit.assertTrue(callback.getMessageReplied().size() == 1);
+
+    TestAsyncCallback callback2 = new TestAsyncCallback(500);
+    _adminController.getMessagingService().send(cr, msg, callback2, 500);
+
+    Thread.sleep(3000);
+    AssertJUnit.assertTrue(callback2.isTimedOut());
+
+    cr = new Criteria();
+    cr.setInstanceName(hostDest);
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setDataSource(DataSource.IDEALSTATES);
+    cr.setClusterName(CLUSTER_NAME);
+
+    callback = new TestAsyncCallback(60000);
+
+    _adminController.getMessagingService().send(cr, msg, callback, 60000);
+
+    Thread.sleep(2000);
+    AssertJUnit.assertTrue(TestAsyncCallback._replyedMessageContents.contains("TestReplyMessage"));
+    AssertJUnit.assertTrue(callback.getMessageReplied().size() == 1);
+
+    callback2 = new TestAsyncCallback(500);
+    _adminController.getMessagingService().send(cr, msg, callback2, 500);
+
+    Thread.sleep(3000);
+    AssertJUnit.assertTrue(callback2.isTimedOut());
+  }
+
+  @Test()
+  public void TestBlockingSendReceive() {
+    String hostDest = "localhost_" + (START_PORT + 1);
+
+    TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+    _participants[1].getMessagingService()
+        .registerMessageHandlerFactory(factory.getMessageTypes(), factory);
+
+    String msgId = new UUID(123, 456).toString();
+    Message msg = new Message(factory.getMessageTypes().get(0), msgId);
+    msg.setMsgId(msgId);
+    msg.setSrcName(_hostSrc);
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+    String para = "Testing messaging para";
+    msg.getRecord().setSimpleField("TestMessagingPara", para);
+
+    Criteria cr = new Criteria();
+    cr.setInstanceName(hostDest);
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setClusterName(CLUSTER_NAME);
+
+    AsyncCallback asyncCallback = new MockAsyncCallback();
+    int messagesSent =
+        _adminController.getMessagingService().sendAndWait(cr, msg, asyncCallback, 60000);
+
+    AssertJUnit.assertTrue(asyncCallback.getMessageReplied().get(0).getRecord()
+        .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage")
+        .equals("TestReplyMessage"));
+    AssertJUnit.assertTrue(asyncCallback.getMessageReplied().size() == 1);
+
+    AsyncCallback asyncCallback2 = new MockAsyncCallback();
+    messagesSent = _adminController.getMessagingService().sendAndWait(cr, msg, asyncCallback2, 500);
+    AssertJUnit.assertTrue(asyncCallback2.isTimedOut());
+  }
+
+  @Test()
+  public void TestMultiMessageCriteria() throws Exception {
+    for (int i = 0; i < NODE_NR; i++) {
+      TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+      _participants[i].getMessagingService()
+          .registerMessageHandlerFactory(factory.getMessageTypes(), factory);
+    }
+    String msgId = new UUID(123, 456).toString();
+    Message msg = new Message(new TestMessagingHandlerFactory().getMessageTypes().get(0), msgId);
+    msg.setMsgId(msgId);
+    msg.setSrcName(_hostSrc);
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+    String para = "Testing messaging para";
+    msg.getRecord().setSimpleField("TestMessagingPara", para);
+
+    Criteria cr = new Criteria();
+    cr.setInstanceName("%");
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setClusterName(CLUSTER_NAME);
+
+    AsyncCallback callback1 = new MockAsyncCallback();
+    int messageSent1 =
+        _adminController.getMessagingService().sendAndWait(cr, msg, callback1, 10000);
+
+    AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
+        .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage")
+        .equals("TestReplyMessage"));
+    AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR);
+
+    AsyncCallback callback2 = new MockAsyncCallback();
+    int messageSent2 = _adminController.getMessagingService().sendAndWait(cr, msg, callback2, 500);
+
+    AssertJUnit.assertTrue(callback2.isTimedOut());
+
+    cr.setPartition("TestDB_17");
+    AsyncCallback callback3 = new MockAsyncCallback();
+    int messageSent3 =
+        _adminController.getMessagingService().sendAndWait(cr, msg, callback3, 10000);
+    AssertJUnit.assertTrue(callback3.getMessageReplied().size() == _replica);
+
+    cr.setPartition("TestDB_15");
+    AsyncCallback callback4 = new MockAsyncCallback();
+    int messageSent4 =
+        _adminController.getMessagingService().sendAndWait(cr, msg, callback4, 10000);
+    AssertJUnit.assertTrue(callback4.getMessageReplied().size() == _replica);
+
+    cr.setPartitionState("SLAVE");
+    AsyncCallback callback5 = new MockAsyncCallback();
+    int messageSent5 =
+        _adminController.getMessagingService().sendAndWait(cr, msg, callback5, 10000);
+    AssertJUnit.assertTrue(callback5.getMessageReplied().size() == _replica - 1);
+
+    cr.setDataSource(DataSource.IDEALSTATES);
+    AsyncCallback callback6 = new MockAsyncCallback();
+    int messageSent6 =
+        _adminController.getMessagingService().sendAndWait(cr, msg, callback6, 10000);
+    AssertJUnit.assertTrue(callback6.getMessageReplied().size() == _replica - 1);
+  }
+
+  @Test()
+  public void TestControllerMessage() {
+    for (int i = 0; i < NODE_NR; i++) {
+      TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+      _participants[i].getMessagingService()
+          .registerMessageHandlerFactory(factory.getMessageTypes(), factory);
+
+    }
+    String msgId = new UUID(123, 456).toString();
+    Message msg = new Message(MessageType.CONTROLLER_MSG, msgId);
+    msg.setMsgId(msgId);
+    msg.setSrcName(_hostSrc);
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+    String para = "Testing messaging para";
+    msg.getRecord().setSimpleField("TestMessagingPara", para);
+
+    Criteria cr = new Criteria();
+    cr.setInstanceName("*");
+    cr.setRecipientInstanceType(InstanceType.CONTROLLER);
+    cr.setSessionSpecific(false);
+    cr.setClusterName(CLUSTER_NAME);
+
+    AsyncCallback callback1 = new MockAsyncCallback();
+    int messagesSent =
+        _adminController.getMessagingService().sendAndWait(cr, msg, callback1, 10000);
+
+    AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
+        .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
+        .indexOf(_hostSrc) != -1);
+    AssertJUnit.assertTrue(callback1.getMessageReplied().size() == 1);
+
+    msgId = UUID.randomUUID().toString();
+    msg.setMsgId(msgId);
+    cr.setPartition("TestDB_17");
+    AsyncCallback callback2 = new MockAsyncCallback();
+    messagesSent = _adminController.getMessagingService().sendAndWait(cr, msg, callback2, 10000);
+
+    AssertJUnit.assertTrue(callback2.getMessageReplied().get(0).getRecord()
+        .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
+        .indexOf(_hostSrc) != -1);
+
+    AssertJUnit.assertTrue(callback2.getMessageReplied().size() == 1);
+
+    msgId = UUID.randomUUID().toString();
+    msg.setMsgId(msgId);
+    cr.setPartitionState("SLAVE");
+    AsyncCallback callback3 = new MockAsyncCallback();
+    messagesSent = _adminController.getMessagingService().sendAndWait(cr, msg, callback3, 10000);
+    AssertJUnit.assertTrue(callback3.getMessageReplied().get(0).getRecord()
+        .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
+        .indexOf(_hostSrc) != -1);
+
+    AssertJUnit.assertTrue(callback3.getMessageReplied().size() == 1);
+  }
+
+  @Test(enabled = false)
+  public void sendSelfMsg() {
+    // Override the test defined in parent class.
+  }
+}

Reply via email to