This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e78c8369044 Change BasePeriodicTask to use a lock object for start / 
stop rather than synchronizing the methods (#16613)
e78c8369044 is described below

commit e78c83690447e7e79d7f5daf27d24bf6caca11a0
Author: Sonam Mandal <[email protected]>
AuthorDate: Fri Aug 15 19:01:18 2025 -0700

    Change BasePeriodicTask to use a lock object for start / stop rather than 
synchronizing the methods (#16613)
---
 .../pinot/core/periodictask/BasePeriodicTask.java  | 87 ++++++++++++----------
 1 file changed, 48 insertions(+), 39 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
index 5179d9a6778..6ef79b616ea 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
@@ -42,6 +42,9 @@ public abstract class BasePeriodicTask implements 
PeriodicTask {
   protected final long _initialDelayInSeconds;
   protected final ReentrantLock _runLock;
 
+  // Lock used to synchronize life-cycle functions
+  private final Object _lifeCycleLock;
+
   private volatile boolean _started;
   private volatile boolean _running;
 
@@ -61,6 +64,7 @@ public abstract class BasePeriodicTask implements 
PeriodicTask {
     _intervalInSeconds = runFrequencyInSeconds;
     _initialDelayInSeconds = initialDelayInSeconds;
     _runLock = new ReentrantLock();
+    _lifeCycleLock = new Object();
   }
 
   @Override
@@ -99,21 +103,23 @@ public abstract class BasePeriodicTask implements 
PeriodicTask {
    * This method sets {@code started} flag to true.
    */
   @Override
-  public final synchronized void start() {
-    if (_started) {
-      LOGGER.warn("Task: {} is already started", _taskName);
-      return;
-    }
+  public final void start() {
+    synchronized (_lifeCycleLock) {
+      if (_started) {
+        LOGGER.warn("Task: {} is already started", _taskName);
+        return;
+      }
 
-    try {
-      setUpTask();
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while setting up task: {}", _taskName, e);
-    }
+      try {
+        setUpTask();
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while setting up task: {}", _taskName, 
e);
+      }
 
-    // mark _started as true only after state has completely initialized, so 
that run method doesn't end up seeing
-    // partially initialized state.
-    _started = true;
+      // mark _started as true only after state has completely initialized, so 
that run method doesn't end up seeing
+      // partially initialized state.
+      _started = true;
+    }
   }
 
   /**
@@ -177,35 +183,38 @@ public abstract class BasePeriodicTask implements 
PeriodicTask {
    * seconds until the task finishes.
    */
   @Override
-  public final synchronized void stop() {
-    if (!_started) {
-      LOGGER.warn("Task: {} is not started", _taskName);
-      return;
-    }
-    long startTimeMs = System.currentTimeMillis();
-    _started = false;
-
-    try {
-      // check if task is done running, or wait for the task to get done, by 
trying to acquire runLock.
-      if (!_runLock.tryLock(MAX_PERIODIC_TASK_STOP_TIME_MILLIS, 
TimeUnit.MILLISECONDS)) {
-        LOGGER
-            .warn("Task {} could not be stopped within timeout of {}ms", 
_taskName, MAX_PERIODIC_TASK_STOP_TIME_MILLIS);
-      } else {
-        LOGGER.info("Task {} successfully stopped in {}ms", _taskName, 
System.currentTimeMillis() - startTimeMs);
+  public final void stop() {
+    synchronized (_lifeCycleLock) {
+      if (!_started) {
+        LOGGER.warn("Task: {} is not started", _taskName);
+        return;
       }
-    } catch (InterruptedException ie) {
-      LOGGER.error("Caught InterruptedException while waiting for task: {} to 
finish", _taskName);
-      Thread.currentThread().interrupt();
-    } finally {
-      if (_runLock.isHeldByCurrentThread()) {
-        _runLock.unlock();
+      long startTimeMs = System.currentTimeMillis();
+      _started = false;
+
+      try {
+        // check if task is done running, or wait for the task to get done, by 
trying to acquire runLock.
+        if (!_runLock.tryLock(MAX_PERIODIC_TASK_STOP_TIME_MILLIS, 
TimeUnit.MILLISECONDS)) {
+          LOGGER
+              .warn("Task {} could not be stopped within timeout of {}ms", 
_taskName,
+                  MAX_PERIODIC_TASK_STOP_TIME_MILLIS);
+        } else {
+          LOGGER.info("Task {} successfully stopped in {}ms", _taskName, 
System.currentTimeMillis() - startTimeMs);
+        }
+      } catch (InterruptedException ie) {
+        LOGGER.error("Caught InterruptedException while waiting for task: {} 
to finish", _taskName);
+        Thread.currentThread().interrupt();
+      } finally {
+        if (_runLock.isHeldByCurrentThread()) {
+          _runLock.unlock();
+        }
       }
-    }
 
-    try {
-      cleanUpTask();
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while cleaning up task: {}", _taskName, 
e);
+      try {
+        cleanUpTask();
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while cleaning up task: {}", _taskName, 
e);
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to