This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new b3325c16 Add a config for monitorScheduler type (#10732)
b3325c16 is described below

commit b3325c160171f8a91972b5590c481cb121e58eae
Author: Jihoon Son <[email protected]>
AuthorDate: Wed Jan 13 17:20:43 2021 -0800

    Add a config for monitorScheduler type (#10732)
    
    * Add a config for monitorScheduler type
    
    * check interrupted
    
    * null check
    
    * do not schedule monitor if the previous one is still running
    
    * checkstyle
    
    * clean up names
    
    * change default back to basic
    
    * fix test
---
 .../druid/java/util/metrics/AbstractMonitor.java   |  16 ---
 .../java/util/metrics/BasicMonitorScheduler.java   |  65 +++++++++
 .../metrics/ClockDriftSafeMonitorScheduler.java    | 134 +++++++++++++++++
 .../druid/java/util/metrics/CompoundMonitor.java   |  15 --
 .../apache/druid/java/util/metrics/Monitor.java    |   6 -
 .../druid/java/util/metrics/MonitorScheduler.java  |  83 +++--------
 .../util/metrics/BasicMonitorSchedulerTest.java    | 111 ++++++++++++++
 ...ava => ClockDriftSafeMonitorSchedulerTest.java} | 160 ++++++++++++---------
 .../metrics/DruidMonitorSchedulerConfig.java       |   9 ++
 .../apache/druid/server/metrics/MetricsModule.java |  33 +++--
 .../druid/server/metrics/MetricsModuleTest.java    |  81 +++++++++++
 11 files changed, 536 insertions(+), 177 deletions(-)

diff --git 
a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java 
b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
index 4fbefb8..ba35dfb 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
@@ -22,8 +22,6 @@ package org.apache.druid.java.util.metrics;
 
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 
-import java.util.concurrent.Future;
-
 
 /**
  */
@@ -31,8 +29,6 @@ public abstract class AbstractMonitor implements Monitor
 {
   private volatile boolean started = false;
   
-  private volatile Future<?> scheduledFuture;
-
   @Override
   public void start()
   {
@@ -56,16 +52,4 @@ public abstract class AbstractMonitor implements Monitor
   }
 
   public abstract boolean doMonitor(ServiceEmitter emitter);
-
-  @Override
-  public Future<?> getScheduledFuture()
-  {
-    return scheduledFuture;
-  }
-
-  @Override
-  public void setScheduledFuture(Future<?> scheduledFuture)
-  {
-    this.scheduledFuture = scheduledFuture;
-  }
 }
diff --git 
a/core/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java
 
b/core/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java
new file mode 100644
index 0000000..e3e9572
--- /dev/null
+++ 
b/core/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.metrics;
+
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors.Signal;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * A {@link MonitorScheduler} implementation based on {@link 
ScheduledExecutorService}.
+ */
+public class BasicMonitorScheduler extends MonitorScheduler
+{
+  private final ScheduledExecutorService exec;
+
+  public BasicMonitorScheduler(
+      MonitorSchedulerConfig config,
+      ServiceEmitter emitter,
+      List<Monitor> monitors,
+      ScheduledExecutorService exec
+  )
+  {
+    super(config, emitter, monitors);
+    this.exec = exec;
+  }
+
+  @Override
+  void startMonitor(Monitor monitor)
+  {
+    monitor.start();
+    ScheduledExecutors.scheduleAtFixedRate(
+        exec,
+        getConfig().getEmitterPeriod(),
+        () -> {
+          // Run one more time even if the monitor was removed, in case 
there's some extra data to flush
+          if (monitor.monitor(getEmitter()) && hasMonitor(monitor)) {
+            return Signal.REPEAT;
+          } else {
+            removeMonitor(monitor);
+            return Signal.STOP;
+          }
+        }
+    );
+  }
+}
diff --git 
a/core/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java
 
b/core/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java
new file mode 100644
index 0000000..6fcb244
--- /dev/null
+++ 
b/core/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.metrics;
+
+import io.timeandspace.cronscheduler.CronScheduler;
+import io.timeandspace.cronscheduler.CronTask;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A {@link MonitorScheduler} implementation based on {@link CronScheduler}.
+ */
+public class ClockDriftSafeMonitorScheduler extends MonitorScheduler
+{
+  private static final Logger LOG = new 
Logger(ClockDriftSafeMonitorScheduler.class);
+
+  private final CronScheduler monitorScheduler;
+  private final ExecutorService monitorRunner;
+
+  public ClockDriftSafeMonitorScheduler(
+      MonitorSchedulerConfig config,
+      ServiceEmitter emitter,
+      List<Monitor> monitors,
+      CronScheduler monitorScheduler,
+      ExecutorService monitorRunner
+  )
+  {
+    super(config, emitter, monitors);
+    this.monitorScheduler = monitorScheduler;
+    this.monitorRunner = monitorRunner;
+  }
+
+  @Override
+  void startMonitor(final Monitor monitor)
+  {
+    monitor.start();
+    long rate = getConfig().getEmitterPeriod().getMillis();
+    final AtomicReference<Future<?>> futureReference = new AtomicReference<>();
+    Future<?> future = monitorScheduler.scheduleAtFixedRate(
+        rate,
+        rate,
+        TimeUnit.MILLISECONDS,
+        new CronTask()
+        {
+          private Future<?> cancellationFuture = null;
+          private Future<Boolean> monitorFuture = null;
+
+          @Override
+          public void run(long scheduledRunTimeMillis)
+          {
+            waitForScheduleFutureToBeSet();
+            if (cancellationFuture == null) {
+              LOG.error("scheduleFuture is not set. Can't run monitor[%s]", 
monitor.getClass().getName());
+              return;
+            }
+            try {
+              // Do nothing if the monitor is still running.
+              if (monitorFuture == null || monitorFuture.isDone()) {
+                if (monitorFuture != null) {
+                  // monitorFuture must be done at this moment if it's not null
+                  if (!(monitorFuture.get() && hasMonitor(monitor))) {
+                    stopMonitor(monitor);
+                    return;
+                  }
+                }
+
+                LOG.trace("Running monitor[%s]", monitor.getClass().getName());
+                monitorFuture = monitorRunner.submit(() -> {
+                  try {
+                    return monitor.monitor(getEmitter());
+                  }
+                  catch (Throwable e) {
+                    LOG.error(
+                        e,
+                        "Exception while executing monitor[%s]. Rescheduling 
in %s ms",
+                        monitor.getClass().getName(),
+                        rate
+                    );
+                    return Boolean.TRUE;
+                  }
+                });
+              }
+            }
+            catch (Throwable e) {
+              LOG.error(e, "Uncaught exception.");
+            }
+          }
+
+          private void waitForScheduleFutureToBeSet()
+          {
+            if (cancellationFuture == null) {
+              while (!Thread.currentThread().isInterrupted()) {
+                if (futureReference.get() != null) {
+                  cancellationFuture = futureReference.get();
+                  break;
+                }
+              }
+            }
+          }
+
+          private void stopMonitor(Monitor monitor)
+          {
+            removeMonitor(monitor);
+            cancellationFuture.cancel(false);
+            LOG.debug("Stopped monitor[%s]", monitor.getClass().getName());
+          }
+        }
+    );
+    futureReference.set(future);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java 
b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
index 6649312..676c77a 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
@@ -24,14 +24,11 @@ import 
org.apache.druid.java.util.emitter.service.ServiceEmitter;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.Future;
 
 public abstract class CompoundMonitor implements Monitor
 {
   private final List<Monitor> monitors;
   
-  private volatile Future<?> scheduledFuture;
-
   public CompoundMonitor(List<Monitor> monitors)
   {
     this.monitors = monitors;
@@ -64,17 +61,5 @@ public abstract class CompoundMonitor implements Monitor
     return shouldReschedule(Lists.transform(monitors, monitor -> 
monitor.monitor(emitter)));
   }
 
-  @Override
-  public Future<?> getScheduledFuture()
-  {
-    return scheduledFuture;
-  }
-
-  @Override
-  public void setScheduledFuture(Future<?> scheduledFuture)
-  {
-    this.scheduledFuture = scheduledFuture;
-  }
-
   public abstract boolean shouldReschedule(List<Boolean> reschedules);
 }
diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java 
b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
index 8a3975e..8ddb3fa 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
@@ -21,8 +21,6 @@ package org.apache.druid.java.util.metrics;
 
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 
-import java.util.concurrent.Future;
-
 
 /**
  */
@@ -38,8 +36,4 @@ public interface Monitor
    * @return true if this monitor needs to continue monitoring. False 
otherwise.
    */
   boolean monitor(ServiceEmitter emitter);
-
-  Future<?> getScheduledFuture();
-  
-  void setScheduledFuture(Future<?> scheduledFuture);
 }
diff --git 
a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java 
b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
index 961f823..171ca40 100644
--- 
a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
+++ 
b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
@@ -20,52 +20,37 @@
 package org.apache.druid.java.util.metrics;
 
 import com.google.common.collect.Sets;
-import io.timeandspace.cronscheduler.CronScheduler;
-import io.timeandspace.cronscheduler.CronTask;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 
 /**
  */
-public class MonitorScheduler
+public abstract class MonitorScheduler
 {
-  
-  private static final Logger log = new Logger(MonitorScheduler.class);
-  
   private final MonitorSchedulerConfig config;
   private final ServiceEmitter emitter;
   private final Set<Monitor> monitors;
   private final Object lock = new Object();
-  private final CronScheduler scheduler;
-  private final ExecutorService executorService;
 
   private volatile boolean started = false;
   
-  public MonitorScheduler(
+  MonitorScheduler(
       MonitorSchedulerConfig config,
-      CronScheduler scheduler,
       ServiceEmitter emitter,
-      List<Monitor> monitors,
-      ExecutorService executorService
+      List<Monitor> monitors
   )
   {
     this.config = config;
-    this.scheduler = scheduler;
     this.emitter = emitter;
     this.monitors = Sets.newHashSet(monitors);
-    this.executorService = executorService;
   }
 
   @LifecycleStart
@@ -131,59 +116,23 @@ public class MonitorScheduler
     }
   }
 
-  private void startMonitor(final Monitor monitor)
+  boolean hasMonitor(final Monitor monitor)
   {
     synchronized (lock) {
-      monitor.start();
-      long rate = config.getEmitterPeriod().getMillis();
-      Future<?> scheduledFuture = scheduler.scheduleAtFixedRate(
-          rate,
-          rate,
-          TimeUnit.MILLISECONDS,
-          new CronTask()
-          {
-            private volatile Future<Boolean> monitorFuture = null;
-            @Override
-            public void run(long scheduledRunTimeMillis)
-            {
-              try {
-                if (monitorFuture != null && monitorFuture.isDone()
-                    && !(monitorFuture.get() && hasMonitor(monitor))) {
-                  removeMonitor(monitor);
-                  monitor.getScheduledFuture().cancel(false);
-                  log.debug("Stopped rescheduling %s (delay %s)", this, rate);
-                  return;
-                }
-                log.trace("Running %s (period %s)", this, rate);
-                monitorFuture = executorService.submit(new Callable<Boolean>()
-                {
-                  @Override
-                  public Boolean call()
-                  {
-                    try {
-                      return monitor.monitor(emitter);
-                    }
-                    catch (Throwable e) {
-                      log.error(e, "Uncaught exception.");
-                      return Boolean.FALSE;
-                    }
-                  }
-                });
-              }
-              catch (Throwable e) {
-                log.error(e, "Uncaught exception.");
-              }
-            }
-          });
-      monitor.setScheduledFuture(scheduledFuture);
+      return monitors.contains(monitor);
     }
   }
 
-  private boolean hasMonitor(final Monitor monitor)
+  MonitorSchedulerConfig getConfig()
   {
-    synchronized (lock) {
-      return monitors.contains(monitor);
-    }
+    return config;
   }
-  
+
+  ServiceEmitter getEmitter()
+  {
+    return emitter;
+  }
+
+  @GuardedBy("lock")
+  abstract void startMonitor(Monitor monitor);
 }
diff --git 
a/core/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
 
b/core/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
new file mode 100644
index 0000000..ea04080
--- /dev/null
+++ 
b/core/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.metrics;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+public class BasicMonitorSchedulerTest
+{
+  private final MonitorSchedulerConfig config = new MonitorSchedulerConfig()
+  {
+    @Override
+    public Duration getEmitterPeriod()
+    {
+      return Duration.millis(5);
+    }
+  };
+  private ServiceEmitter emitter;
+  private ScheduledExecutorService exec;
+
+  @Before
+  public void setup()
+  {
+    emitter = Mockito.mock(ServiceEmitter.class);
+    exec = Execs.scheduledSingleThreaded("BasicMonitorSchedulerTest");
+  }
+
+  @Test
+  public void testStart_RepeatScheduling() throws InterruptedException
+  {
+    final Monitor monitor = Mockito.mock(Monitor.class);
+    Mockito.when(monitor.monitor(ArgumentMatchers.any())).thenReturn(true);
+
+    final BasicMonitorScheduler scheduler = new BasicMonitorScheduler(
+        config,
+        emitter,
+        ImmutableList.of(monitor),
+        exec
+    );
+    scheduler.start();
+    Thread.sleep(100);
+    Mockito.verify(monitor, 
Mockito.atLeast(2)).monitor(ArgumentMatchers.any());
+    scheduler.stop();
+  }
+
+  @Test
+  public void testStart_RepeatAndStopScheduling() throws InterruptedException
+  {
+    final Monitor monitor = Mockito.mock(Monitor.class);
+    Mockito.when(monitor.monitor(ArgumentMatchers.any())).thenReturn(true, 
true, true, false);
+
+    final BasicMonitorScheduler scheduler = new BasicMonitorScheduler(
+        config,
+        emitter,
+        ImmutableList.of(monitor),
+        exec
+    );
+    scheduler.start();
+    Thread.sleep(100);
+    // monitor.monitor() is called 5 times since a new task is scheduled first 
and then the current one is executed.
+    // See ScheduledExecutors.scheduleAtFixedRate() for details.
+    Mockito.verify(monitor, Mockito.times(5)).monitor(ArgumentMatchers.any());
+    scheduler.stop();
+  }
+
+  @Test
+  public void testStart_UnexpectedExceptionWhileMonitoring_ContinueMonitor() 
throws InterruptedException
+  {
+    final Monitor monitor = Mockito.mock(Monitor.class);
+    Mockito.when(monitor.monitor(ArgumentMatchers.any()))
+           .thenThrow(new RuntimeException("Test throwing exception while 
monitoring"));
+
+    final BasicMonitorScheduler scheduler = new BasicMonitorScheduler(
+        config,
+        emitter,
+        ImmutableList.of(monitor),
+        exec
+    );
+    scheduler.start();
+    Thread.sleep(100);
+    // monitor.monitor() throws on every call here, yet the monitor keeps being 
scheduled, so we only verify that it ran at least twice.
+    // See ScheduledExecutors.scheduleAtFixedRate() for details.
+    Mockito.verify(monitor, 
Mockito.atLeast(2)).monitor(ArgumentMatchers.any());
+    scheduler.stop();
+  }
+}
diff --git 
a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
 
b/core/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java
similarity index 73%
rename from 
core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
rename to 
core/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java
index da2ba59..b5e80a2 100644
--- 
a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
+++ 
b/core/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java
@@ -23,7 +23,9 @@ package org.apache.druid.java.util.metrics;
 import com.google.common.collect.ImmutableList;
 import io.timeandspace.cronscheduler.CronScheduler;
 import io.timeandspace.cronscheduler.CronTask;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,21 +40,37 @@ import java.time.Duration;
 import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-public class MonitorSchedulerTest
+public class ClockDriftSafeMonitorSchedulerTest
 {
-  
+  // A real executor service to execute CronTask asynchronously.
+  // Many tests in this class use mocks to easily control the behavior of 
CronScheduler and the ExecutorService
+  // used by MonitorScheduler. However, as MonitorScheduler uses two different 
threads in production, one for
+  // scheduling a task to schedule a monitor (CronScheduler), and another for 
running a scheduled monitor
+  // asynchronously, these tests also need to run some tasks in an 
asynchronous manner. As mocks are convenient
+  // enough to control the behavior of things, we use another executorService 
only to run some tasks asynchronously
+  // to mimic the nature of asynchronous execution in MonitorScheduler.
+  private ExecutorService cronTaskRunner;
   @Mock
   private CronScheduler cronScheduler;
   
   @Before
   public void setUp()
   {
+    cronTaskRunner = Execs.singleThreaded("monitor-scheduler-test");
     MockitoAnnotations.initMocks(this);
   }
+
+  @After
+  public void tearDown()
+  {
+    cronTaskRunner.shutdownNow();
+  }
   
   @Test
   public void testFindMonitor()
@@ -72,11 +90,11 @@ public class MonitorSchedulerTest
     
     ExecutorService executor = Mockito.mock(ExecutorService.class);
 
-    final MonitorScheduler scheduler = new MonitorScheduler(
+    final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
         Mockito.mock(MonitorSchedulerConfig.class),
-        
CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(),
         Mockito.mock(ServiceEmitter.class),
         ImmutableList.of(monitor1, monitor2),
+        
CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(),
         executor
     );
 
@@ -91,17 +109,18 @@ public class MonitorSchedulerTest
   }
   
   @Test
-  public void testStart_RepeatScheduling() 
+  public void testStart_RepeatScheduling() throws InterruptedException
   {
     ExecutorService executor = Mockito.mock(ExecutorService.class);
 
+    CountDownLatch latch = new CountDownLatch(1);
     Mockito.doAnswer(new Answer<Future<?>>()
     {
       private int scheduleCount = 0;
 
       @SuppressWarnings("unchecked")
       @Override
-      public Future<?> answer(InvocationOnMock invocation) throws Exception
+      public Future<?> answer(InvocationOnMock invocation)
       {
         final Object originalArgument = (invocation.getArguments())[3];
         CronTask task = ((CronTask) originalArgument);
@@ -117,10 +136,14 @@ public class MonitorSchedulerTest
           }
         }).when(executor).submit(ArgumentMatchers.any(Callable.class));
 
-        while (scheduleCount < 2) {
-          scheduleCount++;
-          task.run(123L);
-        }
+        cronTaskRunner.submit(() -> {
+          while (scheduleCount < 2) {
+            scheduleCount++;
+            task.run(123L);
+          }
+          latch.countDown();
+          return null;
+        });
         return createDummyFuture();
       }
     }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), 
ArgumentMatchers.anyLong(),
@@ -131,13 +154,15 @@ public class MonitorSchedulerTest
     MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
     Mockito.when(config.getEmitterPeriod()).thenReturn(new 
org.joda.time.Duration(1000L));
 
-    final MonitorScheduler scheduler = new MonitorScheduler(
+    final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
         config,
-        cronScheduler,
         Mockito.mock(ServiceEmitter.class),
         ImmutableList.of(monitor),
-        executor);
+        cronScheduler,
+        executor
+    );
     scheduler.start();
+    latch.await(5, TimeUnit.SECONDS);
 
     Mockito.verify(monitor, Mockito.times(1)).start();
     Mockito.verify(cronScheduler, 
Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
@@ -145,21 +170,22 @@ public class MonitorSchedulerTest
         ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
     Mockito.verify(executor, 
Mockito.times(2)).submit(ArgumentMatchers.any(Callable.class));
     Mockito.verify(monitor, Mockito.times(2)).monitor(ArgumentMatchers.any());
-
+    scheduler.stop();
   }
   
   @Test
-  public void testStart_RepeatAndStopScheduling() 
+  public void testStart_RepeatAndStopScheduling() throws InterruptedException
   {
     ExecutorService executor = Mockito.mock(ExecutorService.class);
 
+    CountDownLatch latch = new CountDownLatch(1);
     Mockito.doAnswer(new Answer<Future<?>>()
     {
       private int scheduleCount = 0;
 
       @SuppressWarnings("unchecked")
       @Override
-      public Future<?> answer(InvocationOnMock invocation) throws Exception
+      public Future<?> answer(InvocationOnMock invocation)
       {
         final Object originalArgument = (invocation.getArguments())[3];
         CronTask task = ((CronTask) originalArgument);
@@ -174,29 +200,34 @@ public class MonitorSchedulerTest
           }
         }).when(executor).submit(ArgumentMatchers.any(Callable.class));
 
-        while (scheduleCount < 2) {
-          scheduleCount++;
-          task.run(123L);
-        }
+        cronTaskRunner.submit(() -> {
+          while (scheduleCount < 2) {
+            scheduleCount++;
+            task.run(123L);
+          }
+          latch.countDown();
+          return null;
+        });
         return createDummyFuture();
       }
     }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), 
ArgumentMatchers.anyLong(),
         ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
 
     Monitor monitor = Mockito.mock(Monitor.class);
-    Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
 
     MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
     Mockito.when(config.getEmitterPeriod()).thenReturn(new 
org.joda.time.Duration(1000L));
 
-    final MonitorScheduler scheduler = new MonitorScheduler(
+    final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
         config,
-        cronScheduler,
         Mockito.mock(ServiceEmitter.class),
         ImmutableList.of(monitor),
-        executor);
+        cronScheduler,
+        executor
+    );
     scheduler.start();
-    
+    latch.await(5, TimeUnit.SECONDS);
+
     Mockito.verify(monitor, Mockito.times(1)).start();
     Mockito.verify(cronScheduler, 
Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
         ArgumentMatchers.anyLong(),
@@ -204,26 +235,27 @@ public class MonitorSchedulerTest
     Mockito.verify(executor, 
Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
     Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any());
     Mockito.verify(monitor, Mockito.times(1)).stop();
-    
+    scheduler.stop();
   }
   
   @Test
-  public void testStart_UnexpectedExceptionWhileMonitoring() 
+  public void testStart_UnexpectedExceptionWhileMonitoring() throws 
InterruptedException
   {
     ExecutorService executor = Mockito.mock(ExecutorService.class);
     Monitor monitor = Mockito.mock(Monitor.class);
-    Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
-    
Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class))).thenThrow(new
 RuntimeException());
+    Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class)))
+           .thenThrow(new RuntimeException("Test throwing exception while 
monitoring"));
 
     MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
     Mockito.when(config.getEmitterPeriod()).thenReturn(new 
org.joda.time.Duration(1000L));
 
-
+    CountDownLatch latch = new CountDownLatch(1);
+    AtomicBoolean monitorResultHolder = new AtomicBoolean(false);
     Mockito.doAnswer(new Answer<Future<?>>()
     {
       @SuppressWarnings("unchecked")
       @Override
-      public Future<?> answer(InvocationOnMock invocation) throws Exception
+      public Future<?> answer(InvocationOnMock invocation)
       {
         final Object originalArgument = (invocation.getArguments())[3];
         CronTask task = ((CronTask) originalArgument);
@@ -234,78 +266,92 @@ public class MonitorSchedulerTest
           public Future<Boolean> answer(InvocationOnMock invocation) throws Exception
           {
             final Object originalArgument = (invocation.getArguments())[0];
-            ((Callable<Boolean>) originalArgument).call();
-            return CompletableFuture.completedFuture(Boolean.TRUE);
+            final boolean continueMonitor = ((Callable<Boolean>) originalArgument).call();
+            monitorResultHolder.set(continueMonitor);
+            return CompletableFuture.completedFuture(continueMonitor);
           }
         }).when(executor).submit(ArgumentMatchers.any(Callable.class));
 
-        task.run(123L);
+        cronTaskRunner.submit(() -> {
+          task.run(123L);
+          latch.countDown();
+          return null;
+        });
         return createDummyFuture();
       }
     }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(),
         ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
 
     
-    final MonitorScheduler scheduler = new MonitorScheduler(
+    final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
         config,
-        cronScheduler,
         Mockito.mock(ServiceEmitter.class),
         ImmutableList.of(monitor),
-        executor);
+        cronScheduler,
+        executor
+    );
     scheduler.start();
-    
+    latch.await(5, TimeUnit.SECONDS);
+
     Mockito.verify(monitor, Mockito.times(1)).start();
     Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
         ArgumentMatchers.anyLong(),
         ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
     Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
     Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any());
+    Assert.assertTrue(monitorResultHolder.get());
+    scheduler.stop();
   }
   
-  
   @Test
-  public void testStart_UnexpectedExceptionWhileScheduling() 
+  public void testStart_UnexpectedExceptionWhileScheduling() throws InterruptedException
   {
     ExecutorService executor = Mockito.mock(ExecutorService.class);
     Monitor monitor = Mockito.mock(Monitor.class);
-    Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
     MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
     Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
 
-
+    CountDownLatch latch = new CountDownLatch(1);
     Mockito.doAnswer(new Answer<Future<?>>()
     {
       @SuppressWarnings("unchecked")
       @Override
-      public Future<?> answer(InvocationOnMock invocation) throws Exception
+      public Future<?> answer(InvocationOnMock invocation)
       {
         final Object originalArgument = (invocation.getArguments())[3];
         CronTask task = ((CronTask) originalArgument);
 
-        Mockito.when(executor.submit(ArgumentMatchers.any(Callable.class))).thenThrow(new RuntimeException());
-        task.run(123L);
+        Mockito.when(executor.submit(ArgumentMatchers.any(Callable.class)))
+               .thenThrow(new RuntimeException("Test throwing exception while scheduling"));
+        cronTaskRunner.submit(() -> {
+          task.run(123L);
+          latch.countDown();
+          return null;
+        });
         return createDummyFuture();
       }
     }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(),
         ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
 
     
-    final MonitorScheduler scheduler = new MonitorScheduler(
+    final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
         config,
-        cronScheduler,
         Mockito.mock(ServiceEmitter.class),
         ImmutableList.of(monitor),
-        executor);
+        cronScheduler,
+        executor
+    );
     scheduler.start();
-    
+    latch.await(5, TimeUnit.SECONDS);
+
     Mockito.verify(monitor, Mockito.times(1)).start();
     Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
         ArgumentMatchers.anyLong(),
         ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
     Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
+    scheduler.stop();
   }
   
-  
   private Future createDummyFuture()
   {
     Future<?> future = new Future()
@@ -366,19 +412,5 @@ public class MonitorSchedulerTest
     {
       return true;
     }
-
-    @Override
-    public Future<?> getScheduledFuture()
-    {
-      // TODO Auto-generated method stub
-      return null;
-    }
-
-    @Override
-    public void setScheduledFuture(Future<?> scheduledFuture)
-    {
-      // TODO Auto-generated method stub
-      
-    }
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java b/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java
index 228360f..0e242b1 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java
@@ -20,6 +20,7 @@
 package org.apache.druid.server.metrics;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.metrics.BasicMonitorScheduler;
 import org.apache.druid.java.util.metrics.MonitorSchedulerConfig;
 import org.joda.time.Duration;
 import org.joda.time.Period;
@@ -29,8 +30,16 @@ import org.joda.time.Period;
 public class DruidMonitorSchedulerConfig extends MonitorSchedulerConfig
 {
   @JsonProperty
+  private String schedulerClassName = BasicMonitorScheduler.class.getName();
+
+  @JsonProperty
   private Period emissionPeriod = new Period("PT1M");
 
+  public String getSchedulerClassName()
+  {
+    return schedulerClassName;
+  }
+
   @JsonProperty
   public Period getEmissionPeriod()
   {
diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
index d7598ae..df6fe8b 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
@@ -32,9 +32,12 @@ import org.apache.druid.guice.DruidBinders;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.BasicMonitorScheduler;
+import org.apache.druid.java.util.metrics.ClockDriftSafeMonitorScheduler;
 import org.apache.druid.java.util.metrics.JvmCpuMonitor;
 import org.apache.druid.java.util.metrics.JvmMonitor;
 import org.apache.druid.java.util.metrics.JvmThreadsMonitor;
@@ -56,6 +59,7 @@ import java.util.stream.Collectors;
  */
 public class MetricsModule implements Module
 {
+  static final String MONITORING_PROPERTY_PREFIX = "druid.monitoring";
   private static final Logger log = new Logger(MetricsModule.class);
 
   public static void register(Binder binder, Class<? extends Monitor> monitorClazz)
@@ -66,8 +70,8 @@ public class MetricsModule implements Module
   @Override
   public void configure(Binder binder)
   {
-    JsonConfigProvider.bind(binder, "druid.monitoring", DruidMonitorSchedulerConfig.class);
-    JsonConfigProvider.bind(binder, "druid.monitoring", MonitorsConfig.class);
+    JsonConfigProvider.bind(binder, MONITORING_PROPERTY_PREFIX, DruidMonitorSchedulerConfig.class);
+    JsonConfigProvider.bind(binder, MONITORING_PROPERTY_PREFIX, MonitorsConfig.class);
 
     DruidBinders.metricMonitorBinder(binder); // get the binder so that it will inject the empty set at a minimum.
 
@@ -106,13 +110,24 @@ public class MetricsModule implements Module
       );
     }
 
-    return new MonitorScheduler(
-        config.get(),
-        CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorSchedulerThread").build(),
-        emitter,
-        monitors,
-        Execs.multiThreaded(64, "MonitorThread-%d")
-    );
+    if (ClockDriftSafeMonitorScheduler.class.getName().equals(config.get().getSchedulerClassName())) {
+      return new ClockDriftSafeMonitorScheduler(
+          config.get(),
+          emitter,
+          monitors,
+          CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorScheduler").build(),
+          Execs.singleThreaded("MonitorRunner")
+      );
+    } else if (BasicMonitorScheduler.class.getName().equals(config.get().getSchedulerClassName())) {
+      return new BasicMonitorScheduler(
+          config.get(),
+          emitter,
+          monitors,
+          Execs.scheduledSingleThreaded("MonitorScheduler-%s")
+      );
+    } else {
+      throw new IAE("Unknown monitor scheduler[%s]", config.get().getSchedulerClassName());
+    }
   }
 
   @Provides
diff --git a/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java b/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java
index a94d6cc..cba4f84 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java
@@ -21,20 +21,41 @@ package org.apache.druid.server.metrics;
 
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Binder;
+import com.google.inject.CreationException;
+import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.Module;
+import com.google.inject.Scopes;
 import com.google.inject.name.Names;
 import org.apache.druid.guice.GuiceInjectors;
 import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.initialization.Initialization;
+import org.apache.druid.jackson.JacksonModule;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.BasicMonitorScheduler;
+import org.apache.druid.java.util.metrics.ClockDriftSafeMonitorScheduler;
+import org.apache.druid.java.util.metrics.MonitorScheduler;
 import org.apache.druid.server.DruidNode;
+import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import javax.validation.Validation;
+import javax.validation.Validator;
+import java.util.Properties;
 
 public class MetricsModuleTest
 {
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Test
   public void testSimpleInjection()
   {
@@ -88,4 +109,64 @@ public class MetricsModuleTest
     Assert.assertEquals(dataSource, dimensionIdHolder.getDataSource());
     Assert.assertEquals(taskId, dimensionIdHolder.getTaskId());
   }
+
+  @Test
+  public void testGetBasicMonitorSchedulerByDefault()
+  {
+    final MonitorScheduler monitorScheduler = createInjector(new Properties()).getInstance(MonitorScheduler.class);
+    Assert.assertSame(BasicMonitorScheduler.class, monitorScheduler.getClass());
+  }
+
+  @Test
+  public void testGetClockDriftSafeMonitorSchedulerViaConfig()
+  {
+    final Properties properties = new Properties();
+    properties.setProperty(
+        StringUtils.format("%s.schedulerClassName", MetricsModule.MONITORING_PROPERTY_PREFIX),
+        ClockDriftSafeMonitorScheduler.class.getName()
+    );
+    final MonitorScheduler monitorScheduler = createInjector(properties).getInstance(MonitorScheduler.class);
+    Assert.assertSame(ClockDriftSafeMonitorScheduler.class, monitorScheduler.getClass());
+  }
+
+  @Test
+  public void testGetBasicMonitorSchedulerViaConfig()
+  {
+    final Properties properties = new Properties();
+    properties.setProperty(
+        StringUtils.format("%s.schedulerClassName", MetricsModule.MONITORING_PROPERTY_PREFIX),
+        BasicMonitorScheduler.class.getName()
+    );
+    final MonitorScheduler monitorScheduler = createInjector(properties).getInstance(MonitorScheduler.class);
+    Assert.assertSame(BasicMonitorScheduler.class, monitorScheduler.getClass());
+  }
+
+  @Test
+  public void testGetMonitorSchedulerUnknownSchedulerException()
+  {
+    final Properties properties = new Properties();
+    properties.setProperty(
+        StringUtils.format("%s.schedulerClassName", MetricsModule.MONITORING_PROPERTY_PREFIX),
+        "UnknownScheduler"
+    );
+    expectedException.expect(CreationException.class);
+    expectedException.expectCause(CoreMatchers.instanceOf(IllegalArgumentException.class));
+    expectedException.expectMessage("Unknown monitor scheduler[UnknownScheduler]");
+    createInjector(properties).getInstance(MonitorScheduler.class);
+  }
+
+  private static Injector createInjector(Properties properties)
+  {
+    return Guice.createInjector(
+        new JacksonModule(),
+        new LifecycleModule(),
+        binder -> {
+          binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
+          binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
+          binder.bind(ServiceEmitter.class).toInstance(new NoopServiceEmitter());
+          binder.bind(Properties.class).toInstance(properties);
+        },
+        new MetricsModule()
+    );
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to