jackjlli commented on a change in pull request #7174:
URL: https://github.com/apache/pinot/pull/7174#discussion_r687050905



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
##########
@@ -54,14 +54,22 @@ public ControllerPeriodicTask(String taskName, long 
runFrequencyInSeconds, long
     _controllerMetrics = controllerMetrics;
   }
 
+  // Determine if this task can run on the specified table. Task can run on 
all tables for which the controller is lead
+  // if "tablename"property is not set. However, if "tablename" property is 
set (by calling the /periodictask/run

Review comment:
       Missing a space between "tablename" and the word "property".

##########
File path: 
pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
##########
@@ -104,4 +105,79 @@ protected void cleanUpTask() {
     assertEquals(numTimesRunCalled.get(), numTasks * 2);
     assertEquals(numTimesStopCalled.get(), numTasks);
   }
+
+
+  /** Test that {@link PeriodicTaskScheduler} does not run the same task more 
than once at any time. */
+  @Test
+  public void testConcurrentExecutionOfSameTask() throws Exception {
+    // Count how many tasks were run.
+    final AtomicInteger counter = new AtomicInteger();
+
+    // Count how many attempts were made to run task
+    final AtomicInteger attempts = new AtomicInteger();
+
+
+    // Create periodic task.
+    PeriodicTask task = new BasePeriodicTask("TestTask", 1L, 0L) {
+      private volatile boolean isRunning = false;
+      @Override
+      protected void runTask() {
+        try {
+          if (isRunning) {
+            Assert.fail("More than one thread attempting to execute task at 
the same time.");
+          }
+          isRunning = true;
+          counter.incrementAndGet();
+          Thread.sleep(250);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        } finally {
+          isRunning = false;
+        }
+      }
+    };
+
+    // Start scheduler with periodic task.
+    List<PeriodicTask> periodicTasks = new ArrayList<>();
+    periodicTasks.add(task);
+
+    PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+    taskScheduler.init(periodicTasks);
+    taskScheduler.start();
+
+    // Create multiple "execute" threads that try to run the same task that is 
already being run by scheduler
+    // on a periodic basis.
+    final int threadCount = 20;
+    Thread[] threads = new Thread[threadCount];
+    for (int i = 0; i < threads.length; i++) {
+      threads[i] = new Thread(() -> {
+          attempts.incrementAndGet();
+          taskScheduler.scheduleNow("TestTask", null);
+      });
+
+      threads[i].start();
+      try {
+        threads[i].join();
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    //  Run for 3 seconds to let as many "execute" threads finish as possible.
+    Thread.sleep(3000);

Review comment:
       It'd be good to avoid using the sleep method in the test, as it will 
slow down the whole build.

##########
File path: 
pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
##########
@@ -104,4 +105,79 @@ protected void cleanUpTask() {
     assertEquals(numTimesRunCalled.get(), numTasks * 2);
     assertEquals(numTimesStopCalled.get(), numTasks);
   }
+
+
+  /** Test that {@link PeriodicTaskScheduler} does not run the same task more 
than once at any time. */
+  @Test
+  public void testConcurrentExecutionOfSameTask() throws Exception {
+    // Count how many tasks were run.
+    final AtomicInteger counter = new AtomicInteger();
+
+    // Count how many attempts were made to run task
+    final AtomicInteger attempts = new AtomicInteger();
+
+
+    // Create periodic task.
+    PeriodicTask task = new BasePeriodicTask("TestTask", 1L, 0L) {
+      private volatile boolean isRunning = false;
+      @Override
+      protected void runTask() {
+        try {
+          if (isRunning) {
+            Assert.fail("More than one thread attempting to execute task at 
the same time.");
+          }
+          isRunning = true;
+          counter.incrementAndGet();
+          Thread.sleep(250);

Review comment:
       Can this sleep method be skipped?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
##########
@@ -111,22 +126,49 @@ protected void setUpTask() {
    */
   @Override
   public final void run() {
-    _running = true;
+    try {
+      // Don't allow a task to run more than once at a time.
+      _runLock.lock();
+      _running = true;
 
-    if (_started) {
-      long startTime = System.currentTimeMillis();
-      LOGGER.info("Start running task: {}", _taskName);
-      try {
-        runTask();
-      } catch (Exception e) {
-        LOGGER.error("Caught exception while running task: {}", _taskName, e);
+      if (_started) {
+        long startTime = System.currentTimeMillis();
+        LOGGER.info("Start running task: {}", _taskName);
+        try {
+          runTask();
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while running task: {}", _taskName, 
e);
+        }
+        LOGGER.info("Finish running task: {} in {}ms", _taskName, 
System.currentTimeMillis() - startTime);
+      } else {
+        LOGGER.warn("Task: {} is skipped because it is not started or already 
stopped", _taskName);
       }
-      LOGGER.info("Finish running task: {} in {}ms", _taskName, 
System.currentTimeMillis() - startTime);
-    } else {
-      LOGGER.warn("Task: {} is skipped because it is not started or already 
stopped", _taskName);
+
+      _running = false;
+    } finally {
+       _runLock.unlock();
+       _running = false;
     }
+  }
 
-    _running = false;
+  @Override
+  public void run(@Nullable java.util.Properties periodicTaskProperties) {
+    java.util.Properties savedPeriodicTaskProperties = 
_activePeriodicTaskProperties;

Review comment:
       This statement seems unused?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
##########
@@ -108,4 +111,59 @@ public synchronized void stop() {
       _tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop);
     }
   }
+
+  /** @return true if task with given name exists; otherwise, false. */
+  public boolean hasTask(String periodicTaskName) {
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      if (task.getTaskName().equals(periodicTaskName)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /** @return List of tasks name that will run periodically. */
+  public List<String> getTaskNameList() {
+    List<String> taskNameList = new ArrayList<>();
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      taskNameList.add(task.getTaskName());
+    }
+
+    return taskNameList;
+  }
+
+  private PeriodicTask getPeriodicTask(String periodicTaskName) {
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      if (task.getTaskName().equals(periodicTaskName)) {
+        return task;
+      }
+    }
+    return null;
+  }
+
+  /** Execute specified {@link PeriodicTask} immediately. */

Review comment:
       Format this comment?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to