This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 215d407 Refine the HelixTaskExecutor to reduce duplicate code and
clarify the code structure. (#1489)
215d407 is described below
commit 215d4072756651ea7f242adc68442780828ba0f9
Author: Jiajun Wang <[email protected]>
AuthorDate: Thu Oct 29 13:41:22 2020 -0700
Refine the HelixTaskExecutor to reduce duplicate code and clarify the code
structure. (#1489)
This PR includes a minor code logic change for optimization, but it makes no
business logic change.
---
.../messaging/handling/HelixTaskExecutor.java | 505 +++++++++++----------
1 file changed, 270 insertions(+), 235 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 1e9c455..b039afd 100644
---
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -76,6 +75,7 @@ import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class HelixTaskExecutor implements MessageListener, TaskExecutor {
/**
* Put together all registration information about a message handler factory
@@ -165,19 +165,18 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
_hdlrFtyRegistry = new ConcurrentHashMap<>();
_executorMap = new ConcurrentHashMap<>();
_messageTaskMap = new ConcurrentHashMap<>();
- _knownMessageIds = Collections.newSetFromMap(new ConcurrentHashMap<String,
Boolean>());
+ _knownMessageIds = Collections.newSetFromMap(new ConcurrentHashMap<>());
_batchMessageExecutorService = Executors.newCachedThreadPool();
_monitor.createExecutorMonitor("BatchMessageExecutor",
_batchMessageExecutorService);
- _resourcesThreadpoolChecked =
- Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
- _transitionTypeThreadpoolChecked =
- Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+ _resourcesThreadpoolChecked = Collections.newSetFromMap(new
ConcurrentHashMap<>());
+ _transitionTypeThreadpoolChecked = Collections.newSetFromMap(new
ConcurrentHashMap<>());
_lock = new Object();
_statusUpdateUtil = new StatusUpdateUtil();
- _timer = new Timer("HelixTaskExecutor_Timer", true); // created as a
daemon timer thread to handle task timeout
+ // created as a daemon timer thread to handle task timeout
+ _timer = new Timer("HelixTaskExecutor_Timer", true);
_isShuttingDown = false;
@@ -192,7 +191,7 @@ public class HelixTaskExecutor implements MessageListener,
TaskExecutor {
@Override
public void registerMessageHandlerFactory(String type, MessageHandlerFactory
factory,
int threadpoolSize) {
- if (factory instanceof MultiTypeMessageHandlerFactory) {
+ if (factory instanceof MultiTypeMessageHandlerFactory) {
if (!((MultiTypeMessageHandlerFactory)
factory).getMessageTypes().contains(type)) {
throw new HelixException("Message factory type mismatch. Type: " +
type + ", factory: "
+ ((MultiTypeMessageHandlerFactory) factory).getMessageTypes());
@@ -200,35 +199,30 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
} else {
if (!factory.getMessageType().equals(type)) {
throw new HelixException(
- "Message factory type mismatch. Type: " + type + ", factory: " +
factory.getMessageType());
+ "Message factory type mismatch. Type: " + type + ", factory: " +
factory
+ .getMessageType());
}
}
_isShuttingDown = false;
- MsgHandlerFactoryRegistryItem newItem = new
MsgHandlerFactoryRegistryItem(factory, threadpoolSize);
+ MsgHandlerFactoryRegistryItem newItem =
+ new MsgHandlerFactoryRegistryItem(factory, threadpoolSize);
MsgHandlerFactoryRegistryItem prevItem =
_hdlrFtyRegistry.putIfAbsent(type, newItem);
if (prevItem == null) {
- ExecutorService newPool = Executors.newFixedThreadPool(threadpoolSize,
new ThreadFactory() {
- @Override public Thread newThread(Runnable r) {
- return new Thread(r, "HelixTaskExecutor-message_handle_thread_" +
thread_uid.getAndIncrement());
- }
- });
- ExecutorService prevExecutor = _executorMap.putIfAbsent(type, newPool);
- if (prevExecutor != null) {
- LOG.warn("Skip creating a new thread pool for type: " + type + ",
already existing pool: "
- + prevExecutor + ", isShutdown: " + prevExecutor.isShutdown());
- newPool.shutdown();
- newPool = null;
- } else {
+ _executorMap.computeIfAbsent(type, msgType -> {
+ ExecutorService newPool = Executors.newFixedThreadPool(threadpoolSize,
r -> new Thread(r,
+ "HelixTaskExecutor-message_handle_thread_" +
thread_uid.getAndIncrement()));
_monitor.createExecutorMonitor(type, newPool);
- }
- LOG.info("Registered message handler factory for type: " + type + ",
poolSize: "
- + threadpoolSize + ", factory: " + factory + ", pool: " +
_executorMap.get(type));
+ return newPool;
+ });
+ LOG.info(
+ "Registered message handler factory for type: " + type + ",
poolSize: " + threadpoolSize
+ + ", factory: " + factory + ", pool: " + _executorMap.get(type));
} else {
LOG.info("Skip register message handler factory for type: " + type + ",
poolSize: "
- + threadpoolSize + ", factory: " + factory + ", already existing
factory: "
- + prevItem.factory());
+ + threadpoolSize + ", factory: " + factory + ", already existing
factory: " + prevItem
+ .factory());
newItem = null;
}
}
@@ -287,9 +281,8 @@ public class HelixTaskExecutor implements MessageListener,
TaskExecutor {
ConfigAccessor configAccessor = manager.getConfigAccessor();
// Changes to this configuration on thread pool size will only take
effect after the participant get restarted.
if (configAccessor != null) {
- HelixConfigScope scope =
- new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE)
-
.forCluster(manager.getClusterName()).forResource(resourceName).build();
+ HelixConfigScope scope = new
HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE)
+
.forCluster(manager.getClusterName()).forResource(resourceName).build();
String threadpoolSizeStr = configAccessor.get(scope, MAX_THREADS);
try {
@@ -303,11 +296,8 @@ public class HelixTaskExecutor implements MessageListener,
TaskExecutor {
}
final String key = getPerResourceStateTransitionPoolName(resourceName);
if (threadpoolSize > 0) {
- _executorMap.put(key, Executors.newFixedThreadPool(threadpoolSize, new
ThreadFactory() {
- @Override public Thread newThread(Runnable r) {
- return new Thread(r, "GerenricHelixController-message_handle_" +
key);
- }
- }));
+ _executorMap.put(key, Executors.newFixedThreadPool(threadpoolSize,
+ r -> new Thread(r, "GerenricHelixController-message_handle_" +
key)));
LOG.info("Added dedicate threadpool for resource: " + resourceName + "
with size: "
+ threadpoolSize);
} else {
@@ -336,7 +326,7 @@ public class HelixTaskExecutor implements MessageListener,
TaskExecutor {
ExecutorService findExecutorServiceForMsg(Message message) {
ExecutorService executorService = _executorMap.get(message.getMsgType());
if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())) {
- if(message.getBatchMessageMode() == true) {
+ if (message.getBatchMessageMode() == true) {
executorService = _batchMessageExecutorService;
} else {
String resourceName = message.getResourceName();
@@ -344,9 +334,11 @@ public class HelixTaskExecutor implements MessageListener,
TaskExecutor {
String key = getPerResourceStateTransitionPoolName(resourceName);
String perStateTransitionTypeKey =
getStateTransitionType(key, message.getFromState(),
message.getToState());
- if (perStateTransitionTypeKey != null &&
_executorMap.containsKey(perStateTransitionTypeKey)) {
- LOG.info(String.format("Find per state transition type thread pool
for resource %s from %s to %s",
- message.getResourceName(), message.getFromState(),
message.getToState()));
+ if (perStateTransitionTypeKey != null && _executorMap
+ .containsKey(perStateTransitionTypeKey)) {
+ LOG.info(String
+ .format("Find per state transition type thread pool for
resource %s from %s to %s",
+ message.getResourceName(), message.getFromState(),
message.getToState()));
executorService = _executorMap.get(perStateTransitionTypeKey);
} else if (_executorMap.containsKey(key)) {
LOG.info("Find per-resource thread pool with key: " + key);
@@ -415,8 +407,8 @@ public class HelixTaskExecutor implements MessageListener,
TaskExecutor {
LOG.info("Scheduling message {}: {}:{}, {}->{}", taskId,
message.getResourceName(),
message.getPartitionName(), message.getFromState(),
message.getToState());
- _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class,
- "Message handling task scheduled", manager);
+ _statusUpdateUtil
+ .logInfo(message, HelixTaskExecutor.class, "Message handling task
scheduled", manager);
// this sync guarantees that ExecutorService.submit() task and put
taskInfo into map are
// sync'ed
@@ -442,8 +434,9 @@ public class HelixTaskExecutor implements MessageListener,
TaskExecutor {
if (message.getExecutionTimeout() > 0) {
timerTask = new MessageTimeoutTask(this, task);
_timer.schedule(timerTask, message.getExecutionTimeout());
- LOG.info("Message starts with timeout " +
message.getExecutionTimeout() + " MsgId: "
- + task.getTaskId());
+ LOG.info(
+ "Message starts with timeout " + message.getExecutionTimeout()
+ " MsgId: " + task
+ .getTaskId());
} else {
LOG.debug("Message does not have timeout. MsgId: " +
task.getTaskId());
}
@@ -458,8 +451,9 @@ public class HelixTaskExecutor implements MessageListener,
TaskExecutor {
}
} catch (Exception e) {
LOG.error("Error while executing task. " + message, e);
- _statusUpdateUtil.logError(message, HelixTaskExecutor.class, e, "Error
while executing task "
- + e, manager);
+ _statusUpdateUtil
+ .logError(message, HelixTaskExecutor.class, e, "Error while
executing task " + e,
+ manager);
}
return false;
}
@@ -493,8 +487,9 @@ public class HelixTaskExecutor implements MessageListener,
TaskExecutor {
_taskMap.remove(taskId);
return true;
} else {
- _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class,
- "fail to cancel task: " + taskId,
notificationContext.getManager());
+ _statusUpdateUtil
+ .logInfo(message, HelixTaskExecutor.class, "fail to cancel task:
" + taskId,
+ notificationContext.getManager());
}
} else {
_statusUpdateUtil.logWarning(message, HelixTaskExecutor.class,
@@ -538,15 +533,14 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
* We use the updater to avoid race condition between writing message to
zk as READ state and removing message after ST is done
* If there is no message at this path, meaning the message is removed
so we do not write the message
*/
- updaters.add(new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
- if (currentData == null) {
- LOG.warn("Message {} targets at {} has already been removed before
it is set as READ on instance {}", msg.getId(), msg.getTgtName(), instanceName);
- return null;
- }
- return msg.getRecord();
+ updaters.add(currentData -> {
+ if (currentData == null) {
+ LOG.warn(
+ "Message {} targets at {} has already been removed before it is
set as READ on instance {}",
+ msg.getId(), msg.getTgtName(), instanceName);
+ return null;
}
+ return msg.getRecord();
});
}
accessor.updateChildren(readMsgPaths, updaters, AccessOption.PERSISTENT);
@@ -629,7 +623,8 @@ public class HelixTaskExecutor implements MessageListener,
TaskExecutor {
// Log all tasks that fail to terminate
for (String taskId : _taskMap.keySet()) {
MessageTaskInfo info = _taskMap.get(taskId);
- sb.append("Task: " + taskId + " fails to terminate. Message: " +
info._task.getMessage() + "\n");
+ sb.append(
+ "Task: " + taskId + " fails to terminate. Message: " +
info._task.getMessage() + "\n");
}
LOG.info(sb.toString());
@@ -654,31 +649,24 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
// Re-init all existing factories
for (final String msgType : _hdlrFtyRegistry.keySet()) {
MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.get(msgType);
- ExecutorService newPool =
- Executors.newFixedThreadPool(item.threadPoolSize(), new
ThreadFactory() {
- @Override public Thread newThread(Runnable r) {
- return new Thread(r, "HelixTaskExecutor-message_handle_" +
msgType);
- }
- });
- ExecutorService prevPool = _executorMap.putIfAbsent(msgType, newPool);
- if (prevPool != null) {
- // Will happen if we register and call init
- LOG.info("Skip init a new thread pool for type: " + msgType + ",
already existing pool: "
- + prevPool + ", isShutdown: " + prevPool.isShutdown());
- newPool.shutdown();
- } else {
- _monitor.createExecutorMonitor(msgType, newPool);
- }
+ ExecutorService pool = _executorMap.computeIfAbsent(msgType, type -> {
+ ExecutorService newPool =
Executors.newFixedThreadPool(item.threadPoolSize(),
+ r -> new Thread(r, "HelixTaskExecutor-message_handle_" + type));
+ _monitor.createExecutorMonitor(type, newPool);
+ return newPool;
+ });
+ LOG.info("Setup the thread pool for type: %s, isShutdown: %s", msgType,
pool.isShutdown());
}
}
private void syncSessionToController(HelixManager manager) {
- if (_lastSessionSyncTime == null ||
- System.currentTimeMillis() - _lastSessionSyncTime >
SESSION_SYNC_INTERVAL) { // > delay since last sync
+ if (_lastSessionSyncTime == null || System.currentTimeMillis() -
_lastSessionSyncTime
+ > SESSION_SYNC_INTERVAL) { // > delay since last sync
HelixDataAccessor accessor = manager.getHelixDataAccessor();
PropertyKey key = new
Builder(manager.getClusterName()).controllerMessage(SESSION_SYNC);
if (accessor.getProperty(key) == null) {
- LOG.info(String.format("Participant %s syncs session with controller",
manager.getInstanceName()));
+ LOG.info(String
+ .format("Participant %s syncs session with controller",
manager.getInstanceName()));
Message msg = new Message(MessageType.PARTICIPANT_SESSION_CHANGE,
SESSION_SYNC);
msg.setSrcName(manager.getInstanceName());
msg.setTgtSessionId("*");
@@ -734,7 +722,7 @@ public class HelixTaskExecutor implements MessageListener,
TaskExecutor {
List<Message> newMessages = accessor.getProperty(keys, false);
// Message may be removed before get read, clean up null messages.
Iterator<Message> messageIterator = newMessages.iterator();
- while(messageIterator.hasNext()) {
+ while (messageIterator.hasNext()) {
if (messageIterator.next() == null) {
messageIterator.remove();
}
@@ -772,7 +760,8 @@ public class HelixTaskExecutor implements MessageListener,
TaskExecutor {
for (Message message : messages) {
sb.append(message.getMsgId() + ",");
}
- LOG.info("Helix task executor is shutting down, discard unprocessed
messages : " + sb.toString());
+ LOG.info(
+ "Helix task executor is shutting down, ignore unprocessed messages :
" + sb.toString());
return;
}
@@ -810,148 +799,36 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
Set<String> createCurStateNames = new HashSet<>();
for (Message message : messages) {
- try {
- // nop messages are simply removed. It is used to trigger onMessage()
in
- // situations such as register a new message handler factory
- if
(message.getMsgType().equalsIgnoreCase(MessageType.NO_OP.toString())) {
- LOG.info(
- "Dropping NO-OP message. mid: " + message.getId() + ", from: " +
message.getMsgSrc());
- reportAndRemoveMessage(message, accessor, instanceName,
ProcessedMessageState.DISCARDED);
- continue;
- }
-
- String tgtSessionId = message.getTgtSessionId();
- // sessionId mismatch normally means message comes from expired
session, just remove it
- if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*")) {
- String warningMessage =
- "SessionId does NOT match. expected sessionId: " + sessionId
- + ", tgtSessionId in message: " + tgtSessionId + ",
messageId: "
- + message.getMsgId();
- LOG.warn(warningMessage);
- reportAndRemoveMessage(message, accessor, instanceName,
ProcessedMessageState.DISCARDED);
- _statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class,
warningMessage, manager);
-
- // Proactively send a session sync message from participant to
controller
- // upon session mismatch after a new session is established
- if (manager.getInstanceType() == InstanceType.PARTICIPANT
- || manager.getInstanceType() ==
InstanceType.CONTROLLER_PARTICIPANT) {
- if (message.getCreateTimeStamp() > manager.getSessionStartTime()) {
- syncSessionToController(manager);
- }
- }
- continue;
- }
-
- if ((manager.getInstanceType() == InstanceType.CONTROLLER
- || manager.getInstanceType() ==
InstanceType.CONTROLLER_PARTICIPANT)
- &&
MessageType.PARTICIPANT_SESSION_CHANGE.name().equals(message.getMsgType())) {
- LOG.info(String.format("Controller received
PARTICIPANT_SESSION_CHANGE msg from src: %s",
- message.getMsgSrc()));
- PropertyKey key = new
Builder(manager.getClusterName()).liveInstances();
- List<LiveInstance> liveInstances =
- manager.getHelixDataAccessor().getChildValues(key, true);
- _controller.onLiveInstanceChange(liveInstances, changeContext);
- reportAndRemoveMessage(message, accessor, instanceName,
ProcessedMessageState.COMPLETED);
- continue;
- }
-
- // don't process message that is of READ or UNPROCESSABLE state
- if (MessageState.NEW != message.getMsgState()) {
- // It happens because we don't delete message right after
- // read. Instead we keep it until the current state is updated.
- // We will read the message again if there is a new message but we
- // check for the status and ignore if its already read
- if (LOG.isTraceEnabled()) {
- LOG.trace("Message already read. msgId: " + message.getMsgId());
- }
- continue;
- }
-
- if (message.isExpired()) {
- LOG.info(
- "Dropping expired message. mid: " + message.getId() + ", from: "
+ message.getMsgSrc()
- + " relayed from: " + message.getRelaySrcHost());
- reportAndRemoveMessage(message, accessor, instanceName,
ProcessedMessageState.DISCARDED);
- continue;
- }
-
- // State Transition Cancellation
- if
(message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name()))
{
- boolean success = cancelNotStartedStateTransition(message,
stateTransitionHandlers, accessor, instanceName);
- if (success) {
- continue;
- }
- }
-
- _monitor.reportReceivedMessage(message);
- } catch (Exception e) {
- LOG.error("Failed to process the message {}. Deleting the message from
ZK. Exception: {}",
- message, e);
- removeMessageFromTaskAndFutureMap(message);
- removeMessageFromZK(accessor, message, instanceName);
+ if (checkAndProcessNoOpMessage(message, instanceName, changeContext,
manager, sessionId,
+ stateTransitionHandlers)) {
+ // skip the following operations for the no-op messages.
continue;
}
-
// create message handlers, if handlers not found, leave its state as NEW
NotificationContext msgWorkingContext = changeContext.clone();
try {
- MessageHandler createHandler = createMessageHandler(message,
msgWorkingContext);
- if (createHandler == null) {
+ MessageHandler msgHandler = createMessageHandler(message,
msgWorkingContext);
+ if (msgHandler == null) {
+ // Failed to create message handler, skip processing this message in
this callback.
+ // The same message process will be retried in the next round.
continue;
}
if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
|| message.getMsgType()
.equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
- String messageTarget =
- getMessageTarget(message.getResourceName(),
message.getPartitionName());
-
- if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
- && isStateTransitionInProgress(messageTarget)) {
-
- String taskId = _messageTaskMap.get(messageTarget);
- Message msg = _taskMap.get(taskId).getTask().getMessage();
-
- // If there is another state transition for same partition is
going on,
- // discard the message. Controller will resend if this is a valid
message
- String errMsg = String.format(
- "Another state transition for %s:%s is in progress with msg:
%s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
- message.getResourceName(), message.getPartitionName(),
msg.getMsgId(),
- String.valueOf(msg.isRelayMessage()), msg.getReadTimeStamp(),
- System.currentTimeMillis(), message.getFromState(),
message.getToState());
- handleUnprocessableMessage(message, null /* exception */, errMsg,
accessor,
- instanceName, manager);
- continue;
- }
- if (createHandler instanceof HelixStateTransitionHandler) {
- // We only check to state if there is no ST task
scheduled/executing.
- HelixStateTransitionHandler.StaleMessageValidateResult result =
- ((HelixStateTransitionHandler)
createHandler).staleMessageValidator();
- if (!result.isValid) {
- handleUnprocessableMessage(message, null /* exception */,
- result.exception.getMessage(), accessor, instanceName,
manager);
- continue;
- }
- }
- if (stateTransitionHandlers.containsKey(messageTarget)) {
- // If there are 2 messages in same batch about same partition's
state transition,
- // the later one is discarded
- Message duplicatedMessage =
stateTransitionHandlers.get(messageTarget)._message;
- String errMsg = String.format(
- "Duplicated state transition message: %s. Existing: %s->%s;
New (Discarded): %s->%s",
- message.getMsgId(), duplicatedMessage.getFromState(),
- duplicatedMessage.getToState(), message.getFromState(),
message.getToState());
- handleUnprocessableMessage(message, null /* exception */, errMsg,
accessor,
- instanceName, manager);
+ if (validateAndProcessStateTransitionMessage(message, instanceName,
manager,
+ stateTransitionHandlers, msgHandler)) {
+ // Needs further processing by triggering the state transition
+ String msgTarget =
+ getMessageTarget(message.getResourceName(),
message.getPartitionName());
+ stateTransitionHandlers.put(msgTarget, msgHandler);
+ stateTransitionContexts.put(msgTarget, msgWorkingContext);
+ } else {
+ // skip the following operations for the invalid/expired state
transition messages.
continue;
}
-
- stateTransitionHandlers
- .put(getMessageTarget(message.getResourceName(),
message.getPartitionName()),
- createHandler);
- stateTransitionContexts
- .put(getMessageTarget(message.getResourceName(),
message.getPartitionName()),
- msgWorkingContext);
} else {
- nonStateTransitionHandlers.add(createHandler);
+ // Non state transition messages need further processing by triggering
the handler
+ nonStateTransitionHandlers.add(msgHandler);
nonStateTransitionContexts.add(msgWorkingContext);
}
} catch (Exception e) {
@@ -959,15 +836,15 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
continue;
}
- markReadMessage(message, msgWorkingContext, manager);
- readMsgs.add(message);
-
+ // Update the processed message objects
+ readMsgs.add(markReadMessage(message, msgWorkingContext, manager));
// batch creation of all current state meta data
// do it for non-controller and state transition messages only
- if (!message.isControlerMsg()
- &&
message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.name())) {
+ if (!message.isControlerMsg() && message.getMsgType()
+ .equals(Message.MessageType.STATE_TRANSITION.name())) {
String resourceName = message.getResourceName();
- if (!curResourceNames.contains(resourceName) &&
!createCurStateNames.contains(resourceName)) {
+ if (!curResourceNames.contains(resourceName) && !createCurStateNames
+ .contains(resourceName)) {
createCurStateNames.add(resourceName);
createCurStateKeys.add(keyBuilder.currentState(instanceName,
sessionId, resourceName));
@@ -982,7 +859,6 @@ public class HelixTaskExecutor implements MessageListener,
TaskExecutor {
} else {
metaCurState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
}
-
metaCurStates.add(metaCurState);
}
}
@@ -997,30 +873,186 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
}
}
- // update message state to READ in batch and schedule all read messages
+ // update message state to READ in batch and schedule tasks for all read
messages
if (readMsgs.size() > 0) {
updateMessageState(readMsgs, accessor, instanceName);
- // Remove message if schedule tasks are failed.
for (Map.Entry<String, MessageHandler> handlerEntry :
stateTransitionHandlers.entrySet()) {
MessageHandler handler = handlerEntry.getValue();
NotificationContext context =
stateTransitionContexts.get(handlerEntry.getKey());
- Message msg = handler._message;
- if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
- removeMessageFromTaskAndFutureMap(msg);
- removeMessageFromZK(accessor, msg, instanceName);
- }
+ scheduleTaskForMessage(instanceName, accessor, handler, context);
}
for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
MessageHandler handler = nonStateTransitionHandlers.get(i);
NotificationContext context = nonStateTransitionContexts.get(i);
- Message msg = handler._message;
- if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
- removeMessageFromTaskAndFutureMap(msg);
- removeMessageFromZK(accessor, msg, instanceName);
+ scheduleTaskForMessage(instanceName, accessor, handler, context);
+ }
+ }
+ }
+
+ /**
+ * Inspect the message. Report and remove it if no operation needs to be
done.
+ * @param message
+ * @param instanceName
+ * @param changeContext
+ * @param manager
+ * @param sessionId
+ * @param stateTransitionHandlers
+ * @return True if the message is no-op message and no other process step is
required.
+ */
+ private boolean checkAndProcessNoOpMessage(Message message, String
instanceName,
+ NotificationContext changeContext, HelixManager manager, String
sessionId,
+ Map<String, MessageHandler> stateTransitionHandlers) {
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ try {
+ // nop messages are simply removed. It is used to trigger onMessage() in
+ // situations such as register a new message handler factory
+ if (message.getMsgType().equalsIgnoreCase(MessageType.NO_OP.toString()))
{
+ LOG.info(
+ "Dropping NO-OP message. mid: " + message.getId() + ", from: " +
message.getMsgSrc());
+ reportAndRemoveMessage(message, accessor, instanceName,
ProcessedMessageState.DISCARDED);
+ return true;
+ }
+
+ String tgtSessionId = message.getTgtSessionId();
+ // sessionId mismatch normally means message comes from expired session,
just remove it
+ if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*")) {
+ String warningMessage = "SessionId does NOT match. expected sessionId:
" + sessionId
+ + ", tgtSessionId in message: " + tgtSessionId + ", messageId: " +
message.getMsgId();
+ LOG.warn(warningMessage);
+ reportAndRemoveMessage(message, accessor, instanceName,
ProcessedMessageState.DISCARDED);
+ _statusUpdateUtil
+ .logWarning(message, HelixStateMachineEngine.class,
warningMessage, manager);
+
+ // Proactively send a session sync message from participant to
controller
+ // upon session mismatch after a new session is established
+ if (manager.getInstanceType() == InstanceType.PARTICIPANT
+ || manager.getInstanceType() ==
InstanceType.CONTROLLER_PARTICIPANT) {
+ if (message.getCreateTimeStamp() > manager.getSessionStartTime()) {
+ syncSessionToController(manager);
+ }
}
+ return true;
+ }
+
+ if ((manager.getInstanceType() == InstanceType.CONTROLLER
+ || manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
+ &&
MessageType.PARTICIPANT_SESSION_CHANGE.name().equals(message.getMsgType())) {
+ LOG.info(String.format("Controller received PARTICIPANT_SESSION_CHANGE
msg from src: %s",
+ message.getMsgSrc()));
+ PropertyKey key = new
Builder(manager.getClusterName()).liveInstances();
+ List<LiveInstance> liveInstances =
manager.getHelixDataAccessor().getChildValues(key, true);
+ _controller.onLiveInstanceChange(liveInstances, changeContext);
+ reportAndRemoveMessage(message, accessor, instanceName,
ProcessedMessageState.COMPLETED);
+ return true;
}
+
+ // don't process message that is of READ or UNPROCESSABLE state
+ if (MessageState.NEW != message.getMsgState()) {
+ // It happens because we don't delete message right after
+ // read. Instead we keep it until the current state is updated.
+ // We will read the message again if there is a new message but we
+ // check for the status and ignore if its already read
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Message already read. msgId: " + message.getMsgId());
+ }
+ return true;
+ }
+
+ if (message.isExpired()) {
+ LOG.info(
+ "Dropping expired message. mid: " + message.getId() + ", from: " +
message.getMsgSrc()
+ + " relayed from: " + message.getRelaySrcHost());
+ reportAndRemoveMessage(message, accessor, instanceName,
ProcessedMessageState.DISCARDED);
+ return true;
+ }
+
+ // State Transition Cancellation
+ if
(message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name()))
{
+ boolean success =
+ cancelNotStartedStateTransition(message, stateTransitionHandlers,
accessor,
+ instanceName);
+ if (success) {
+ return true;
+ }
+ }
+
+ _monitor.reportReceivedMessage(message);
+ } catch (Exception e) {
+ LOG.error("Failed to process the message {}. Deleting the message from
ZK. Exception: {}",
+ message, e);
+ removeMessageFromTaskAndFutureMap(message);
+ removeMessageFromZK(accessor, message, instanceName);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Preprocess the state transition message to validate if the request is
valid.
+ * If no operation needs to be triggered, discard the message.
+ * @param message
+ * @param instanceName
+ * @param manager
+ * @param stateTransitionHandlers
+ * @param createHandler
+ * @return True if the requested state transition is valid and the transition needs to
be scheduled.
+ * False if no further operation is required.
+ */
+ private boolean validateAndProcessStateTransitionMessage(Message message,
String instanceName,
+ HelixManager manager, Map<String, MessageHandler>
stateTransitionHandlers,
+ MessageHandler createHandler) {
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+
+ String messageTarget = getMessageTarget(message.getResourceName(),
message.getPartitionName());
+ if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
+ && isStateTransitionInProgress(messageTarget)) {
+ String taskId = _messageTaskMap.get(messageTarget);
+ Message msg = _taskMap.get(taskId).getTask().getMessage();
+ // If there is another state transition for same partition is going on,
+ // discard the message. Controller will resend if this is a valid message
+ String errMsg = String.format(
+ "Another state transition for %s:%s is in progress with msg: %s,
p2p: %s, read: %d, current:%d. Discarding %s->%s message",
+ message.getResourceName(), message.getPartitionName(),
msg.getMsgId(),
+ msg.isRelayMessage(), msg.getReadTimeStamp(),
System.currentTimeMillis(),
+ message.getFromState(), message.getToState());
+ handleUnprocessableMessage(message, null /* exception */, errMsg,
accessor, instanceName,
+ manager);
+ return false;
+ }
+ if (createHandler instanceof HelixStateTransitionHandler) {
+ // We only check to state if there is no ST task scheduled/executing.
+ HelixStateTransitionHandler.StaleMessageValidateResult result =
+ ((HelixStateTransitionHandler)
createHandler).staleMessageValidator();
+ if (!result.isValid) {
+ handleUnprocessableMessage(message, null /* exception */,
result.exception.getMessage(),
+ accessor, instanceName, manager);
+ return false;
+ }
+ }
+ if (stateTransitionHandlers.containsKey(messageTarget)) {
+ // If there are 2 messages in same batch about same partition's state
transition,
+ // the later one is discarded
+ Message duplicatedMessage =
stateTransitionHandlers.get(messageTarget)._message;
+ String errMsg = String.format(
+ "Duplicated state transition message: %s. Existing: %s->%s; New
(Discarded): %s->%s",
+ message.getMsgId(), duplicatedMessage.getFromState(),
duplicatedMessage.getToState(),
+ message.getFromState(), message.getToState());
+ handleUnprocessableMessage(message, null /* exception */, errMsg,
accessor, instanceName,
+ manager);
+ return false;
+ }
+ return true;
+ }
+
+ private void scheduleTaskForMessage(String instanceName, HelixDataAccessor
accessor,
+ MessageHandler handler, NotificationContext context) {
+ Message msg = handler._message;
+ if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
+ // Remove message if schedule tasks are failed.
+ removeMessageFromTaskAndFutureMap(msg);
+ removeMessageFromZK(accessor, msg, instanceName);
}
}
@@ -1047,9 +1079,11 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
// 3. Message handled and task already started
// This method tries to handle the first two cases, it returns true if no
further cancellation is needed,
// false if not been able to cancel the state transition (i.e, further
cancellation is needed).
- private boolean cancelNotStartedStateTransition(Message message, Map<String,
MessageHandler> stateTransitionHandlers,
- HelixDataAccessor accessor, String instanceName) {
- String targetMessageName = getMessageTarget(message.getResourceName(),
message.getPartitionName());
+ private boolean cancelNotStartedStateTransition(Message message,
+ Map<String, MessageHandler> stateTransitionHandlers, HelixDataAccessor
accessor,
+ String instanceName) {
+ String targetMessageName =
+ getMessageTarget(message.getResourceName(),
message.getPartitionName());
ProcessedMessageState messageState;
Message targetStateTransitionMessage;
@@ -1107,13 +1141,14 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
removeMessageFromZK(accessor, message, instanceName);
}
- private void markReadMessage(Message message, NotificationContext context,
+ private Message markReadMessage(Message message, NotificationContext context,
HelixManager manager) {
message.setMsgState(MessageState.READ);
message.setReadTimeStamp(new Date().getTime());
message.setExecuteSessionId(context.getManager().getSessionId());
_statusUpdateUtil.logInfo(message, HelixStateMachineEngine.class, "New
Message", manager);
+ return message;
}
private void handleUnprocessableMessage(Message message, Exception
exception, String errorMsg,
@@ -1128,12 +1163,12 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
}
message.setMsgState(MessageState.UNPROCESSABLE);
removeMessageFromZK(accessor, message, instanceName);
- _monitor.reportProcessedMessage(message,
- ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
-
+ _monitor
+ .reportProcessedMessage(message,
ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
}
+
public MessageHandler createMessageHandler(Message message,
NotificationContext changeContext) {
- String msgType = message.getMsgType().toString();
+ String msgType = message.getMsgType();
MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.get(msgType);
@@ -1141,8 +1176,8 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
// we will keep the message and the message will be handled when
// the corresponding MessageHandlerFactory is registered
if (item == null) {
- LOG.warn("Fail to find message handler factory for type: " + msgType + "
msgId: "
- + message.getMsgId());
+ LOG.warn("Fail to find message handler factory for type: " + msgType + "
msgId: " + message
+ .getMsgId());
return null;
}
MessageHandlerFactory handlerFactory = item.factory();
@@ -1171,7 +1206,7 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
return String.format("%s_%s", resourceName, partitionName);
}
- private String getStateTransitionType(String prefix, String fromState,
String toState){
+ private String getStateTransitionType(String prefix, String fromState,
String toState) {
if (prefix == null || fromState == null || toState == null) {
return null;
}