This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 97cb118fd Refactor updating and searching for threadpool for state
transition messages (#2380)
97cb118fd is described below
commit 97cb118fd6cfcaf3a50a304c790486f646897cda
Author: Molly Gao <[email protected]>
AuthorDate: Thu Mar 2 13:40:53 2023 -0800
Refactor updating and searching for threadpool for state transition
messages (#2380)
*Refactor updating and searching for threadpool for state transition
messages
---
.../messaging/handling/HelixTaskExecutor.java | 71 ++++++---------
.../main/java/org/apache/helix/model/Message.java | 39 ++++++++
.../messaging/handling/TestHelixTaskExecutor.java | 100 +++++++++++++++++++++
3 files changed, 167 insertions(+), 43 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index b21a065a9..4a96f111a 100644
---
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -289,21 +289,20 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
StateModelFactory<? extends StateModel> stateModelFactory =
manager.getStateMachineEngine().getStateModelFactory(stateModelName,
factoryName);
+ Message.MessageInfo msgInfo = new Message.MessageInfo(message);
String perStateTransitionTypeKey =
-
getStateTransitionType(getPerResourceStateTransitionPoolName(resourceName),
- message.getFromState(), message.getToState());
- if (perStateTransitionTypeKey != null && stateModelFactory != null
- &&
!_transitionTypeThreadpoolChecked.contains(perStateTransitionTypeKey)) {
- ExecutorService perStateTransitionTypeExecutor = stateModelFactory
- .getExecutorService(resourceName, message.getFromState(),
message.getToState());
+
msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_STATE_TRANSITION_TYPE);
+ if (perStateTransitionTypeKey != null && stateModelFactory != null &&
!_transitionTypeThreadpoolChecked.contains(
+ perStateTransitionTypeKey)) {
+ ExecutorService perStateTransitionTypeExecutor =
+ stateModelFactory.getExecutorService(resourceName,
message.getFromState(), message.getToState());
_transitionTypeThreadpoolChecked.add(perStateTransitionTypeKey);
if (perStateTransitionTypeExecutor != null) {
_executorMap.put(perStateTransitionTypeKey,
perStateTransitionTypeExecutor);
- LOG.info(String
- .format("Added client specified dedicate threadpool for resource
%s from %s to %s",
- getPerResourceStateTransitionPoolName(resourceName),
message.getFromState(),
- message.getToState()));
+ LOG.info(String.format("Added client specified dedicate threadpool for
resource %s from %s to %s",
+
msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE),
+ message.getFromState(), message.getToState()));
return;
}
}
@@ -313,8 +312,10 @@ public class HelixTaskExecutor implements MessageListener,
TaskExecutor {
ConfigAccessor configAccessor = manager.getConfigAccessor();
// Changes to this configuration on thread pool size will only take
effect after the participant get restarted.
if (configAccessor != null) {
- HelixConfigScope scope = new
HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE)
-
.forCluster(manager.getClusterName()).forResource(resourceName).build();
+ HelixConfigScope scope =
+ new
HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE).forCluster(manager.getClusterName())
+ .forResource(resourceName)
+ .build();
String threadpoolSizeStr = configAccessor.get(scope, MAX_THREADS);
try {
@@ -322,16 +323,14 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
threadpoolSize = Integer.parseInt(threadpoolSizeStr);
}
} catch (Exception e) {
- LOG.error(
- "Failed to parse ThreadPoolSize from resourceConfig for
resource" + resourceName, e);
+ LOG.error("Failed to parse ThreadPoolSize from resourceConfig for
resource" + resourceName, e);
}
}
- final String key = getPerResourceStateTransitionPoolName(resourceName);
+ final String key =
msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE);
if (threadpoolSize > 0) {
_executorMap.put(key, Executors.newFixedThreadPool(threadpoolSize,
r -> new Thread(r, "GerenricHelixController-message_handle_" +
key)));
- LOG.info("Added dedicate threadpool for resource: " + resourceName + "
with size: "
- + threadpoolSize);
+ LOG.info("Added dedicate threadpool for resource: " + resourceName + "
with size: " + threadpoolSize);
} else {
// if threadpool is not configured
// check whether client specifies customized threadpool.
@@ -361,21 +360,18 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
if (message.getBatchMessageMode() == true) {
executorService = _batchMessageExecutorService;
} else {
- String resourceName = message.getResourceName();
- if (resourceName != null) {
- String key = getPerResourceStateTransitionPoolName(resourceName);
- String perStateTransitionTypeKey =
- getStateTransitionType(key, message.getFromState(),
message.getToState());
- if (perStateTransitionTypeKey != null && _executorMap
- .containsKey(perStateTransitionTypeKey)) {
- LOG.info(String
- .format("Find per state transition type thread pool for
resource %s from %s to %s",
- message.getResourceName(), message.getFromState(),
message.getToState()));
- executorService = _executorMap.get(perStateTransitionTypeKey);
- } else if (_executorMap.containsKey(key)) {
- LOG.info("Find per-resource thread pool with key: " + key);
- executorService = _executorMap.get(key);
- }
+ Message.MessageInfo msgInfo = new Message.MessageInfo(message);
+ String perResourceTypeKey =
+
msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE);
+ String perStateTransitionTypeKey =
+
msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_STATE_TRANSITION_TYPE);
+ if (perStateTransitionTypeKey != null &&
_executorMap.containsKey(perStateTransitionTypeKey)) {
+ LOG.info(String.format("Find per state transition type thread pool
for resource %s from %s to %s",
+ message.getResourceName(), message.getFromState(),
message.getToState()));
+ executorService = _executorMap.get(perStateTransitionTypeKey);
+ } else if (_executorMap.containsKey(perResourceTypeKey)) {
+ LOG.info("Find per-resource thread pool with key: " +
perResourceTypeKey);
+ executorService = _executorMap.get(perResourceTypeKey);
}
}
}
@@ -1432,17 +1428,6 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
instanceName, _liveInstanceStatus, _freezeSessionId, success);
}
- private String getStateTransitionType(String prefix, String fromState,
String toState) {
- if (prefix == null || fromState == null || toState == null) {
- return null;
- }
- return String.format("%s.%s.%s", prefix, fromState, toState);
- }
-
- private String getPerResourceStateTransitionPoolName(String resourceName) {
- return MessageType.STATE_TRANSITION.name() + "." + resourceName;
- }
-
public LiveInstanceStatus getLiveInstanceStatus() {
return _liveInstanceStatus;
}
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java
b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 6222df521..b3a7c8884 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -966,4 +966,43 @@ public class Message extends HelixProperty {
}
return true;
}
+
+ /**
+ * This class is for categorizing state transition messages based on certain
properties, and generating
+ */
+ public static class MessageInfo {
+ public String _msgType;
+ public String _resourceName;
+ public String _fromState;
+ public String _toState;
+
+ public enum MessageIdentifierBase {
+ PER_RESOURCE,
+ PER_STATE_TRANSITION_TYPE
+ }
+
+ public MessageInfo(Message message) {
+ _msgType = message.getMsgType();
+ _resourceName = message.getResourceName();
+ _fromState = message.getFromState();
+ _toState = message.getToState();
+ }
+
+ public String getMessageIdentifier(MessageIdentifierBase basis) {
+ String delimiter = ".";
+ if (basis == null || _msgType == null || _resourceName == null) {
+ return null;
+ }
+ String identifier = String.join(delimiter, _msgType, _resourceName);
+ switch (basis) {
+ case PER_STATE_TRANSITION_TYPE:
+ if (_fromState == null || _toState == null) {
+ return null;
+ }
+ identifier = String.join(delimiter, identifier, _fromState,
_toState);
+ break;
+ }
+ return identifier;
+ }
+ }
}
diff --git
a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index 1a8cc5ac4..d4b554978 100644
---
a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++
b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -32,26 +32,41 @@ import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import com.google.common.collect.ImmutableList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.MockAccessor;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
+import org.apache.helix.examples.OnlineOfflineStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.mock.MockClusterMessagingService;
import org.apache.helix.mock.MockManager;
+import org.apache.helix.mock.statemodel.MockMasterSlaveStateModel;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.mockito.Mockito.*;
+
+
public class TestHelixTaskExecutor {
@BeforeClass
public void beforeClass() {
@@ -1104,4 +1119,89 @@ public class TestHelixTaskExecutor {
HelixDefinedState.ERROR.toString());
System.out.println("END " + TestHelper.getTestMethodName());
}
+
+ @Test
+ public void testUpdateAndFindMessageThreadpool() throws Exception {
+ // Using ThreadPoolExecutor interface because it allows counting task
number
+ ThreadPoolExecutor executor0 =
+ new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new
LinkedBlockingQueue<Runnable>());
+ ThreadPoolExecutor executor1 =
+ new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new
LinkedBlockingQueue<Runnable>());
+
+ class MockStateModelFactory_ResourceName
+ extends
StateModelFactory<OnlineOfflineStateModelFactory.OnlineOfflineStateModel> {
+ @Override
+ public ExecutorService getExecutorService(String resourceName) {
+ return executor0;
+ }
+ }
+
+ class MockStateModelFactory_STType
+ extends
StateModelFactory<OnlineOfflineStateModelFactory.OnlineOfflineStateModel> {
+ @Override
+ public ExecutorService getExecutorService(String resourceName, String
fromState, String toState) {
+ return executor1;
+ }
+ }
+
+ System.out.println("START " + TestHelper.getTestMethodName());
+ String sessionId = UUID.randomUUID().toString();
+ String resourceName = "testDB";
+ String msgId = "testMsgId";
+ String fromState = "Offline";
+ String toState = "Online";
+ String stateModelDef = "OnlineOffline";
+ HelixManager manager = mock(ZKHelixManager.class);
+ StateMachineEngine engine = mock(HelixStateMachineEngine.class);
+ when(manager.getStateMachineEngine()).thenReturn(engine);
+ when(manager.getInstanceType()).thenReturn(InstanceType.PARTICIPANT);
+ when(manager.getHelixDataAccessor()).thenReturn(new MockAccessor());
+ when(manager.getSessionId()).thenReturn(sessionId);
+ when(manager.getInstanceName()).thenReturn("TestInstance");
+ when(manager.getMessagingService()).thenReturn(new
MockClusterMessagingService());
+ when(manager.getClusterName()).thenReturn(TestHelper.getTestMethodName());
+ StateModel stateModel = new MockMasterSlaveStateModel();
+ NotificationContext context = new NotificationContext(manager);
+ HelixTaskExecutor executor = new HelixTaskExecutor();
+ Message message = new Message(Message.MessageType.STATE_TRANSITION, msgId);
+ message.setFromState(fromState);
+ message.setToState(toState);
+ message.setResourceName(resourceName);
+ message.setStateModelDef(stateModelDef);
+ message.setPartitionName("TestPartition");
+ message.setTgtName("TgtInstance");
+ message.setStateModelFactoryName("DEFAULT");
+ message.setTgtSessionId(sessionId);
+
+ // State transition type based
+ executor =
+ new HelixTaskExecutor(); // Re-initialize it because if the message
exists in _taskMap, it won't be assigned again
+ StateModelFactory<? extends StateModel> factory = new
MockStateModelFactory_STType();
+ Mockito.doReturn(factory)
+ .when(engine)
+ .getStateModelFactory(stateModelDef,
HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+ HelixStateTransitionHandler handler = new
HelixStateTransitionHandler(factory, stateModel, message, context, new
CurrentState(resourceName));
+ HelixTask task = new HelixTask(message, context, handler, executor);
+ executor.scheduleTask(task);
+ Assert.assertTrue(TestHelper.verify(() -> {
+ return executor1.getTaskCount() == 1;
+ }, TestHelper.WAIT_DURATION));
+ System.out.println(TestHelper.getTestMethodName() + ": State transition
based test passed.");
+
+ // Resource name based
+ executor = new HelixTaskExecutor();
+ factory = new MockStateModelFactory_ResourceName();
+ Mockito.doReturn(factory)
+ .when(engine)
+ .getStateModelFactory(stateModelDef,
HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+ handler = new HelixStateTransitionHandler(factory, stateModel, message,
context, new CurrentState(resourceName));
+ engine.registerStateModelFactory(stateModelDef, factory);
+ task = new HelixTask(message, context, handler, executor);
+ executor.scheduleTask(task);
+ Assert.assertTrue(TestHelper.verify(() -> {
+ return executor0.getTaskCount() == 1;
+ }, TestHelper.WAIT_DURATION));
+ System.out.println(TestHelper.getTestMethodName() + ": Resource name based
test passed.");
+ System.out.println("END " + TestHelper.getTestMethodName());
+ }
}