This is an automated email from the ASF dual-hosted git repository.
zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 770ad95169 Add a metric for task duration in the pending queue (#12492)
770ad95169 is described below
commit 770ad951693f30e6b56af4db430f88f0408d10d5
Author: Rocky Chen <[email protected]>
AuthorDate: Mon May 2 20:47:25 2022 -0700
Add a metric for task duration in the pending queue (#12492)
This PR measures how long a task stays in the pending queue and emits
the value as the metric task/pending/time. The metric is emitted by
RemoteTaskRunner and HttpRemoteTaskRunner.
An example of the metric:
```
2022-04-26T21:59:09,488 INFO [rtr-pending-tasks-runner-0]
org.apache.druid.java.util.emitter.core.LoggingEmitter -
{"feed":"metrics","timestamp":"2022-04-26T21:59:09.487Z","service":"druid/coordinator","host":"localhost:8081","version":"2022.02.0-iap-SNAPSHOT","metric":"task/pending/time","value":8,"dataSource":"wikipedia","taskId":"index_parallel_wikipedia_gecpcglg_2022-04-26T21:59:09.432Z","taskType":"index_parallel"}
```
------------------------------------------
Key changed/added classes in this PR
Emit metric task/pending/time in classes RemoteTaskRunner and
HttpRemoteTaskRunner.
Update related factory classes and tests.
---
docs/operations/metrics.md | 1 +
.../druid/indexing/overlord/RemoteTaskRunner.java | 21 ++++++-
.../indexing/overlord/RemoteTaskRunnerFactory.java | 9 ++-
.../overlord/hrtr/HttpRemoteTaskRunner.java | 17 ++++-
.../overlord/hrtr/HttpRemoteTaskRunnerFactory.java | 9 ++-
.../overlord/RemoteTaskRunnerFactoryTest.java | 72 ++++++++++++++++++++++
.../overlord/RemoteTaskRunnerTestUtils.java | 4 +-
.../overlord/hrtr/HttpRemoteTaskRunnerTest.java | 30 ++++++---
8 files changed, 146 insertions(+), 17 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 8124acd1c9..dd442bfbf8 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -198,6 +198,7 @@ Note: If the JVM does not support CPU time measurement for
the current thread, i
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`task/run/time`|Milliseconds taken to run a task.|dataSource, taskId,
taskType, taskStatus.|Varies.|
+|`task/pending/time`|Milliseconds taken for a task to wait for
running.|dataSource, taskId, taskType.|Varies.|
|`task/action/log/time`|Milliseconds taken to log a task action to the audit
log.|dataSource, taskId, taskType|< 1000 (subsecond)|
|`task/action/run/time`|Milliseconds taken to execute a task
action.|dataSource, taskId, taskType|Varies from subsecond to a few seconds,
based on action type.|
|`segment/added/bytes`|Size in bytes of new segments created.|dataSource,
taskId, taskType, interval.|Varies.|
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 88a1c2b426..cbe213cd47 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -54,6 +54,7 @@ import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
@@ -74,6 +75,8 @@ import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import
org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
@@ -179,6 +182,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner,
TaskLogStreamer
private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups =
new ConcurrentHashMap<>();
private final ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy;
+ private final ServiceEmitter emitter;
private ProvisioningService provisioningService;
public RemoteTaskRunner(
@@ -189,7 +193,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner,
TaskLogStreamer
PathChildrenCacheFactory.Builder pathChildrenCacheFactory,
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef,
- ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy
+ ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy,
+ ServiceEmitter emitter
)
{
this.jsonMapper = jsonMapper;
@@ -213,6 +218,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner,
TaskLogStreamer
config.getPendingTasksRunnerNumThreads(),
"rtr-pending-tasks-runner-%d"
);
+ this.emitter = emitter;
}
@Override
@@ -934,6 +940,13 @@ public class RemoteTaskRunner implements WorkerTaskRunner,
TaskLogStreamer
return false;
}
+ final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
+ IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+ emitter.emit(metricBuilder.build(
+ "task/pending/time",
+ new Duration(workItem.getQueueInsertionTime(),
DateTimes.nowUtc()).getMillis())
+ );
+
RemoteTaskRunnerWorkItem newWorkItem =
workItem.withWorker(theZkWorker.getWorker(), null);
runningTasks.put(task.getId(), newWorkItem);
log.info("Task %s switched from pending to running (on [%s])",
task.getId(), newWorkItem.getWorker().getHost());
@@ -1516,6 +1529,12 @@ public class RemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer
return workersWithUnacknowledgedTask;
}
+ @VisibleForTesting
+ ProvisioningStrategy<WorkerTaskRunner> getProvisioningStrategy()
+ {
+ return provisioningStrategy;
+ }
+
@Override
public Map<String, Long> getTotalTaskSlotCount()
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerFactory.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerFactory.java
index 2efd8a1cfe..3e47eba443 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerFactory.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerFactory.java
@@ -30,6 +30,7 @@ import
org.apache.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfi
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.initialization.IndexerZkConfig;
@@ -46,6 +47,7 @@ public class RemoteTaskRunnerFactory implements
TaskRunnerFactory<RemoteTaskRunn
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final ProvisioningSchedulerConfig provisioningSchedulerConfig;
private final ProvisioningStrategy provisioningStrategy;
+ private final ServiceEmitter emitter;
@Inject
public RemoteTaskRunnerFactory(
@@ -56,7 +58,8 @@ public class RemoteTaskRunnerFactory implements
TaskRunnerFactory<RemoteTaskRunn
@EscalatedGlobal final HttpClient httpClient,
final Supplier<WorkerBehaviorConfig> workerConfigRef,
final ProvisioningSchedulerConfig provisioningSchedulerConfig,
- final ProvisioningStrategy provisioningStrategy
+ final ProvisioningStrategy provisioningStrategy,
+ final ServiceEmitter emitter
)
{
this.curator = curator;
@@ -67,6 +70,7 @@ public class RemoteTaskRunnerFactory implements
TaskRunnerFactory<RemoteTaskRunn
this.workerConfigRef = workerConfigRef;
this.provisioningSchedulerConfig = provisioningSchedulerConfig;
this.provisioningStrategy = provisioningStrategy;
+ this.emitter = emitter;
}
@Override
@@ -80,7 +84,8 @@ public class RemoteTaskRunnerFactory implements
TaskRunnerFactory<RemoteTaskRunn
new PathChildrenCacheFactory.Builder().withCompressed(true),
httpClient,
workerConfigRef,
- provisioningSchedulerConfig.isDoAutoscale() ? provisioningStrategy :
new NoopProvisioningStrategy<>()
+ provisioningSchedulerConfig.isDoAutoscale() ? provisioningStrategy :
new NoopProvisioningStrategy<>(),
+ emitter
);
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 8aacc26624..fcc7352c85 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -50,6 +50,7 @@ import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
@@ -76,6 +77,8 @@ import
org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import
org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
@@ -83,6 +86,7 @@ import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.zookeeper.KeeperException;
import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.Duration;
import org.joda.time.Period;
import javax.annotation.Nullable;
@@ -182,6 +186,7 @@ public class HttpRemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer
private final HttpRemoteTaskRunnerConfig config;
private final TaskStorage taskStorage;
+ private final ServiceEmitter emitter;
// ZK_CLEANUP_TODO : Remove these when RemoteTaskRunner and
WorkerTaskMonitor are removed.
private static final Joiner JOINER = Joiner.on("/");
@@ -203,7 +208,8 @@ public class HttpRemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
TaskStorage taskStorage,
@Nullable CuratorFramework cf,
- IndexerZkConfig indexerZkConfig
+ IndexerZkConfig indexerZkConfig,
+ ServiceEmitter emitter
)
{
this.smileMapper = smileMapper;
@@ -212,6 +218,7 @@ public class HttpRemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
this.taskStorage = taskStorage;
this.workerConfigRef = workerConfigRef;
+ this.emitter = emitter;
this.pendingTasksExec = Execs.multiThreaded(
config.getPendingTasksRunnerNumThreads(),
@@ -1548,6 +1555,14 @@ public class HttpRemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer
taskItem.setWorker(worker);
taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
log.info("Task[%s] started RUNNING on worker[%s].", taskId,
worker.getHost());
+
+ final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
+ IndexTaskUtils.setTaskDimensions(metricBuilder,
taskItem.getTask());
+ emitter.emit(metricBuilder.build(
+ "task/pending/time",
+ new Duration(taskItem.getCreatedTime(),
DateTimes.nowUtc()).getMillis())
+ );
+
// fall through
case RUNNING:
if (worker.getHost().equals(taskItem.getWorker().getHost())) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerFactory.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerFactory.java
index 6b81bb0782..5b78f937eb 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerFactory.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerFactory.java
@@ -35,6 +35,7 @@ import
org.apache.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfi
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.initialization.IndexerZkConfig;
@@ -54,6 +55,7 @@ public class HttpRemoteTaskRunnerFactory implements
TaskRunnerFactory<HttpRemote
private final ProvisioningStrategy provisioningStrategy;
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
private final TaskStorage taskStorage;
+ private final ServiceEmitter emitter;
// ZK_CLEANUP_TODO : Remove these when RemoteTaskRunner and
WorkerTaskMonitor are removed.
@Nullable //Null if zk is disabled
@@ -72,7 +74,8 @@ public class HttpRemoteTaskRunnerFactory implements
TaskRunnerFactory<HttpRemote
final TaskStorage taskStorage,
final Provider<CuratorFramework> cfProvider,
final IndexerZkConfig indexerZkConfig,
- final ZkEnablementConfig zkEnablementConfig
+ final ZkEnablementConfig zkEnablementConfig,
+ final ServiceEmitter emitter
)
{
this.smileMapper = smileMapper;
@@ -84,6 +87,7 @@ public class HttpRemoteTaskRunnerFactory implements
TaskRunnerFactory<HttpRemote
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
this.taskStorage = taskStorage;
this.indexerZkConfig = indexerZkConfig;
+ this.emitter = emitter;
if (zkEnablementConfig.isEnabled()) {
this.cf = cfProvider.get();
@@ -104,7 +108,8 @@ public class HttpRemoteTaskRunnerFactory implements
TaskRunnerFactory<HttpRemote
druidNodeDiscoveryProvider,
taskStorage,
cf,
- indexerZkConfig
+ indexerZkConfig,
+ emitter
);
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerFactoryTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerFactoryTest.java
new file mode 100644
index 0000000000..f62c86b14c
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerFactoryTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
+import
org.apache.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig;
+import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
+import org.apache.druid.server.initialization.IndexerZkConfig;
+import org.apache.druid.server.initialization.ZkPathsConfig;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class RemoteTaskRunnerFactoryTest
+{
+ @Test
+ public void testBuildWithAutoScale()
+ {
+ ProvisioningSchedulerConfig provisioningSchedulerConfig =
Mockito.mock(ProvisioningSchedulerConfig.class);
+ Mockito.when(provisioningSchedulerConfig.isDoAutoscale()).thenReturn(true);
+
+ RemoteTaskRunnerFactory remoteTaskRunnerFactory =
getTestRemoteTaskRunnerFactory(provisioningSchedulerConfig);
+
+
Assert.assertNull(remoteTaskRunnerFactory.build().getProvisioningStrategy());
+ }
+
+ @Test
+ public void testBuildWithoutAutoScale()
+ {
+ ProvisioningSchedulerConfig provisioningSchedulerConfig =
Mockito.mock(ProvisioningSchedulerConfig.class);
+
Mockito.when(provisioningSchedulerConfig.isDoAutoscale()).thenReturn(false);
+
+ RemoteTaskRunnerFactory remoteTaskRunnerFactory =
getTestRemoteTaskRunnerFactory(provisioningSchedulerConfig);
+
+
Assert.assertTrue(remoteTaskRunnerFactory.build().getProvisioningStrategy()
instanceof NoopProvisioningStrategy);
+ }
+
+ private RemoteTaskRunnerFactory
getTestRemoteTaskRunnerFactory(ProvisioningSchedulerConfig
provisioningSchedulerConfig)
+ {
+ CuratorFramework curator = Mockito.mock(CuratorFramework.class);
+ Mockito.when(curator.newWatcherRemoveCuratorFramework()).thenReturn(null);
+ return new RemoteTaskRunnerFactory(
+ curator,
+ new RemoteTaskRunnerConfig(),
+ new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
+ null,
+ null,
+ null,
+ provisioningSchedulerConfig,
+ null,
+ null
+ );
+ }
+}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java
index 69efc66032..bfbafd0cec 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java
@@ -47,6 +47,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.zookeeper.CreateMode;
import java.util.concurrent.atomic.AtomicReference;
@@ -270,7 +271,8 @@ public class RemoteTaskRunnerTestUtils
pathChildrenCacheFactory,
httpClient,
workerConfigRef,
- provisioningStrategy
+ provisioningStrategy,
+ new NoopServiceEmitter()
);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index 336cda9a56..0c8faeb9ff 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -230,7 +230,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
- new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+ new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
+ new NoopServiceEmitter()
)
{
@Override
@@ -298,7 +299,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
- new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+ new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
+ new NoopServiceEmitter()
)
{
@Override
@@ -402,7 +404,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
taskStorageMock,
EasyMock.createNiceMock(CuratorFramework.class),
- new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+ new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
+ new NoopServiceEmitter()
)
{
@Override
@@ -544,7 +547,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
- new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+ new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
+ new NoopServiceEmitter()
)
{
@Override
@@ -719,7 +723,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
- new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+ new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
+ new NoopServiceEmitter()
)
{
@Override
@@ -916,7 +921,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
- new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+ new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
+ new NoopServiceEmitter()
)
{
@Override
@@ -1405,7 +1411,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
- new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+ new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
+ new NoopServiceEmitter()
)
{
@Override
@@ -1517,7 +1524,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
- new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+ new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
+ new NoopServiceEmitter()
)
{
@Override
@@ -1625,7 +1633,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
taskStorage,
EasyMock.createNiceMock(CuratorFramework.class),
- new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+ new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
+ new NoopServiceEmitter()
);
taskRunner.start();
@@ -1896,7 +1905,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
- new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+ new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
+ new NoopServiceEmitter()
)
{
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]