jackjlli commented on a change in pull request #3819: Refactor periodic task
URL: https://github.com/apache/incubator-pinot/pull/3819#discussion_r255818544
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
##########
@@ -42,9 +61,125 @@ public long getInitialDelayInSeconds() {
return _initialDelayInSeconds;
}
+ /**
+ * Returns the status of the {@code started} flag. This flag will be set
after calling {@link #start()}, and reset
+ * after calling {@link #stop()}.
+ */
+ public final boolean isStarted() {
+ return _started;
+ }
+
+ /**
+ * Returns the status of the {@code running} flag. This flag will be set
during the task execution.
+ */
+ public final boolean isRunning() {
+ return _running;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This method sets {@code started} flag to true.
+ */
@Override
- public String getTaskName() {
- return _taskName;
+ public final synchronized void start() {
+ if (_started) {
+ LOGGER.warn("Task: {} is already started", _taskName);
+ return;
+ }
+ _started = true;
+
+ try {
+ startTask();
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while setting up task: {}", _taskName, e);
+ }
+ }
+
+ /**
+ * Can be override for extra task setups.
+ */
+ protected void startTask() {
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * During the task execution, the {@code running} flag will be set.
+ */
+ @Override
+ public final void run() {
+ _running = true;
+
+ if (_started) {
+ long startTime = System.currentTimeMillis();
+ LOGGER.info("Start running task: {}", _taskName);
+ try {
+ runTask();
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while running task: {}", _taskName, e);
+ }
+ LOGGER.info("Finish running task: {} in {}ms", _taskName,
System.currentTimeMillis() - startTime);
+ } else {
+ LOGGER.warn("Task: {} is skipped because it is not started or already
stopped", _taskName);
+ }
+
+ _running = false;
+ }
+
+ /**
+ * Executes the task. This method should early terminate if {@link}
+ */
+ protected abstract void runTask();
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This method sets {@code started} flag to false. If the task is running,
this method will block for at most 5
+ * minutes until the task finishes.
+ */
+ @Override
+ public final synchronized void stop() {
+ if (!_started) {
+ LOGGER.warn("Task: {} is not started", _taskName);
+ return;
+ }
+ _started = false;
+
+ if (_running) {
+ long startTimeMs = System.currentTimeMillis();
+ long remainingTimeMs = MAX_PERIODIC_TASK_STOP_TIME_MILLIS;
+ LOGGER.info("Task: {} is running, wait for at most {}ms for it to
finish", _taskName, remainingTimeMs);
+ while (_running && remainingTimeMs > 0L) {
+ long sleepTimeMs = Long.max(remainingTimeMs, 1000L);
Review comment:
I'm not quite sure the logic here. The variable `remainingTimeMs` is
initially set to 30_000L. Then `sleepTimeMs` is set to 30_000L, since it's
assigned from the larger number between 30_000L and 1_000L. Then
`remainingTimeMs` is set to 0L. In this case, the thread will be slept only
once. Could it be `min()` instead of `max()`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]