This is an automated email from the ASF dual-hosted git repository.
leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d0c2ede Added CronScheduler support as a proof to clock drift while
emitting metrics (#10448)
d0c2ede is described below
commit d0c2ede50c94bef89b72261fe3e553c60e7f0013
Author: Ayush Kulshrestha <[email protected]>
AuthorDate: Wed Nov 25 17:01:38 2020 +0530
Added CronScheduler support as a proof to clock drift while emitting
metrics (#10448)
Co-authored-by: Ayush Kulshrestha <[email protected]>
---
core/pom.xml | 4 +
.../druid/java/util/metrics/AbstractMonitor.java | 17 ++
.../druid/java/util/metrics/CompoundMonitor.java | 15 +
.../apache/druid/java/util/metrics/Monitor.java | 7 +
.../druid/java/util/metrics/MonitorScheduler.java | 75 +++--
.../java/util/metrics/MonitorSchedulerTest.java | 304 ++++++++++++++++++++-
licenses.yaml | 10 +
pom.xml | 5 +
server/pom.xml | 4 +
.../apache/druid/server/metrics/MetricsModule.java | 7 +-
10 files changed, 423 insertions(+), 25 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index e0ccfa1..72fea54 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -224,6 +224,10 @@
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.timeandspace</groupId>
+ <artifactId>cron-scheduler</artifactId>
+ </dependency>
diff --git
a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
index 029dd47..4fbefb8 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
@@ -22,11 +22,16 @@ package org.apache.druid.java.util.metrics;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import java.util.concurrent.Future;
+
+
/**
*/
public abstract class AbstractMonitor implements Monitor
{
private volatile boolean started = false;
+
+ private volatile Future<?> scheduledFuture;
@Override
public void start()
@@ -51,4 +56,16 @@ public abstract class AbstractMonitor implements Monitor
}
public abstract boolean doMonitor(ServiceEmitter emitter);
+ /**
+  * Returns the future most recently stored through {@link #setScheduledFuture}. The backing field has no
+  * initializer, so the result is {@code null} until a future has been stored.
+  */
+ @Override
+ public Future<?> getScheduledFuture()
+ {
+ return scheduledFuture;
+ }
+ /**
+  * Stores the given future in a volatile field so that {@link #getScheduledFuture} returns it afterwards.
+  *
+  * @param scheduledFuture the future to remember for this monitor
+  */
+ @Override
+ public void setScheduledFuture(Future<?> scheduledFuture)
+ {
+ this.scheduledFuture = scheduledFuture;
+ }
}
diff --git
a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
index 9811f58..6649312 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
@@ -24,10 +24,13 @@ import
org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.Future;
public abstract class CompoundMonitor implements Monitor
{
private final List<Monitor> monitors;
+
+ private volatile Future<?> scheduledFuture;
public CompoundMonitor(List<Monitor> monitors)
{
@@ -61,5 +64,17 @@ public abstract class CompoundMonitor implements Monitor
return shouldReschedule(Lists.transform(monitors, monitor ->
monitor.monitor(emitter)));
}
+ @Override
+ public Future<?> getScheduledFuture()
+ {
+ return scheduledFuture; // null until setScheduledFuture has stored a future (the field has no initializer)
+ }
+ /**
+  * Stores the given future in a volatile field of this compound monitor; it is not forwarded to the wrapped
+  * monitors here.
+  *
+  * @param scheduledFuture the future to remember for this monitor
+  */
+ @Override
+ public void setScheduledFuture(Future<?> scheduledFuture)
+ {
+ this.scheduledFuture = scheduledFuture;
+ }
+
public abstract boolean shouldReschedule(List<Boolean> reschedules);
}
diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
index 2ccd5db..8a3975e 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
@@ -21,6 +21,9 @@ package org.apache.druid.java.util.metrics;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import java.util.concurrent.Future;
+
+
/**
*/
public interface Monitor
@@ -35,4 +38,8 @@ public interface Monitor
* @return true if this monitor needs to continue monitoring. False
otherwise.
*/
boolean monitor(ServiceEmitter emitter);
+ /**
+  * Returns the future previously handed over through {@link #setScheduledFuture}. MonitorScheduler calls
+  * {@code cancel(false)} on it when it stops rescheduling this monitor.
+  */
+ Future<?> getScheduledFuture();
+ /**
+  * Receives the future that MonitorScheduler obtained from {@code scheduleAtFixedRate} when it scheduled this
+  * monitor.
+  *
+  * @param scheduledFuture handle of the periodic schedule for this monitor
+  */
+ void setScheduledFuture(Future<?> scheduledFuture);
}
diff --git
a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
index 2adbe95..961f823 100644
---
a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
+++
b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
@@ -20,41 +20,52 @@
package org.apache.druid.java.util.metrics;
import com.google.common.collect.Sets;
+import io.timeandspace.cronscheduler.CronScheduler;
+import io.timeandspace.cronscheduler.CronTask;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
/**
*/
public class MonitorScheduler
{
+
+ private static final Logger log = new Logger(MonitorScheduler.class);
+
private final MonitorSchedulerConfig config;
- private final ScheduledExecutorService exec;
private final ServiceEmitter emitter;
private final Set<Monitor> monitors;
private final Object lock = new Object();
+ private final CronScheduler scheduler;
+ private final ExecutorService executorService;
private volatile boolean started = false;
-
+
public MonitorScheduler(
MonitorSchedulerConfig config,
- ScheduledExecutorService exec,
+ CronScheduler scheduler, // takes the place of the former ScheduledExecutorService; used to schedule each monitor at a fixed rate
ServiceEmitter emitter,
- List<Monitor> monitors
+ List<Monitor> monitors,
+ ExecutorService executorService // receives the monitor.monitor(emitter) calls submitted by the scheduled task
)
{
this.config = config;
- this.exec = exec;
+ this.scheduler = scheduler;
this.emitter = emitter;
this.monitors = Sets.newHashSet(monitors);
+ this.executorService = executorService;
}
@LifecycleStart
@@ -124,24 +135,47 @@ public class MonitorScheduler
{
synchronized (lock) {
monitor.start();
- ScheduledExecutors.scheduleAtFixedRate(
- exec,
- config.getEmitterPeriod(),
- new Callable<ScheduledExecutors.Signal>()
+ long rate = config.getEmitterPeriod().getMillis(); // emitter period in milliseconds; passed as both leading arguments below
+ Future<?> scheduledFuture = scheduler.scheduleAtFixedRate(
+ rate,
+ rate,
+ TimeUnit.MILLISECONDS,
+ new CronTask()
{
+ private volatile Future<Boolean> monitorFuture = null; // handle of the most recently submitted monitor run; null before the first tick
@Override
- public ScheduledExecutors.Signal call()
+ public void run(long scheduledRunTimeMillis)
{
- // Run one more time even if the monitor was removed, in case
there's some extra data to flush
- if (monitor.monitor(emitter) && hasMonitor(monitor)) {
- return ScheduledExecutors.Signal.REPEAT;
- } else {
- removeMonitor(monitor);
- return ScheduledExecutors.Signal.STOP;
+ try {
+ if (monitorFuture != null && monitorFuture.isDone()
+ && !(monitorFuture.get() && hasMonitor(monitor))) { // stop branch: the previous run finished and returned false, or the monitor is no longer in the set
+ removeMonitor(monitor);
+ monitor.getScheduledFuture().cancel(false); // cancels the periodic schedule through the handle stored on the monitor
+ log.debug("Stopped rescheduling %s (delay %s)", this, rate);
+ return;
+ }
+ log.trace("Running %s (period %s)", this, rate); // NOTE(review): also reached while the previous run is not done yet, so runs can overlap - confirm this is intended
+ monitorFuture = executorService.submit(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call()
+ {
+ try {
+ return monitor.monitor(emitter);
+ }
+ catch (Throwable e) {
+ log.error(e, "Uncaught exception.");
+ return Boolean.FALSE; // a failing run is reported as false, so a later tick takes the stop branch
+ }
+ }
+ });
+ }
+ catch (Throwable e) {
+ log.error(e, "Uncaught exception."); // failures of the tick itself are only logged; nothing is cancelled here
}
}
- }
- );
+ });
+ monitor.setScheduledFuture(scheduledFuture); // hands the schedule handle to the monitor so the task can cancel it later
}
}
@@ -151,4 +185,5 @@ public class MonitorScheduler
return monitors.contains(monitor);
}
}
+
}
diff --git
a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
index 7667196..da2ba59 100644
---
a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
+++
b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
@@ -19,17 +19,41 @@
package org.apache.druid.java.util.metrics;
+
import com.google.common.collect.ImmutableList;
-import org.apache.druid.java.util.common.concurrent.Execs;
+import io.timeandspace.cronscheduler.CronScheduler;
+import io.timeandspace.cronscheduler.CronTask;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import java.time.Duration;
import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
public class MonitorSchedulerTest
{
+
+ @Mock
+ private CronScheduler cronScheduler;
+ /** Initializes the fields annotated with Mockito's Mock annotation (the cronScheduler mock) before each test. */
+ @Before
+ public void setUp()
+ {
+ MockitoAnnotations.initMocks(this);
+ }
+
@Test
public void testFindMonitor()
{
@@ -45,12 +69,15 @@ public class MonitorSchedulerTest
final Monitor1 monitor1 = new Monitor1();
final Monitor2 monitor2 = new Monitor2();
+
+ ExecutorService executor = Mockito.mock(ExecutorService.class);
final MonitorScheduler scheduler = new MonitorScheduler(
Mockito.mock(MonitorSchedulerConfig.class),
- Execs.scheduledSingleThreaded("monitor-scheduler-test"),
+
CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(),
Mockito.mock(ServiceEmitter.class),
- ImmutableList.of(monitor1, monitor2)
+ ImmutableList.of(monitor1, monitor2),
+ executor
);
final Optional<Monitor1> maybeFound1 =
scheduler.findMonitor(Monitor1.class);
@@ -62,7 +89,264 @@ public class MonitorSchedulerTest
Assert.assertFalse(scheduler.findMonitor(Monitor3.class).isPresent());
}
+ /**
+  * Drives the scheduled CronTask twice while the stubbed executor runs each submitted callable inline and
+  * returns an already completed future holding TRUE. Verifies that the monitor is started once, scheduled once,
+  * and that both ticks submit and execute a monitor run.
+  */
+ @Test
+ public void testStart_RepeatScheduling()
+ {
+ ExecutorService executor = Mockito.mock(ExecutorService.class);
+
+ Mockito.doAnswer(new Answer<Future<?>>()
+ {
+ private int scheduleCount = 0;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Future<?> answer(InvocationOnMock invocation) throws Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[3];
+ CronTask task = ((CronTask) originalArgument);
+ // Stub submit(): run the callable on the calling thread, then report a completed TRUE result.
+ Mockito.doAnswer(new Answer<Future<?>>()
+ {
+ @Override
+ public Future<Boolean> answer(InvocationOnMock invocation) throws
Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[0];
+ ((Callable<Boolean>) originalArgument).call();
+ return CompletableFuture.completedFuture(Boolean.TRUE);
+ }
+ }).when(executor).submit(ArgumentMatchers.any(Callable.class));
+ while (scheduleCount < 2) {
+ scheduleCount++;
+ task.run(123L);
+ }
+ return createDummyFuture();
+ }
+ }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+
+ Monitor monitor = Mockito.mock(Monitor.class);
+
+ MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
+ Mockito.when(config.getEmitterPeriod()).thenReturn(new
org.joda.time.Duration(1000L));
+
+ final MonitorScheduler scheduler = new MonitorScheduler(
+ config,
+ cronScheduler,
+ Mockito.mock(ServiceEmitter.class),
+ ImmutableList.of(monitor),
+ executor);
+ scheduler.start();
+
+ Mockito.verify(monitor, Mockito.times(1)).start();
+ Mockito.verify(cronScheduler,
Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
+ ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+ Mockito.verify(executor,
Mockito.times(2)).submit(ArgumentMatchers.any(Callable.class));
+ Mockito.verify(monitor, Mockito.times(2)).monitor(ArgumentMatchers.any());
+
+ }
+ /**
+  * Drives the scheduled CronTask twice while the stubbed executor returns an already completed future holding
+  * FALSE. Verifies that only the first tick submits a monitor run and that the monitor is stopped exactly once.
+  */
+ @Test
+ public void testStart_RepeatAndStopScheduling()
+ {
+ ExecutorService executor = Mockito.mock(ExecutorService.class);
+
+ Mockito.doAnswer(new Answer<Future<?>>()
+ {
+ private int scheduleCount = 0;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Future<?> answer(InvocationOnMock invocation) throws Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[3];
+ CronTask task = ((CronTask) originalArgument);
+ Mockito.doAnswer(new Answer<Future<?>>()
+ {
+ @Override
+ public Future<Boolean> answer(InvocationOnMock invocation) throws
Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[0];
+ ((Callable<Boolean>) originalArgument).call();
+ return CompletableFuture.completedFuture(Boolean.FALSE);
+ }
+ }).when(executor).submit(ArgumentMatchers.any(Callable.class));
+ // Two ticks: the first submits a run, the second sees the completed FALSE result.
+ while (scheduleCount < 2) {
+ scheduleCount++;
+ task.run(123L);
+ }
+ return createDummyFuture();
+ }
+ }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+
+ Monitor monitor = Mockito.mock(Monitor.class);
+ Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
+
+ MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
+ Mockito.when(config.getEmitterPeriod()).thenReturn(new
org.joda.time.Duration(1000L));
+
+ final MonitorScheduler scheduler = new MonitorScheduler(
+ config,
+ cronScheduler,
+ Mockito.mock(ServiceEmitter.class),
+ ImmutableList.of(monitor),
+ executor);
+ scheduler.start();
+
+ Mockito.verify(monitor, Mockito.times(1)).start();
+ Mockito.verify(cronScheduler,
Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
+ ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+ Mockito.verify(executor,
Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
+ Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any());
+ Mockito.verify(monitor, Mockito.times(1)).stop();
+
+ }
+ /**
+  * Makes the mocked monitor throw a RuntimeException from monitor(emitter) and drives the scheduled CronTask
+  * once. The test declares no expected exception, so it passes only if start() returns normally; it then
+  * verifies one start, one scheduling call, one submit and one monitor invocation.
+  */
+ @Test
+ public void testStart_UnexpectedExceptionWhileMonitoring()
+ {
+ ExecutorService executor = Mockito.mock(ExecutorService.class);
+ Monitor monitor = Mockito.mock(Monitor.class);
+ Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
+
Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class))).thenThrow(new
RuntimeException());
+
+ MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
+ Mockito.when(config.getEmitterPeriod()).thenReturn(new
org.joda.time.Duration(1000L));
+
+
+ Mockito.doAnswer(new Answer<Future<?>>()
+ {
+ @SuppressWarnings("unchecked")
+ @Override
+ public Future<?> answer(InvocationOnMock invocation) throws Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[3];
+ CronTask task = ((CronTask) originalArgument);
+ // Stub submit(): run the callable on the calling thread so the throwing monitor is actually invoked.
+ Mockito.doAnswer(new Answer<Future<?>>()
+ {
+ @Override
+ public Future<Boolean> answer(InvocationOnMock invocation) throws
Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[0];
+ ((Callable<Boolean>) originalArgument).call();
+ return CompletableFuture.completedFuture(Boolean.TRUE);
+ }
+ }).when(executor).submit(ArgumentMatchers.any(Callable.class));
+
+ task.run(123L);
+ return createDummyFuture();
+ }
+ }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+
+
+ final MonitorScheduler scheduler = new MonitorScheduler(
+ config,
+ cronScheduler,
+ Mockito.mock(ServiceEmitter.class),
+ ImmutableList.of(monitor),
+ executor);
+ scheduler.start();
+
+ Mockito.verify(monitor, Mockito.times(1)).start();
+ Mockito.verify(cronScheduler,
Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
+ ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+ Mockito.verify(executor,
Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
+ Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any());
+ }
+
+ /**
+  * Makes the mocked executor throw a RuntimeException from submit() and drives the scheduled CronTask once.
+  * The test declares no expected exception, so it passes only if start() returns normally; it then verifies
+  * one start, one scheduling call and one submit attempt.
+  */
+ @Test
+ public void testStart_UnexpectedExceptionWhileScheduling()
+ {
+ ExecutorService executor = Mockito.mock(ExecutorService.class);
+ Monitor monitor = Mockito.mock(Monitor.class);
+ Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
+ MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
+ Mockito.when(config.getEmitterPeriod()).thenReturn(new
org.joda.time.Duration(1000L));
+
+
+ Mockito.doAnswer(new Answer<Future<?>>()
+ {
+ @SuppressWarnings("unchecked")
+ @Override
+ public Future<?> answer(InvocationOnMock invocation) throws Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[3];
+ CronTask task = ((CronTask) originalArgument);
+ // Every submit() call fails, so the tick below has to cope with the exception itself.
+
Mockito.when(executor.submit(ArgumentMatchers.any(Callable.class))).thenThrow(new
RuntimeException());
+ task.run(123L);
+ return createDummyFuture();
+ }
+ }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+
+
+ final MonitorScheduler scheduler = new MonitorScheduler(
+ config,
+ cronScheduler,
+ Mockito.mock(ServiceEmitter.class),
+ ImmutableList.of(monitor),
+ executor);
+ scheduler.start();
+
+ Mockito.verify(monitor, Mockito.times(1)).start();
+ Mockito.verify(cronScheduler,
Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
+ ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+ Mockito.verify(executor,
Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
+ }
+
+ /**
+  * Builds an inert Future stub for the tests: cancel, isCancelled and isDone always return false, and both get
+  * variants return null without blocking.
+  */
+ private Future createDummyFuture()
+ {
+ Future<?> future = new Future()
+ {
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ return false;
+ }
+
+ @Override
+ public Object get()
+ {
+ return null;
+ }
+
+ @Override
+ public Object get(long timeout, TimeUnit unit)
+ {
+ return null;
+ }
+
+ };
+
+ return future;
+ }
+
+
private static class NoopMonitor implements Monitor
{
@Override
@@ -82,5 +366,19 @@ public class MonitorSchedulerTest
{
return true;
}
+
+ @Override
+ public Future<?> getScheduledFuture()
+ {
+ // This test stub keeps no scheduling handle, so there is nothing to return.
+ return null;
+ }
+
+ @Override
+ public void setScheduledFuture(Future<?> scheduledFuture)
+ {
+ // Intentionally empty: this test stub discards the scheduling handle.
+
+ }
}
}
diff --git a/licenses.yaml b/licenses.yaml
index 170b3d1..e684073 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -345,6 +345,16 @@ libraries:
---
+name: CronScheduler
+license_category: binary
+module: java-core
+license_name: Apache License version 2.0
+version: 0.1
+libraries:
+ - io.timeandspace: cron-scheduler
+
+---
+
name: LMAX Disruptor
license_category: binary
module: java-core
diff --git a/pom.xml b/pom.xml
index 383de94..ffe9460 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1255,6 +1255,11 @@
<version>1.19.0</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.timeandspace</groupId>
+ <artifactId>cron-scheduler</artifactId>
+ <version>0.1</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/server/pom.xml b/server/pom.xml
index 7fa3a48..caddf48 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -311,6 +311,10 @@
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bulkhead</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.timeandspace</groupId>
+ <artifactId>cron-scheduler</artifactId>
+ </dependency>
<!-- Tests -->
<dependency>
diff --git
a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
index 71b9945..d7598ae 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
@@ -27,6 +27,7 @@ import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Names;
+import io.timeandspace.cronscheduler.CronScheduler;
import org.apache.druid.guice.DruidBinders;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
@@ -42,6 +43,7 @@ import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.java.util.metrics.SysMonitor;
import org.apache.druid.query.ExecutorServiceMonitor;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -106,9 +108,10 @@ public class MetricsModule implements Module
return new MonitorScheduler(
config.get(),
- Execs.scheduledSingleThreaded("MonitorScheduler-%s"),
+
CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorSchedulerThread").build(),
emitter,
- monitors
+ monitors,
+ Execs.multiThreaded(64, "MonitorThread-%d")
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]