This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ea5d4a00 Optimize HelixTaskExecutor reset() in event of shutdown 
(#2183)
3ea5d4a00 is described below

commit 3ea5d4a000d545381b1f280e17fa5ce3608607e2
Author: Qi (Quincy) Qu <[email protected]>
AuthorDate: Thu Aug 11 10:57:59 2022 -0700

    Optimize HelixTaskExecutor reset() in event of shutdown (#2183)
    
    Optimize HelixTaskExecutor reset() in event of shutdown
    
    Some instances may be reset() multiple times during participant shutdown.
    This commit refactors the logic in HelixTaskExecutor to reduce unnecessary
    method calls.
---
 .../messaging/handling/HelixTaskExecutor.java      | 62 +++++++++++++++-------
 .../helix/participant/HelixStateMachineEngine.java |  1 +
 .../messaging/handling/TestHelixTaskExecutor.java  | 38 +++++++++++++
 3 files changed, 81 insertions(+), 20 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 80b70bbc3..b21a065a9 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -136,6 +137,8 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
   private final ParticipantStatusMonitor _monitor;
   public static final String MAX_THREADS = "maxThreads";
 
+  // true if all partition states are "clean", i.e. the same as right after reset()
+  private volatile boolean _isCleanState = true;
   private MessageQueueMonitor _messageQueueMonitor;
   private GenericHelixController _controller;
   private Long _lastSessionSyncTime;
@@ -677,13 +680,10 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
     }
   }
 
-  void reset() {
-    LOG.info("Reset HelixTaskExecutor");
-
-    if (_messageQueueMonitor != null) {
-      _messageQueueMonitor.reset();
-    }
-
+  /**
+   * Shut down the registered thread pool executors. This method will be a no-op if called repeatedly.
+   */
+  private void shutdownExecutors() {
     synchronized (_hdlrFtyRegistry) {
       for (String msgType : _hdlrFtyRegistry.keySet()) {
         // don't un-register factories, just shutdown all executors
@@ -694,17 +694,37 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
           LOG.info("Reset executor for msgType: " + msgType + ", pool: " + 
pool);
           shutdownAndAwaitTermination(pool, item);
         }
-
-        if (item.factory() != null) {
-          try {
-            item.factory().reset();
-          } catch (Exception ex) {
-            LOG.error("Failed to reset the factory {} of message type {}.", 
item.factory().toString(),
-                msgType, ex);
-          }
-        }
       }
     }
+  }
+
+  synchronized void reset() {
+    if (_isCleanState) {
+      LOG.info("HelixTaskExecutor is in clean state, no need to reset again");
+      return;
+    }
+    LOG.info("Reset HelixTaskExecutor");
+
+    if (_messageQueueMonitor != null) {
+      _messageQueueMonitor.reset();
+    }
+
+    shutdownExecutors();
+
+    synchronized (_hdlrFtyRegistry) {
+      _hdlrFtyRegistry.values()
+          .stream()
+          .map(MsgHandlerFactoryRegistryItem::factory)
+          .distinct()
+          .filter(Objects::nonNull)
+          .forEach(factory -> {
+            try {
+              factory.reset();
+            } catch (Exception ex) {
+              LOG.error("Failed to reset the factory {}.", factory.toString(), 
ex);
+            }
+          });
+    }
     // threads pool specific to STATE_TRANSITION.Key specific pool are not 
shut down.
     // this is a potential area to improve. 
https://github.com/apache/helix/issues/1245
 
@@ -712,8 +732,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
     // Log all tasks that fail to terminate
     for (String taskId : _taskMap.keySet()) {
       MessageTaskInfo info = _taskMap.get(taskId);
-      sb.append(
-          "Task: " + taskId + " fails to terminate. Message: " + 
info._task.getMessage() + "\n");
+      sb.append("Task: " + taskId + " fails to terminate. Message: " + 
info._task.getMessage() + "\n");
     }
 
     LOG.info(sb.toString());
@@ -724,6 +743,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
     _knownMessageIds.clear();
 
     _lastSessionSyncTime = null;
+    _isCleanState = true;
   }
 
   void init() {
@@ -744,7 +764,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
         _monitor.createExecutorMonitor(type, newPool);
         return newPool;
       });
-      LOG.info("Setup the thread pool for type: %s, isShutdown: %s", msgType, 
pool.isShutdown());
+      LOG.info("Setup the thread pool for type: {}, isShutdown: {}", msgType, 
pool.isShutdown());
     }
   }
 
@@ -835,6 +855,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
       init();
       // continue to process messages
     }
+    _isCleanState = false;
 
     // if prefetch is disabled in MessageListenerCallback, we need to read all 
new messages from zk.
     if (messages == null || messages.isEmpty()) {
@@ -1442,7 +1463,7 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
       nopMsg.setTgtName(instanceName);
       accessor
           .setProperty(accessor.keyBuilder().message(nopMsg.getTgtName(), 
nopMsg.getId()), nopMsg);
-      LOG.info("Send NO_OP message to {}}, msgId: {}.", nopMsg.getTgtName(), 
nopMsg.getId());
+      LOG.info("Send NO_OP message to {}, msgId: {}.", nopMsg.getTgtName(), 
nopMsg.getId());
     } catch (Exception e) {
       LOG.error("Failed to send NO_OP message to {}.", instanceName, e);
     }
@@ -1454,6 +1475,7 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
     _isShuttingDown = true;
     _timer.cancel();
 
+    shutdownExecutors();
     reset();
     _monitor.shutDown();
     LOG.info("Shutdown HelixTaskExecutor finished");
diff --git 
a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
 
b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 50a078282..1a16de451 100644
--- 
a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ 
b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -164,6 +164,7 @@ public class HelixStateMachineEngine implements 
StateMachineEngine {
   }
 
   private void loopStateModelFactories(Consumer<StateModel> consumer) {
+    // TODO: evaluate impact and consider parallelization
     for (Map<String, StateModelFactory<? extends StateModel>> ftyMap : 
_stateModelFactoryMap
         .values()) {
       for (StateModelFactory<? extends StateModel> stateModelFactory : 
ftyMap.values()) {
diff --git 
a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
 
b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index 7b239908a..1a8cc5ac4 100644
--- 
a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ 
b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -132,6 +132,21 @@ public class TestHelixTaskExecutor {
     }
   }
 
+  private class TestMessageHandlerFactory3 extends TestMessageHandlerFactory {
+    private boolean _resetDone = false;
+
+    @Override
+    public List<String> getMessageTypes() {
+      return ImmutableList.of("msgType1", "msgType2", "msgType3");
+    }
+
+    @Override
+    public void reset() {
+      Assert.assertFalse(_resetDone, "reset() should only be triggered once in 
TestMessageHandlerFactory3");
+      _resetDone = true;
+    }
+  }
+
   class CancellableHandlerFactory implements MultiTypeMessageHandlerFactory {
 
     int _handlersCreated = 0;
@@ -798,6 +813,29 @@ public class TestHelixTaskExecutor {
     System.out.println("END TestCMTaskExecutor.testHandlerResetTimeout()");
   }
 
+  @Test
+  public void testMsgHandlerRegistryAndShutdown() {
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+    TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
+    TestMessageHandlerFactory3 factoryMulti = new TestMessageHandlerFactory3();
+    executor.registerMessageHandlerFactory(factory, 
HelixTaskExecutor.DEFAULT_PARALLEL_TASKS, 200);
+    executor.registerMessageHandlerFactory(factoryMulti, 
HelixTaskExecutor.DEFAULT_PARALLEL_TASKS, 200);
+
+    final Message msg = new Message(factory.getMessageTypes().get(0), 
UUID.randomUUID().toString());
+    msg.setTgtSessionId("*");
+    msg.setTgtName("Localhost_1123");
+    msg.setSrcName("127.101.1.23_2234");
+
+    NotificationContext changeContext = new NotificationContext(manager);
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
+    executor.onMessage("some", Collections.singletonList(msg), changeContext);
+    Assert.assertEquals(executor._hdlrFtyRegistry.size(), 4);
+    // Ensure TestMessageHandlerFactory3 instance is reset and reset exactly 
once
+    executor.shutdown();
+    Assert.assertTrue(factoryMulti._resetDone, "TestMessageHandlerFactory3 
should be reset");
+  }
+
   @Test()
   public void testNoRetry() throws InterruptedException {
     System.out.println("START " + TestHelper.getTestMethodName());

Reply via email to