This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 97cb118fd Refactor updating and searching for threadpool for state 
transition messages (#2380)
97cb118fd is described below

commit 97cb118fd6cfcaf3a50a304c790486f646897cda
Author: Molly Gao <[email protected]>
AuthorDate: Thu Mar 2 13:40:53 2023 -0800

    Refactor updating and searching for threadpool for state transition 
messages (#2380)
    
    *Refactor updating and searching for threadpool for state transition 
messages
---
 .../messaging/handling/HelixTaskExecutor.java      |  71 ++++++---------
 .../main/java/org/apache/helix/model/Message.java  |  39 ++++++++
 .../messaging/handling/TestHelixTaskExecutor.java  | 100 +++++++++++++++++++++
 3 files changed, 167 insertions(+), 43 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index b21a065a9..4a96f111a 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -289,21 +289,20 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
     StateModelFactory<? extends StateModel> stateModelFactory =
         manager.getStateMachineEngine().getStateModelFactory(stateModelName, 
factoryName);
 
+    Message.MessageInfo msgInfo = new Message.MessageInfo(message);
     String perStateTransitionTypeKey =
-        
getStateTransitionType(getPerResourceStateTransitionPoolName(resourceName),
-            message.getFromState(), message.getToState());
-    if (perStateTransitionTypeKey != null && stateModelFactory != null
-        && 
!_transitionTypeThreadpoolChecked.contains(perStateTransitionTypeKey)) {
-      ExecutorService perStateTransitionTypeExecutor = stateModelFactory
-          .getExecutorService(resourceName, message.getFromState(), 
message.getToState());
+        
msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_STATE_TRANSITION_TYPE);
+    if (perStateTransitionTypeKey != null && stateModelFactory != null && 
!_transitionTypeThreadpoolChecked.contains(
+        perStateTransitionTypeKey)) {
+      ExecutorService perStateTransitionTypeExecutor =
+          stateModelFactory.getExecutorService(resourceName, 
message.getFromState(), message.getToState());
       _transitionTypeThreadpoolChecked.add(perStateTransitionTypeKey);
 
       if (perStateTransitionTypeExecutor != null) {
         _executorMap.put(perStateTransitionTypeKey, 
perStateTransitionTypeExecutor);
-        LOG.info(String
-            .format("Added client specified dedicate threadpool for resource 
%s from %s to %s",
-                getPerResourceStateTransitionPoolName(resourceName), 
message.getFromState(),
-                message.getToState()));
+        LOG.info(String.format("Added client specified dedicate threadpool for 
resource %s from %s to %s",
+            
msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE),
+            message.getFromState(), message.getToState()));
         return;
       }
     }
@@ -313,8 +312,10 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
       ConfigAccessor configAccessor = manager.getConfigAccessor();
       // Changes to this configuration on thread pool size will only take 
effect after the participant get restarted.
       if (configAccessor != null) {
-        HelixConfigScope scope = new 
HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE)
-            
.forCluster(manager.getClusterName()).forResource(resourceName).build();
+        HelixConfigScope scope =
+            new 
HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE).forCluster(manager.getClusterName())
+                .forResource(resourceName)
+                .build();
 
         String threadpoolSizeStr = configAccessor.get(scope, MAX_THREADS);
         try {
@@ -322,16 +323,14 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
             threadpoolSize = Integer.parseInt(threadpoolSizeStr);
           }
         } catch (Exception e) {
-          LOG.error(
-              "Failed to parse ThreadPoolSize from resourceConfig for 
resource" + resourceName, e);
+          LOG.error("Failed to parse ThreadPoolSize from resourceConfig for 
resource" + resourceName, e);
         }
       }
-      final String key = getPerResourceStateTransitionPoolName(resourceName);
+      final String key = 
msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE);
       if (threadpoolSize > 0) {
         _executorMap.put(key, Executors.newFixedThreadPool(threadpoolSize,
             r -> new Thread(r, "GerenricHelixController-message_handle_" + 
key)));
-        LOG.info("Added dedicate threadpool for resource: " + resourceName + " 
with size: "
-            + threadpoolSize);
+        LOG.info("Added dedicate threadpool for resource: " + resourceName + " 
with size: " + threadpoolSize);
       } else {
         // if threadpool is not configured
         // check whether client specifies customized threadpool.
@@ -361,21 +360,18 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
       if (message.getBatchMessageMode() == true) {
         executorService = _batchMessageExecutorService;
       } else {
-        String resourceName = message.getResourceName();
-        if (resourceName != null) {
-          String key = getPerResourceStateTransitionPoolName(resourceName);
-          String perStateTransitionTypeKey =
-              getStateTransitionType(key, message.getFromState(), 
message.getToState());
-          if (perStateTransitionTypeKey != null && _executorMap
-              .containsKey(perStateTransitionTypeKey)) {
-            LOG.info(String
-                .format("Find per state transition type thread pool for 
resource %s from %s to %s",
-                    message.getResourceName(), message.getFromState(), 
message.getToState()));
-            executorService = _executorMap.get(perStateTransitionTypeKey);
-          } else if (_executorMap.containsKey(key)) {
-            LOG.info("Find per-resource thread pool with key: " + key);
-            executorService = _executorMap.get(key);
-          }
+        Message.MessageInfo msgInfo = new Message.MessageInfo(message);
+        String perResourceTypeKey =
+            
msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE);
+        String perStateTransitionTypeKey =
+            
msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_STATE_TRANSITION_TYPE);
+        if (perStateTransitionTypeKey != null && 
_executorMap.containsKey(perStateTransitionTypeKey)) {
+          LOG.info(String.format("Find per state transition type thread pool 
for resource %s from %s to %s",
+              message.getResourceName(), message.getFromState(), 
message.getToState()));
+          executorService = _executorMap.get(perStateTransitionTypeKey);
+        } else if (_executorMap.containsKey(perResourceTypeKey)) {
+          LOG.info("Find per-resource thread pool with key: " + 
perResourceTypeKey);
+          executorService = _executorMap.get(perResourceTypeKey);
         }
       }
     }
@@ -1432,17 +1428,6 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
         instanceName, _liveInstanceStatus, _freezeSessionId, success);
   }
 
-  private String getStateTransitionType(String prefix, String fromState, 
String toState) {
-    if (prefix == null || fromState == null || toState == null) {
-      return null;
-    }
-    return String.format("%s.%s.%s", prefix, fromState, toState);
-  }
-
-  private String getPerResourceStateTransitionPoolName(String resourceName) {
-    return MessageType.STATE_TRANSITION.name() + "." + resourceName;
-  }
-
   public LiveInstanceStatus getLiveInstanceStatus() {
     return _liveInstanceStatus;
   }
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java 
b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 6222df521..b3a7c8884 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -966,4 +966,43 @@ public class Message extends HelixProperty {
     }
     return true;
   }
+
+  /**
+   * This class is for categorizing state transition messages based on certain 
properties, and generating identifiers for them from those properties.
+   */
+  public static class MessageInfo {
+    public String _msgType;
+    public String _resourceName;
+    public String _fromState;
+    public String _toState;
+
+    public enum MessageIdentifierBase {
+      PER_RESOURCE,
+      PER_STATE_TRANSITION_TYPE
+    }
+
+    public MessageInfo(Message message) {
+      _msgType = message.getMsgType();
+      _resourceName = message.getResourceName();
+      _fromState = message.getFromState();
+      _toState = message.getToState();
+    }
+
+    public String getMessageIdentifier(MessageIdentifierBase basis) {
+      String delimiter = ".";
+      if (basis == null || _msgType == null || _resourceName == null) {
+        return null;
+      }
+      String identifier = String.join(delimiter, _msgType, _resourceName);
+      switch (basis) {
+        case PER_STATE_TRANSITION_TYPE:
+          if (_fromState == null || _toState == null) {
+            return null;
+          }
+          identifier = String.join(delimiter, identifier, _fromState, 
_toState);
+          break;
+      }
+      return identifier;
+    }
+  }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
 
b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index 1a8cc5ac4..d4b554978 100644
--- 
a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ 
b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -32,26 +32,41 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 
 import com.google.common.collect.ImmutableList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.MockAccessor;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
+import org.apache.helix.examples.OnlineOfflineStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.mock.MockClusterMessagingService;
 import org.apache.helix.mock.MockManager;
+import org.apache.helix.mock.statemodel.MockMasterSlaveStateModel;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.AssertJUnit;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.*;
+
+
 public class TestHelixTaskExecutor {
   @BeforeClass
   public void beforeClass() {
@@ -1104,4 +1119,89 @@ public class TestHelixTaskExecutor {
         HelixDefinedState.ERROR.toString());
     System.out.println("END " + TestHelper.getTestMethodName());
   }
+
+  @Test
+  public void testUpdateAndFindMessageThreadpool() throws Exception {
+    // Using the concrete ThreadPoolExecutor class because it exposes the task 
count via getTaskCount()
+    ThreadPoolExecutor executor0 =
+        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new 
LinkedBlockingQueue<Runnable>());
+    ThreadPoolExecutor executor1 =
+        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new 
LinkedBlockingQueue<Runnable>());
+
+    class MockStateModelFactory_ResourceName
+        extends 
StateModelFactory<OnlineOfflineStateModelFactory.OnlineOfflineStateModel> {
+      @Override
+      public ExecutorService getExecutorService(String resourceName) {
+        return executor0;
+      }
+    }
+
+    class MockStateModelFactory_STType
+        extends 
StateModelFactory<OnlineOfflineStateModelFactory.OnlineOfflineStateModel> {
+      @Override
+      public ExecutorService getExecutorService(String resourceName, String 
fromState, String toState) {
+        return executor1;
+      }
+    }
+
+    System.out.println("START " + TestHelper.getTestMethodName());
+    String sessionId = UUID.randomUUID().toString();
+    String resourceName = "testDB";
+    String msgId = "testMsgId";
+    String fromState = "Offline";
+    String toState = "Online";
+    String stateModelDef = "OnlineOffline";
+    HelixManager manager = mock(ZKHelixManager.class);
+    StateMachineEngine engine = mock(HelixStateMachineEngine.class);
+    when(manager.getStateMachineEngine()).thenReturn(engine);
+    when(manager.getInstanceType()).thenReturn(InstanceType.PARTICIPANT);
+    when(manager.getHelixDataAccessor()).thenReturn(new MockAccessor());
+    when(manager.getSessionId()).thenReturn(sessionId);
+    when(manager.getInstanceName()).thenReturn("TestInstance");
+    when(manager.getMessagingService()).thenReturn(new 
MockClusterMessagingService());
+    when(manager.getClusterName()).thenReturn(TestHelper.getTestMethodName());
+    StateModel stateModel = new MockMasterSlaveStateModel();
+    NotificationContext context = new NotificationContext(manager);
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    Message message = new Message(Message.MessageType.STATE_TRANSITION, msgId);
+    message.setFromState(fromState);
+    message.setToState(toState);
+    message.setResourceName(resourceName);
+    message.setStateModelDef(stateModelDef);
+    message.setPartitionName("TestPartition");
+    message.setTgtName("TgtInstance");
+    message.setStateModelFactoryName("DEFAULT");
+    message.setTgtSessionId(sessionId);
+
+    // State transition type based
+    executor =
+        new HelixTaskExecutor(); // Re-initialize it because if the message 
exists in _taskMap, it won't be assigned again
+    StateModelFactory<? extends StateModel> factory = new 
MockStateModelFactory_STType();
+    Mockito.doReturn(factory)
+        .when(engine)
+        .getStateModelFactory(stateModelDef, 
HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+    HelixStateTransitionHandler handler = new 
HelixStateTransitionHandler(factory, stateModel, message, context, new 
CurrentState(resourceName));
+    HelixTask task = new HelixTask(message, context, handler, executor);
+    executor.scheduleTask(task);
+    Assert.assertTrue(TestHelper.verify(() -> {
+      return executor1.getTaskCount() == 1;
+    }, TestHelper.WAIT_DURATION));
+    System.out.println(TestHelper.getTestMethodName() + ": State transition 
based test passed.");
+
+    // Resource name based
+    executor = new HelixTaskExecutor();
+    factory = new MockStateModelFactory_ResourceName();
+    Mockito.doReturn(factory)
+        .when(engine)
+        .getStateModelFactory(stateModelDef, 
HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+    handler = new HelixStateTransitionHandler(factory, stateModel, message, 
context, new CurrentState(resourceName));
+    engine.registerStateModelFactory(stateModelDef, factory);
+    task = new HelixTask(message, context, handler, executor);
+    executor.scheduleTask(task);
+    Assert.assertTrue(TestHelper.verify(() -> {
+      return executor0.getTaskCount() == 1;
+    }, TestHelper.WAIT_DURATION));
+    System.out.println(TestHelper.getTestMethodName() + ": Resource name based 
test passed.");
+    System.out.println("END " + TestHelper.getTestMethodName());
+  }
 }

Reply via email to