This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch cluster-pause-mode
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 25117d9d0b98b048d7955655c47fa89837bbe361
Author: Huizhi Lu <[email protected]>
AuthorDate: Mon Jun 14 21:32:25 2021 -0700

    Add message util to create messages (#1796)
    
    Message creation methods are private in the message generation phase.
    The management mode stage will also need message generation methods to
    create ST cancellation and participant status change messages.
    A message util serves both purposes.
    
    This commit moves the common message creation logic to a message util so
    multiple stages can reuse the code.
---
 .../controller/stages/MessageGenerationPhase.java  |  97 +++-----------
 .../java/org/apache/helix/util/MessageUtil.java    | 139 +++++++++++++++++++++
 .../TestStateTransitionAppFailureHandling.java     |   5 +-
 3 files changed, 157 insertions(+), 84 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 836e5df..38d9908 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
@@ -45,13 +44,12 @@ import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.util.HelixUtil;
+import org.apache.helix.util.MessageUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,8 +68,6 @@ public class MessageGenerationPhase extends AbstractBaseStage 
{
       
.getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60 
* 1000);
   private final static String PENDING_MESSAGE = "pending message";
   private final static String STALE_MESSAGE = "stale message";
-  // TODO: Make the message retry count configurable through the Cluster 
Config or IdealStates.
-  public final static int DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT = 3;
 
   private static Logger logger = 
LoggerFactory.getLogger(MessageGenerationPhase.class);
 
@@ -215,10 +211,12 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
         if (desiredState.equals(NO_DESIRED_STATE) || 
desiredState.equalsIgnoreCase(currentState)) {
           if (shouldCreateSTCancellation(pendingMessage, desiredState,
               stateModelDef.getInitialState())) {
-            message = createStateTransitionCancellationMessage(manager, 
resource,
-                partition.getPartitionName(), instanceName, 
sessionIdMap.get(instanceName),
-                stateModelDef.getId(), pendingMessage.getFromState(), 
pendingMessage.getToState(),
-                null, cancellationMessage, isCancellationEnabled, 
currentState);
+            message = MessageUtil
+                
.createStateTransitionCancellationMessage(manager.getInstanceName(),
+                    manager.getSessionId(), resource, 
partition.getPartitionName(), instanceName,
+                    sessionIdMap.get(instanceName), stateModelDef.getId(),
+                    pendingMessage.getFromState(), 
pendingMessage.getToState(), null,
+                    cancellationMessage, isCancellationEnabled, currentState);
           }
         } else {
           if (nextState == null) {
@@ -236,9 +234,10 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
                     cancellationMessage, isCancellationEnabled);
           } else {
             // Create new state transition message
-            message = createStateTransitionMessage(manager, resource, 
partition.getPartitionName(),
-                instanceName, currentState, nextState, 
sessionIdMap.get(instanceName),
-                stateModelDef.getId());
+            message = MessageUtil
+                .createStateTransitionMessage(manager.getInstanceName(), 
manager.getSessionId(),
+                    resource, partition.getPartitionName(), instanceName, 
currentState, nextState,
+                    sessionIdMap.get(instanceName), stateModelDef.getId());
 
             if (logger.isDebugEnabled()) {
               LogUtil.logDebug(logger, _eventId, String.format(
@@ -331,10 +330,10 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
                 + instanceName + ", pendingState: " + pendingState + ", 
currentState: "
                 + currentState + ", nextState: " + nextState + ", isRelay: " + 
pendingMessage.isRelayMessage());
 
-        message = createStateTransitionCancellationMessage(manager, resource,
-            partition.getPartitionName(), instanceName, 
sessionIdMap.get(instanceName),
-            stateModelDef.getId(), pendingMessage.getFromState(), 
pendingState, nextState,
-            cancellationMessage, isCancellationEnabled, currentState);
+        message = 
MessageUtil.createStateTransitionCancellationMessage(manager.getInstanceName(),
+            manager.getSessionId(), resource, partition.getPartitionName(), 
instanceName,
+            sessionIdMap.get(instanceName), stateModelDef.getId(), 
pendingMessage.getFromState(),
+            pendingState, nextState, cancellationMessage, 
isCancellationEnabled, currentState);
       }
     }
     return message;
@@ -417,72 +416,6 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
     }
   }
 
-  private Message createStateTransitionMessage(HelixManager manager, Resource 
resource,
-      String partitionName, String instanceName, String currentState, String 
nextState,
-      String sessionId, String stateModelDefName) {
-    String uuid = UUID.randomUUID().toString();
-    String managerSessionId = manager.getSessionId();
-    Message message = new Message(MessageType.STATE_TRANSITION, uuid);
-    message.setSrcName(manager.getInstanceName());
-    message.setTgtName(instanceName);
-    message.setMsgState(MessageState.NEW);
-    message.setPartitionName(partitionName);
-    message.setResourceName(resource.getResourceName());
-    message.setFromState(currentState);
-    message.setToState(nextState);
-    message.setTgtSessionId(sessionId);
-    message.setSrcSessionId(managerSessionId);
-    message.setExpectedSessionId(managerSessionId);
-    message.setStateModelDef(stateModelDefName);
-    message.setStateModelFactoryName(resource.getStateModelFactoryname());
-    message.setBucketSize(resource.getBucketSize());
-    // Set the retry count for state transition messages.
-    // TODO: make the retry count configurable in ClusterConfig or IdealState
-    message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
-
-    if (resource.getResourceGroupName() != null) {
-      message.setResourceGroupName(resource.getResourceGroupName());
-    }
-    if (resource.getResourceTag() != null) {
-      message.setResourceTag(resource.getResourceTag());
-    }
-
-    return message;
-  }
-
-  private Message createStateTransitionCancellationMessage(HelixManager 
manager, Resource resource,
-      String partitionName, String instanceName, String sessionId, String 
stateModelDefName,
-      String fromState, String toState, String nextState, Message 
cancellationMessage,
-      boolean isCancellationEnabled, String currentState) {
-
-    if (isCancellationEnabled && cancellationMessage == null) {
-      logger.info("Event {} : Send cancellation message of the state 
transition for {}.{} on {}, "
-              + "currentState: {}, nextState: {},  toState: {}",
-          _eventId, resource.getResourceName(), partitionName, instanceName,
-          currentState, nextState == null ? "N/A" : nextState, toState);
-
-      String uuid = UUID.randomUUID().toString();
-      String managerSessionId = manager.getSessionId();
-      Message message = new Message(MessageType.STATE_TRANSITION_CANCELLATION, 
uuid);
-      message.setSrcName(manager.getInstanceName());
-      message.setTgtName(instanceName);
-      message.setMsgState(MessageState.NEW);
-      message.setPartitionName(partitionName);
-      message.setResourceName(resource.getResourceName());
-      message.setFromState(fromState);
-      message.setToState(toState);
-      message.setTgtSessionId(sessionId);
-      message.setSrcSessionId(managerSessionId);
-      message.setExpectedSessionId(managerSessionId);
-      message.setStateModelDef(stateModelDefName);
-      message.setStateModelFactoryName(resource.getStateModelFactoryname());
-      message.setBucketSize(resource.getBucketSize());
-      return message;
-    }
-
-    return null;
-  }
-
   private int getTimeOut(ClusterConfig clusterConfig, ResourceConfig 
resourceConfig,
       String currentState, String nextState, IdealState idealState, Partition 
partition) {
     StateTransitionTimeoutConfig stateTransitionTimeoutConfig =
diff --git a/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java 
b/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java
new file mode 100644
index 0000000..94de833
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java
@@ -0,0 +1,139 @@
+package org.apache.helix.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.UUID;
+
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Message utils to operate on messages, such as creating messages.
+ */
+public class MessageUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(MessageUtil.class);
+
+  // TODO: Make the message retry count configurable through the Cluster 
Config or IdealStates.
+  public final static int DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT = 3;
+
+  public static Message createStateTransitionCancellationMessage(String 
srcInstanceName,
+      String srcSessionId, Resource resource, String partitionName, String 
instanceName,
+      String sessionId, String stateModelDefName, String fromState, String 
toState,
+      String nextState, Message cancellationMessage, boolean 
isCancellationEnabled,
+      String currentState) {
+    if (isCancellationEnabled && cancellationMessage == null) {
+      LOG.info("Create cancellation message of the state transition for {}.{} 
on {}, "
+              + "currentState: {}, nextState: {},  toState: {}", 
resource.getResourceName(),
+          partitionName, instanceName, currentState, nextState == null ? "N/A" 
: nextState,
+          toState);
+
+      Message message =
+          
createStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION,
+              srcInstanceName, srcSessionId, resource, partitionName, 
instanceName, currentState,
+              nextState, sessionId, stateModelDefName);
+
+      message.setFromState(fromState);
+      message.setToState(toState);
+      return message;
+    }
+
+    return null;
+  }
+
+  public static Message createStateTransitionMessage(String srcInstanceName, 
String srcSessionId,
+      Resource resource, String partitionName, String instanceName, String 
currentState,
+      String nextState, String tgtSessionId, String stateModelDefName) {
+    Message message =
+        createStateTransitionMessage(Message.MessageType.STATE_TRANSITION, 
srcInstanceName,
+            srcSessionId, resource, partitionName, instanceName, currentState, 
nextState, tgtSessionId,
+            stateModelDefName);
+
+    // Set the retry count for state transition messages.
+    // TODO: make the retry count configurable in ClusterConfig or IdealState
+    message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
+
+    if (resource.getResourceGroupName() != null) {
+      message.setResourceGroupName(resource.getResourceGroupName());
+    }
+    if (resource.getResourceTag() != null) {
+      message.setResourceTag(resource.getResourceTag());
+    }
+
+    return message;
+  }
+
+  /**
+   * Creates a message to change participant status
+   * {@link org.apache.helix.model.LiveInstance.LiveInstanceStatus}
+   *
+   * @param currentState current status of the live instance
+   * @param nextState next status that will be changed to
+   * @param srcInstanceName source instance name
+   * @param srcSessionId session id for the source instance
+   * @param tgtInstanceName target instance name
+   * @param tgtSessionId target instance session id
+   * @return participant status change message
+   */
+  public static Message 
createStatusChangeMessage(LiveInstance.LiveInstanceStatus currentState,
+      LiveInstance.LiveInstanceStatus nextState, String srcInstanceName, 
String srcSessionId,
+      String tgtInstanceName, String tgtSessionId) {
+    return createBasicMessage(Message.MessageType.PARTICIPANT_STATUS_CHANGE, 
srcInstanceName,
+        srcSessionId, tgtInstanceName, tgtSessionId, currentState.name(), 
nextState.name());
+  }
+
+  /* Creates a message that has the least required fields. */
+  private static Message createBasicMessage(Message.MessageType messageType, 
String srcInstanceName,
+      String srcSessionId, String tgtInstanceName, String tgtSessionId, String 
currentState,
+      String nextState) {
+    String uuid = UUID.randomUUID().toString();
+
+    Message message = new Message(messageType, uuid);
+    message.setSrcName(srcInstanceName);
+    message.setTgtName(tgtInstanceName);
+    message.setMsgState(Message.MessageState.NEW);
+    message.setFromState(currentState);
+    message.setToState(nextState);
+    message.setTgtSessionId(tgtSessionId);
+    message.setSrcSessionId(srcSessionId);
+    message.setExpectedSessionId(srcSessionId);
+
+    return message;
+  }
+
+  /* Creates state transition or state transition cancellation message */
+  private static Message createStateTransitionMessage(Message.MessageType 
messageType,
+      String srcInstanceName, String srcSessionId, Resource resource, String 
partitionName,
+      String instanceName, String currentState, String nextState, String 
tgtSessionId,
+      String stateModelDefName) {
+    Message message =
+        createBasicMessage(messageType, srcInstanceName, srcSessionId, 
instanceName, tgtSessionId,
+            currentState, nextState);
+    message.setPartitionName(partitionName);
+    message.setStateModelDef(stateModelDefName);
+    message.setResourceName(resource.getResourceName());
+    message.setStateModelFactoryName(resource.getStateModelFactoryname());
+    message.setBucketSize(resource.getBucketSize());
+
+    return message;
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionAppFailureHandling.java
 
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionAppFailureHandling.java
index 12ede56..e97424c 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionAppFailureHandling.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionAppFailureHandling.java
@@ -37,6 +37,7 @@ import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.util.MessageUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -122,7 +123,7 @@ public class TestStateTransitionAppFailureHandling extends 
ZkStandAloneCMTestBas
       // Check if the factory has tried enough times before fail the message.
       Assert.assertEquals(retryCountUntilSucceed - 
retryFactoryMap.get(instanceName)
           .getRemainingRetryCountUntilSucceed(), instanceMessages.size()
-          * 
MessageGenerationPhase.DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
+          * MessageUtil.DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
     }
 
     // Verify that the partition is not initialized.
@@ -146,7 +147,7 @@ public class TestStateTransitionAppFailureHandling extends 
ZkStandAloneCMTestBas
     // Make the mock StateModelFactory return handler before last retry. So it 
will successfully
     // finish handler initialization.
     int retryCountUntilSucceed =
-        MessageGenerationPhase.DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT - 
1;
+        MessageUtil.DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT - 1;
     Map<String, RetryStateModelFactory> retryFactoryMap = 
resetParticipants(retryCountUntilSucceed);
 
     _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, 
STATE_MODEL);

Reply via email to