This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new b3325c16 Add a config for monitorScheduler type (#10732)
b3325c16 is described below
commit b3325c160171f8a91972b5590c481cb121e58eae
Author: Jihoon Son <[email protected]>
AuthorDate: Wed Jan 13 17:20:43 2021 -0800
Add a config for monitorScheduler type (#10732)
* Add a config for monitorScheduler type
* check interrupted
* null check
* do not schedule monitor if the previous one is still running
* checkstyle
* clean up names
* change default back to basic
* fix test
---
.../druid/java/util/metrics/AbstractMonitor.java | 16 ---
.../java/util/metrics/BasicMonitorScheduler.java | 65 +++++++++
.../metrics/ClockDriftSafeMonitorScheduler.java | 134 +++++++++++++++++
.../druid/java/util/metrics/CompoundMonitor.java | 15 --
.../apache/druid/java/util/metrics/Monitor.java | 6 -
.../druid/java/util/metrics/MonitorScheduler.java | 83 +++--------
.../util/metrics/BasicMonitorSchedulerTest.java | 111 ++++++++++++++
...ava => ClockDriftSafeMonitorSchedulerTest.java} | 160 ++++++++++++---------
.../metrics/DruidMonitorSchedulerConfig.java | 9 ++
.../apache/druid/server/metrics/MetricsModule.java | 33 +++--
.../druid/server/metrics/MetricsModuleTest.java | 81 +++++++++++
11 files changed, 536 insertions(+), 177 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
index 4fbefb8..ba35dfb 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
@@ -22,8 +22,6 @@ package org.apache.druid.java.util.metrics;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import java.util.concurrent.Future;
-
/**
*/
@@ -31,8 +29,6 @@ public abstract class AbstractMonitor implements Monitor
{
private volatile boolean started = false;
- private volatile Future<?> scheduledFuture;
-
@Override
public void start()
{
@@ -56,16 +52,4 @@ public abstract class AbstractMonitor implements Monitor
}
public abstract boolean doMonitor(ServiceEmitter emitter);
-
- @Override
- public Future<?> getScheduledFuture()
- {
- return scheduledFuture;
- }
-
- @Override
- public void setScheduledFuture(Future<?> scheduledFuture)
- {
- this.scheduledFuture = scheduledFuture;
- }
}
diff --git
a/core/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java
b/core/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java
new file mode 100644
index 0000000..e3e9572
--- /dev/null
+++
b/core/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.metrics;
+
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors.Signal;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * A {@link MonitorScheduler} implementation based on {@link
ScheduledExecutorService}.
+ */
+public class BasicMonitorScheduler extends MonitorScheduler
+{
+ private final ScheduledExecutorService exec;
+
+ public BasicMonitorScheduler(
+ MonitorSchedulerConfig config,
+ ServiceEmitter emitter,
+ List<Monitor> monitors,
+ ScheduledExecutorService exec
+ )
+ {
+ super(config, emitter, monitors);
+ this.exec = exec;
+ }
+
+ @Override
+ void startMonitor(Monitor monitor)
+ {
+ monitor.start();
+ ScheduledExecutors.scheduleAtFixedRate(
+ exec,
+ getConfig().getEmitterPeriod(),
+ () -> {
+ // Run one more time even if the monitor was removed, in case
there's some extra data to flush
+ if (monitor.monitor(getEmitter()) && hasMonitor(monitor)) {
+ return Signal.REPEAT;
+ } else {
+ removeMonitor(monitor);
+ return Signal.STOP;
+ }
+ }
+ );
+ }
+}
diff --git
a/core/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java
b/core/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java
new file mode 100644
index 0000000..6fcb244
--- /dev/null
+++
b/core/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.metrics;
+
+import io.timeandspace.cronscheduler.CronScheduler;
+import io.timeandspace.cronscheduler.CronTask;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A {@link MonitorScheduler} implementation based on {@link CronScheduler}.
+ */
+public class ClockDriftSafeMonitorScheduler extends MonitorScheduler
+{
+ private static final Logger LOG = new
Logger(ClockDriftSafeMonitorScheduler.class);
+
+ private final CronScheduler monitorScheduler;
+ private final ExecutorService monitorRunner;
+
+ public ClockDriftSafeMonitorScheduler(
+ MonitorSchedulerConfig config,
+ ServiceEmitter emitter,
+ List<Monitor> monitors,
+ CronScheduler monitorScheduler,
+ ExecutorService monitorRunner
+ )
+ {
+ super(config, emitter, monitors);
+ this.monitorScheduler = monitorScheduler;
+ this.monitorRunner = monitorRunner;
+ }
+
+ @Override
+ void startMonitor(final Monitor monitor)
+ {
+ monitor.start();
+ long rate = getConfig().getEmitterPeriod().getMillis();
+ final AtomicReference<Future<?>> futureReference = new AtomicReference<>();
+ Future<?> future = monitorScheduler.scheduleAtFixedRate(
+ rate,
+ rate,
+ TimeUnit.MILLISECONDS,
+ new CronTask()
+ {
+ private Future<?> cancellationFuture = null;
+ private Future<Boolean> monitorFuture = null;
+
+ @Override
+ public void run(long scheduledRunTimeMillis)
+ {
+ waitForScheduleFutureToBeSet();
+ if (cancellationFuture == null) {
+ LOG.error("scheduleFuture is not set. Can't run monitor[%s]",
monitor.getClass().getName());
+ return;
+ }
+ try {
+ // Do nothing if the monitor is still running.
+ if (monitorFuture == null || monitorFuture.isDone()) {
+ if (monitorFuture != null) {
+ // monitorFuture must be done at this moment if it's not null
+ if (!(monitorFuture.get() && hasMonitor(monitor))) {
+ stopMonitor(monitor);
+ return;
+ }
+ }
+
+ LOG.trace("Running monitor[%s]", monitor.getClass().getName());
+ monitorFuture = monitorRunner.submit(() -> {
+ try {
+ return monitor.monitor(getEmitter());
+ }
+ catch (Throwable e) {
+ LOG.error(
+ e,
+ "Exception while executing monitor[%s]. Rescheduling
in %s ms",
+ monitor.getClass().getName(),
+ rate
+ );
+ return Boolean.TRUE;
+ }
+ });
+ }
+ }
+ catch (Throwable e) {
+ LOG.error(e, "Uncaught exception.");
+ }
+ }
+
+ private void waitForScheduleFutureToBeSet()
+ {
+ if (cancellationFuture == null) {
+ while (!Thread.currentThread().isInterrupted()) {
+ if (futureReference.get() != null) {
+ cancellationFuture = futureReference.get();
+ break;
+ }
+ }
+ }
+ }
+
+ private void stopMonitor(Monitor monitor)
+ {
+ removeMonitor(monitor);
+ cancellationFuture.cancel(false);
+ LOG.debug("Stopped monitor[%s]", monitor.getClass().getName());
+ }
+ }
+ );
+ futureReference.set(future);
+ }
+}
diff --git
a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
index 6649312..676c77a 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
@@ -24,14 +24,11 @@ import
org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.Future;
public abstract class CompoundMonitor implements Monitor
{
private final List<Monitor> monitors;
- private volatile Future<?> scheduledFuture;
-
public CompoundMonitor(List<Monitor> monitors)
{
this.monitors = monitors;
@@ -64,17 +61,5 @@ public abstract class CompoundMonitor implements Monitor
return shouldReschedule(Lists.transform(monitors, monitor ->
monitor.monitor(emitter)));
}
- @Override
- public Future<?> getScheduledFuture()
- {
- return scheduledFuture;
- }
-
- @Override
- public void setScheduledFuture(Future<?> scheduledFuture)
- {
- this.scheduledFuture = scheduledFuture;
- }
-
public abstract boolean shouldReschedule(List<Boolean> reschedules);
}
diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
index 8a3975e..8ddb3fa 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
@@ -21,8 +21,6 @@ package org.apache.druid.java.util.metrics;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import java.util.concurrent.Future;
-
/**
*/
@@ -38,8 +36,4 @@ public interface Monitor
* @return true if this monitor needs to continue monitoring. False
otherwise.
*/
boolean monitor(ServiceEmitter emitter);
-
- Future<?> getScheduledFuture();
-
- void setScheduledFuture(Future<?> scheduledFuture);
}
diff --git
a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
index 961f823..171ca40 100644
---
a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
+++
b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
@@ -20,52 +20,37 @@
package org.apache.druid.java.util.metrics;
import com.google.common.collect.Sets;
-import io.timeandspace.cronscheduler.CronScheduler;
-import io.timeandspace.cronscheduler.CronTask;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
-import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.List;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
/**
*/
-public class MonitorScheduler
+public abstract class MonitorScheduler
{
-
- private static final Logger log = new Logger(MonitorScheduler.class);
-
private final MonitorSchedulerConfig config;
private final ServiceEmitter emitter;
private final Set<Monitor> monitors;
private final Object lock = new Object();
- private final CronScheduler scheduler;
- private final ExecutorService executorService;
private volatile boolean started = false;
- public MonitorScheduler(
+ MonitorScheduler(
MonitorSchedulerConfig config,
- CronScheduler scheduler,
ServiceEmitter emitter,
- List<Monitor> monitors,
- ExecutorService executorService
+ List<Monitor> monitors
)
{
this.config = config;
- this.scheduler = scheduler;
this.emitter = emitter;
this.monitors = Sets.newHashSet(monitors);
- this.executorService = executorService;
}
@LifecycleStart
@@ -131,59 +116,23 @@ public class MonitorScheduler
}
}
- private void startMonitor(final Monitor monitor)
+ boolean hasMonitor(final Monitor monitor)
{
synchronized (lock) {
- monitor.start();
- long rate = config.getEmitterPeriod().getMillis();
- Future<?> scheduledFuture = scheduler.scheduleAtFixedRate(
- rate,
- rate,
- TimeUnit.MILLISECONDS,
- new CronTask()
- {
- private volatile Future<Boolean> monitorFuture = null;
- @Override
- public void run(long scheduledRunTimeMillis)
- {
- try {
- if (monitorFuture != null && monitorFuture.isDone()
- && !(monitorFuture.get() && hasMonitor(monitor))) {
- removeMonitor(monitor);
- monitor.getScheduledFuture().cancel(false);
- log.debug("Stopped rescheduling %s (delay %s)", this, rate);
- return;
- }
- log.trace("Running %s (period %s)", this, rate);
- monitorFuture = executorService.submit(new Callable<Boolean>()
- {
- @Override
- public Boolean call()
- {
- try {
- return monitor.monitor(emitter);
- }
- catch (Throwable e) {
- log.error(e, "Uncaught exception.");
- return Boolean.FALSE;
- }
- }
- });
- }
- catch (Throwable e) {
- log.error(e, "Uncaught exception.");
- }
- }
- });
- monitor.setScheduledFuture(scheduledFuture);
+ return monitors.contains(monitor);
}
}
- private boolean hasMonitor(final Monitor monitor)
+ MonitorSchedulerConfig getConfig()
{
- synchronized (lock) {
- return monitors.contains(monitor);
- }
+ return config;
}
-
+
+ ServiceEmitter getEmitter()
+ {
+ return emitter;
+ }
+
+ @GuardedBy("lock")
+ abstract void startMonitor(Monitor monitor);
}
diff --git
a/core/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
b/core/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
new file mode 100644
index 0000000..ea04080
--- /dev/null
+++
b/core/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.metrics;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+public class BasicMonitorSchedulerTest
+{
+ private final MonitorSchedulerConfig config = new MonitorSchedulerConfig()
+ {
+ @Override
+ public Duration getEmitterPeriod()
+ {
+ return Duration.millis(5);
+ }
+ };
+ private ServiceEmitter emitter;
+ private ScheduledExecutorService exec;
+
+ @Before
+ public void setup()
+ {
+ emitter = Mockito.mock(ServiceEmitter.class);
+ exec = Execs.scheduledSingleThreaded("BasicMonitorSchedulerTest");
+ }
+
+ @Test
+ public void testStart_RepeatScheduling() throws InterruptedException
+ {
+ final Monitor monitor = Mockito.mock(Monitor.class);
+ Mockito.when(monitor.monitor(ArgumentMatchers.any())).thenReturn(true);
+
+ final BasicMonitorScheduler scheduler = new BasicMonitorScheduler(
+ config,
+ emitter,
+ ImmutableList.of(monitor),
+ exec
+ );
+ scheduler.start();
+ Thread.sleep(100);
+ Mockito.verify(monitor,
Mockito.atLeast(2)).monitor(ArgumentMatchers.any());
+ scheduler.stop();
+ }
+
+ @Test
+ public void testStart_RepeatAndStopScheduling() throws InterruptedException
+ {
+ final Monitor monitor = Mockito.mock(Monitor.class);
+ Mockito.when(monitor.monitor(ArgumentMatchers.any())).thenReturn(true,
true, true, false);
+
+ final BasicMonitorScheduler scheduler = new BasicMonitorScheduler(
+ config,
+ emitter,
+ ImmutableList.of(monitor),
+ exec
+ );
+ scheduler.start();
+ Thread.sleep(100);
+ // monitor.monitor() is called 5 times since a new task is scheduled first
and then the current one is executed.
+ // See ScheduledExecutors.scheduleAtFixedRate() for details.
+ Mockito.verify(monitor, Mockito.times(5)).monitor(ArgumentMatchers.any());
+ scheduler.stop();
+ }
+
+ @Test
+ public void testStart_UnexpectedExceptionWhileMonitoring_ContinueMonitor()
throws InterruptedException
+ {
+ final Monitor monitor = Mockito.mock(Monitor.class);
+ Mockito.when(monitor.monitor(ArgumentMatchers.any()))
+ .thenThrow(new RuntimeException("Test throwing exception while
monitoring"));
+
+ final BasicMonitorScheduler scheduler = new BasicMonitorScheduler(
+ config,
+ emitter,
+ ImmutableList.of(monitor),
+ exec
+ );
+ scheduler.start();
+ Thread.sleep(100);
+ // monitor.monitor() throws on every call but is still expected to be invoked repeatedly (at least twice),
+ // since a new task is scheduled first and then the current one is executed.
+ // See ScheduledExecutors.scheduleAtFixedRate() for details.
+ Mockito.verify(monitor,
Mockito.atLeast(2)).monitor(ArgumentMatchers.any());
+ scheduler.stop();
+ }
+}
diff --git
a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
b/core/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java
similarity index 73%
rename from
core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
rename to
core/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java
index da2ba59..b5e80a2 100644
---
a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
+++
b/core/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java
@@ -23,7 +23,9 @@ package org.apache.druid.java.util.metrics;
import com.google.common.collect.ImmutableList;
import io.timeandspace.cronscheduler.CronScheduler;
import io.timeandspace.cronscheduler.CronTask;
+import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -38,21 +40,37 @@ import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
-public class MonitorSchedulerTest
+public class ClockDriftSafeMonitorSchedulerTest
{
-
+ // A real executor service to execute CronTask asynchronously.
+ // Many tests in this class use mocks to easily control the behavior of
CronScheduler and the ExecutorService
+ // used by MonitorScheduler. However, as MonitorScheduler uses two different
threads in production, one for
+ // scheduling a task to schedule a monitor (CronScheduler), and another for
running a scheduled monitor
+ // asynchronously, these tests also need to run some tasks in an
asynchronous manner. As mocks are convenient
+ // enough to control the behavior of things, we use another executorService
only to run some tasks asynchronously
+ // to mimic the nature of asynchronous execution in MonitorScheduler.
+ private ExecutorService cronTaskRunner;
@Mock
private CronScheduler cronScheduler;
@Before
public void setUp()
{
+ cronTaskRunner = Execs.singleThreaded("monitor-scheduler-test");
MockitoAnnotations.initMocks(this);
}
+
+ @After
+ public void tearDown()
+ {
+ cronTaskRunner.shutdownNow();
+ }
@Test
public void testFindMonitor()
@@ -72,11 +90,11 @@ public class MonitorSchedulerTest
ExecutorService executor = Mockito.mock(ExecutorService.class);
- final MonitorScheduler scheduler = new MonitorScheduler(
+ final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
Mockito.mock(MonitorSchedulerConfig.class),
-
CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(),
Mockito.mock(ServiceEmitter.class),
ImmutableList.of(monitor1, monitor2),
+
CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(),
executor
);
@@ -91,17 +109,18 @@ public class MonitorSchedulerTest
}
@Test
- public void testStart_RepeatScheduling()
+ public void testStart_RepeatScheduling() throws InterruptedException
{
ExecutorService executor = Mockito.mock(ExecutorService.class);
+ CountDownLatch latch = new CountDownLatch(1);
Mockito.doAnswer(new Answer<Future<?>>()
{
private int scheduleCount = 0;
@SuppressWarnings("unchecked")
@Override
- public Future<?> answer(InvocationOnMock invocation) throws Exception
+ public Future<?> answer(InvocationOnMock invocation)
{
final Object originalArgument = (invocation.getArguments())[3];
CronTask task = ((CronTask) originalArgument);
@@ -117,10 +136,14 @@ public class MonitorSchedulerTest
}
}).when(executor).submit(ArgumentMatchers.any(Callable.class));
- while (scheduleCount < 2) {
- scheduleCount++;
- task.run(123L);
- }
+ cronTaskRunner.submit(() -> {
+ while (scheduleCount < 2) {
+ scheduleCount++;
+ task.run(123L);
+ }
+ latch.countDown();
+ return null;
+ });
return createDummyFuture();
}
}).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
ArgumentMatchers.anyLong(),
@@ -131,13 +154,15 @@ public class MonitorSchedulerTest
MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
Mockito.when(config.getEmitterPeriod()).thenReturn(new
org.joda.time.Duration(1000L));
- final MonitorScheduler scheduler = new MonitorScheduler(
+ final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
config,
- cronScheduler,
Mockito.mock(ServiceEmitter.class),
ImmutableList.of(monitor),
- executor);
+ cronScheduler,
+ executor
+ );
scheduler.start();
+ latch.await(5, TimeUnit.SECONDS);
Mockito.verify(monitor, Mockito.times(1)).start();
Mockito.verify(cronScheduler,
Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
@@ -145,21 +170,22 @@ public class MonitorSchedulerTest
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
Mockito.verify(executor,
Mockito.times(2)).submit(ArgumentMatchers.any(Callable.class));
Mockito.verify(monitor, Mockito.times(2)).monitor(ArgumentMatchers.any());
-
+ scheduler.stop();
}
@Test
- public void testStart_RepeatAndStopScheduling()
+ public void testStart_RepeatAndStopScheduling() throws InterruptedException
{
ExecutorService executor = Mockito.mock(ExecutorService.class);
+ CountDownLatch latch = new CountDownLatch(1);
Mockito.doAnswer(new Answer<Future<?>>()
{
private int scheduleCount = 0;
@SuppressWarnings("unchecked")
@Override
- public Future<?> answer(InvocationOnMock invocation) throws Exception
+ public Future<?> answer(InvocationOnMock invocation)
{
final Object originalArgument = (invocation.getArguments())[3];
CronTask task = ((CronTask) originalArgument);
@@ -174,29 +200,34 @@ public class MonitorSchedulerTest
}
}).when(executor).submit(ArgumentMatchers.any(Callable.class));
- while (scheduleCount < 2) {
- scheduleCount++;
- task.run(123L);
- }
+ cronTaskRunner.submit(() -> {
+ while (scheduleCount < 2) {
+ scheduleCount++;
+ task.run(123L);
+ }
+ latch.countDown();
+ return null;
+ });
return createDummyFuture();
}
}).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
Monitor monitor = Mockito.mock(Monitor.class);
- Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
Mockito.when(config.getEmitterPeriod()).thenReturn(new
org.joda.time.Duration(1000L));
- final MonitorScheduler scheduler = new MonitorScheduler(
+ final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
config,
- cronScheduler,
Mockito.mock(ServiceEmitter.class),
ImmutableList.of(monitor),
- executor);
+ cronScheduler,
+ executor
+ );
scheduler.start();
-
+ latch.await(5, TimeUnit.SECONDS);
+
Mockito.verify(monitor, Mockito.times(1)).start();
Mockito.verify(cronScheduler,
Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
ArgumentMatchers.anyLong(),
@@ -204,26 +235,27 @@ public class MonitorSchedulerTest
Mockito.verify(executor,
Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any());
Mockito.verify(monitor, Mockito.times(1)).stop();
-
+ scheduler.stop();
}
@Test
- public void testStart_UnexpectedExceptionWhileMonitoring()
+ public void testStart_UnexpectedExceptionWhileMonitoring() throws
InterruptedException
{
ExecutorService executor = Mockito.mock(ExecutorService.class);
Monitor monitor = Mockito.mock(Monitor.class);
- Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
-
Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class))).thenThrow(new
RuntimeException());
+ Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class)))
+ .thenThrow(new RuntimeException("Test throwing exception while
monitoring"));
MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
Mockito.when(config.getEmitterPeriod()).thenReturn(new
org.joda.time.Duration(1000L));
-
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicBoolean monitorResultHolder = new AtomicBoolean(false);
Mockito.doAnswer(new Answer<Future<?>>()
{
@SuppressWarnings("unchecked")
@Override
- public Future<?> answer(InvocationOnMock invocation) throws Exception
+ public Future<?> answer(InvocationOnMock invocation)
{
final Object originalArgument = (invocation.getArguments())[3];
CronTask task = ((CronTask) originalArgument);
@@ -234,78 +266,92 @@ public class MonitorSchedulerTest
public Future<Boolean> answer(InvocationOnMock invocation) throws
Exception
{
final Object originalArgument = (invocation.getArguments())[0];
- ((Callable<Boolean>) originalArgument).call();
- return CompletableFuture.completedFuture(Boolean.TRUE);
+ final boolean continueMonitor = ((Callable<Boolean>)
originalArgument).call();
+ monitorResultHolder.set(continueMonitor);
+ return CompletableFuture.completedFuture(continueMonitor);
}
}).when(executor).submit(ArgumentMatchers.any(Callable.class));
- task.run(123L);
+ cronTaskRunner.submit(() -> {
+ task.run(123L);
+ latch.countDown();
+ return null;
+ });
return createDummyFuture();
}
}).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
- final MonitorScheduler scheduler = new MonitorScheduler(
+ final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
config,
- cronScheduler,
Mockito.mock(ServiceEmitter.class),
ImmutableList.of(monitor),
- executor);
+ cronScheduler,
+ executor
+ );
scheduler.start();
-
+ latch.await(5, TimeUnit.SECONDS);
+
Mockito.verify(monitor, Mockito.times(1)).start();
Mockito.verify(cronScheduler,
Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
Mockito.verify(executor,
Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any());
+ Assert.assertTrue(monitorResultHolder.get());
+ scheduler.stop();
}
-
@Test
- public void testStart_UnexpectedExceptionWhileScheduling()
+ public void testStart_UnexpectedExceptionWhileScheduling() throws
InterruptedException
{
ExecutorService executor = Mockito.mock(ExecutorService.class);
Monitor monitor = Mockito.mock(Monitor.class);
- Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
Mockito.when(config.getEmitterPeriod()).thenReturn(new
org.joda.time.Duration(1000L));
-
+ CountDownLatch latch = new CountDownLatch(1);
Mockito.doAnswer(new Answer<Future<?>>()
{
@SuppressWarnings("unchecked")
@Override
- public Future<?> answer(InvocationOnMock invocation) throws Exception
+ public Future<?> answer(InvocationOnMock invocation)
{
final Object originalArgument = (invocation.getArguments())[3];
CronTask task = ((CronTask) originalArgument);
-
Mockito.when(executor.submit(ArgumentMatchers.any(Callable.class))).thenThrow(new
RuntimeException());
- task.run(123L);
+ Mockito.when(executor.submit(ArgumentMatchers.any(Callable.class)))
+ .thenThrow(new RuntimeException("Test throwing exception while
scheduling"));
+ cronTaskRunner.submit(() -> {
+ task.run(123L);
+ latch.countDown();
+ return null;
+ });
return createDummyFuture();
}
}).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
- final MonitorScheduler scheduler = new MonitorScheduler(
+ final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
config,
- cronScheduler,
Mockito.mock(ServiceEmitter.class),
ImmutableList.of(monitor),
- executor);
+ cronScheduler,
+ executor
+ );
scheduler.start();
-
+ latch.await(5, TimeUnit.SECONDS);
+
Mockito.verify(monitor, Mockito.times(1)).start();
Mockito.verify(cronScheduler,
Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
Mockito.verify(executor,
Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
+ scheduler.stop();
}
-
private Future createDummyFuture()
{
Future<?> future = new Future()
@@ -366,19 +412,5 @@ public class MonitorSchedulerTest
{
return true;
}
-
- @Override
- public Future<?> getScheduledFuture()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void setScheduledFuture(Future<?> scheduledFuture)
- {
- // TODO Auto-generated method stub
-
- }
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java
b/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java
index 228360f..0e242b1 100644
---
a/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java
+++
b/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java
@@ -20,6 +20,7 @@
package org.apache.druid.server.metrics;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.metrics.BasicMonitorScheduler;
import org.apache.druid.java.util.metrics.MonitorSchedulerConfig;
import org.joda.time.Duration;
import org.joda.time.Period;
@@ -29,8 +30,16 @@ import org.joda.time.Period;
public class DruidMonitorSchedulerConfig extends MonitorSchedulerConfig
{
@JsonProperty
+ private String schedulerClassName = BasicMonitorScheduler.class.getName();
+
+ @JsonProperty
private Period emissionPeriod = new Period("PT1M");
+ public String getSchedulerClassName()
+ {
+ return schedulerClassName;
+ }
+
@JsonProperty
public Period getEmissionPeriod()
{
diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
index d7598ae..df6fe8b 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
@@ -32,9 +32,12 @@ import org.apache.druid.guice.DruidBinders;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.BasicMonitorScheduler;
+import org.apache.druid.java.util.metrics.ClockDriftSafeMonitorScheduler;
import org.apache.druid.java.util.metrics.JvmCpuMonitor;
import org.apache.druid.java.util.metrics.JvmMonitor;
import org.apache.druid.java.util.metrics.JvmThreadsMonitor;
@@ -56,6 +59,7 @@ import java.util.stream.Collectors;
*/
public class MetricsModule implements Module
{
+ static final String MONITORING_PROPERTY_PREFIX = "druid.monitoring";
private static final Logger log = new Logger(MetricsModule.class);
public static void register(Binder binder, Class<? extends Monitor> monitorClazz)
@@ -66,8 +70,8 @@ public class MetricsModule implements Module
@Override
public void configure(Binder binder)
{
- JsonConfigProvider.bind(binder, "druid.monitoring", DruidMonitorSchedulerConfig.class);
- JsonConfigProvider.bind(binder, "druid.monitoring", MonitorsConfig.class);
+ JsonConfigProvider.bind(binder, MONITORING_PROPERTY_PREFIX, DruidMonitorSchedulerConfig.class);
+ JsonConfigProvider.bind(binder, MONITORING_PROPERTY_PREFIX, MonitorsConfig.class);
DruidBinders.metricMonitorBinder(binder); // get the binder so that it will inject the empty set at a minimum.
@@ -106,13 +110,24 @@ public class MetricsModule implements Module
);
}
- return new MonitorScheduler(
- config.get(),
- CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorSchedulerThread").build(),
- emitter,
- monitors,
- Execs.multiThreaded(64, "MonitorThread-%d")
- );
+ if (ClockDriftSafeMonitorScheduler.class.getName().equals(config.get().getSchedulerClassName())) {
+ return new ClockDriftSafeMonitorScheduler(
+ config.get(),
+ emitter,
+ monitors,
+ CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorScheduler").build(),
+ Execs.singleThreaded("MonitorRunner")
+ );
+ } else if (BasicMonitorScheduler.class.getName().equals(config.get().getSchedulerClassName())) {
+ return new BasicMonitorScheduler(
+ config.get(),
+ emitter,
+ monitors,
+ Execs.scheduledSingleThreaded("MonitorScheduler-%s")
+ );
+ } else {
+ throw new IAE("Unknown monitor scheduler[%s]", config.get().getSchedulerClassName());
+ }
}
@Provides
diff --git a/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java b/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java
index a94d6cc..cba4f84 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java
@@ -21,20 +21,41 @@ package org.apache.druid.server.metrics;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
+import com.google.inject.CreationException;
+import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
+import com.google.inject.Scopes;
import com.google.inject.name.Names;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.Initialization;
+import org.apache.druid.jackson.JacksonModule;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.BasicMonitorScheduler;
+import org.apache.druid.java.util.metrics.ClockDriftSafeMonitorScheduler;
+import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.server.DruidNode;
+import org.hamcrest.CoreMatchers;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import javax.validation.Validation;
+import javax.validation.Validator;
+import java.util.Properties;
public class MetricsModuleTest
{
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
@Test
public void testSimpleInjection()
{
@@ -88,4 +109,64 @@ public class MetricsModuleTest
Assert.assertEquals(dataSource, dimensionIdHolder.getDataSource());
Assert.assertEquals(taskId, dimensionIdHolder.getTaskId());
}
+
+ @Test
+ public void testGetBasicMonitorSchedulerByDefault()
+ {
+ final MonitorScheduler monitorScheduler = createInjector(new Properties()).getInstance(MonitorScheduler.class);
+ Assert.assertSame(BasicMonitorScheduler.class, monitorScheduler.getClass());
+ }
+
+ @Test
+ public void testGetClockDriftSafeMonitorSchedulerViaConfig()
+ {
+ final Properties properties = new Properties();
+ properties.setProperty(
+ StringUtils.format("%s.schedulerClassName", MetricsModule.MONITORING_PROPERTY_PREFIX),
+ ClockDriftSafeMonitorScheduler.class.getName()
+ );
+ final MonitorScheduler monitorScheduler = createInjector(properties).getInstance(MonitorScheduler.class);
+ Assert.assertSame(ClockDriftSafeMonitorScheduler.class, monitorScheduler.getClass());
+ }
+
+ @Test
+ public void testGetBasicMonitorSchedulerViaConfig()
+ {
+ final Properties properties = new Properties();
+ properties.setProperty(
+ StringUtils.format("%s.schedulerClassName", MetricsModule.MONITORING_PROPERTY_PREFIX),
+ BasicMonitorScheduler.class.getName()
+ );
+ final MonitorScheduler monitorScheduler = createInjector(properties).getInstance(MonitorScheduler.class);
+ Assert.assertSame(BasicMonitorScheduler.class, monitorScheduler.getClass());
+ }
+
+ @Test
+ public void testGetMonitorSchedulerUnknownSchedulerException()
+ {
+ final Properties properties = new Properties();
+ properties.setProperty(
+ StringUtils.format("%s.schedulerClassName", MetricsModule.MONITORING_PROPERTY_PREFIX),
+ "UnknownScheduler"
+ );
+ expectedException.expect(CreationException.class);
+ expectedException.expectCause(CoreMatchers.instanceOf(IllegalArgumentException.class));
+ expectedException.expectMessage("Unknown monitor scheduler[UnknownScheduler]");
+ createInjector(properties).getInstance(MonitorScheduler.class);
+ }
+
+ private static Injector createInjector(Properties properties)
+ {
+ return Guice.createInjector(
+ new JacksonModule(),
+ new LifecycleModule(),
+ binder -> {
+ binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
+ binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
+ binder.bind(ServiceEmitter.class).toInstance(new NoopServiceEmitter());
+ binder.bind(Properties.class).toInstance(properties);
+ },
+ new MetricsModule()
+ );
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]