kfaraz commented on code in PR #15991:
URL: https://github.com/apache/druid/pull/15991#discussion_r1505486575


##########
indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManagerMonitor.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker;
+
+import com.google.inject.Inject;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.druid.query.DruidMetrics;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+public class WorkerTaskManagerMonitor extends AbstractMonitor
+{
+  private final WorkerTaskManager workerTaskManager;
+  private static final String WORKER_RUNNING_TASK_COUNT_METRICS = 
"worker/task/running/count";
+  private static final String WORKER_ASSIGNED_TASK_COUNT_METRIC = 
"worker/task/assigned/count";
+  private static final String WORKER_COMPLETED_TASK_COUNT_METRIC = 
"worker/task/completed/count";
+
+  @Inject
+  public WorkerTaskManagerMonitor(WorkerTaskManager workerTaskManager)
+  {
+    this.workerTaskManager = workerTaskManager;
+  }
+
+  @Override
+  public boolean doMonitor(ServiceEmitter emitter)
+  {
+    final Map<String, Integer> runningTasks, assignedTasks, completedTasks;
+
+    runningTasks = getDataSourceTasks(workerTaskManager.getRunningTasks(), 
WorkerTaskManager.TaskDetails::getDataSource);
+    assignedTasks = getDataSourceTasks(workerTaskManager.getAssignedTasks(), 
Task::getDataSource);
+    completedTasks = getDataSourceTasks(workerTaskManager.getCompletedTasks(), 
TaskAnnouncement::getTaskDataSource);

Review Comment:
   Preferred style in Druid (if not Java itself) is to declare and initialize each variable in a single statement:
   ```suggestion
       final Map<String, Integer> runningTasks = 
getDataSourceTasks(workerTaskManager.getRunningTasks(), 
WorkerTaskManager.TaskDetails::getDataSource);
       final Map<String, Integer> assignedTasks = 
getDataSourceTasks(workerTaskManager.getAssignedTasks(), Task::getDataSource);
       final Map<String, Integer> completedTasks = 
getDataSourceTasks(workerTaskManager.getCompletedTasks(), 
TaskAnnouncement::getTaskDataSource);
   ```



##########
indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerMonitorTest.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.easymock.EasyMock.createMock;
+
+public class WorkerTaskManagerMonitorTest
+{
+  private WorkerTaskManager workerTaskManager;
+  private TaskRunner taskRunner;
+  private ObjectMapper jsonMapper;
+  private TaskConfig taskConfig;
+  private OverlordClient overlordClient;
+  private Task task;
+  private WorkerTaskManager.TaskDetails taskDetails;
+  private TaskAnnouncement taskAnnouncement;
+
+  @Before
+  public void setUp()
+  {
+    task = createMock(Task.class);

Review Comment:
   Use `NoopTask` instead of creating a mock.



##########
indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerMonitorTest.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.easymock.EasyMock.createMock;
+
+public class WorkerTaskManagerMonitorTest
+{
+  private WorkerTaskManager workerTaskManager;
+  private TaskRunner taskRunner;
+  private ObjectMapper jsonMapper;
+  private TaskConfig taskConfig;
+  private OverlordClient overlordClient;
+  private Task task;
+  private WorkerTaskManager.TaskDetails taskDetails;
+  private TaskAnnouncement taskAnnouncement;
+
+  @Before
+  public void setUp()
+  {
+    task = createMock(Task.class);
+    EasyMock.expect(task.getDataSource()).andReturn("dummy_DS1");
+    EasyMock.replay(task);
+    taskDetails = createMock(WorkerTaskManager.TaskDetails.class);
+    EasyMock.expect(taskDetails.getDataSource()).andReturn("dummy_DS2");
+    EasyMock.replay(taskDetails);
+    taskAnnouncement = createMock(TaskAnnouncement.class);
+    
EasyMock.expect(taskAnnouncement.getTaskDataSource()).andReturn("dummy_DS3");
+    EasyMock.replay(taskAnnouncement);
+    taskRunner = createMock(TaskRunner.class);
+    taskConfig = createMock(TaskConfig.class);
+    overlordClient = createMock(OverlordClient.class);

Review Comment:
   Use `NoopOverlordClient` instead.



##########
indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManagerMonitor.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker;
+
+import com.google.inject.Inject;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.druid.query.DruidMetrics;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+public class WorkerTaskManagerMonitor extends AbstractMonitor
+{
+  private final WorkerTaskManager workerTaskManager;
+  private static final String WORKER_RUNNING_TASK_COUNT_METRICS = 
"worker/task/running/count";
+  private static final String WORKER_ASSIGNED_TASK_COUNT_METRIC = 
"worker/task/assigned/count";
+  private static final String WORKER_COMPLETED_TASK_COUNT_METRIC = 
"worker/task/completed/count";
+
+  @Inject
+  public WorkerTaskManagerMonitor(WorkerTaskManager workerTaskManager)
+  {
+    this.workerTaskManager = workerTaskManager;
+  }
+
+  @Override
+  public boolean doMonitor(ServiceEmitter emitter)
+  {
+    final Map<String, Integer> runningTasks, assignedTasks, completedTasks;
+
+    runningTasks = getDataSourceTasks(workerTaskManager.getRunningTasks(), 
WorkerTaskManager.TaskDetails::getDataSource);
+    assignedTasks = getDataSourceTasks(workerTaskManager.getAssignedTasks(), 
Task::getDataSource);
+    completedTasks = getDataSourceTasks(workerTaskManager.getCompletedTasks(), 
TaskAnnouncement::getTaskDataSource);
+
+    final ServiceMetricEvent.Builder builder = new 
ServiceMetricEvent.Builder();
+    emitWorkerTaskMetric(builder, emitter, WORKER_RUNNING_TASK_COUNT_METRICS, 
runningTasks);
+    emitWorkerTaskMetric(builder, emitter, WORKER_ASSIGNED_TASK_COUNT_METRIC, 
assignedTasks);
+    emitWorkerTaskMetric(builder, emitter, WORKER_COMPLETED_TASK_COUNT_METRIC, 
completedTasks);
+    return true;
+  }
+
+  public void emitWorkerTaskMetric(ServiceMetricEvent.Builder builder, 
ServiceEmitter emitter, String metricName, Map<String, Integer> 
dataSourceTaskMap)
+  {
+    for (Map.Entry<String, Integer> dataSourceTaskCount : 
dataSourceTaskMap.entrySet()) {
+      builder.setDimension(DruidMetrics.DATASOURCE, 
dataSourceTaskCount.getKey());
+      emitter.emit(builder.setMetric(metricName, 
dataSourceTaskCount.getValue()));
+    }
+  }
+
+  private <T> Map<String, Integer> getDataSourceTasks(Map<String, T> taskMap, 
Function<T, String> getDataSourceFunc)

Review Comment:
   A better name would be `getDatasourceToNumTasks` or 
`getNumTasksPerDatasource`.
   
   Also, the first argument to this method should be only the values of the map 
rather than the whole map. The key of the map (taskId) is never used in this 
method.



##########
indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java:
##########
@@ -179,6 +179,16 @@ public Map<String, TaskAnnouncement> getCompletedTasks()
     return completedTasks;
   }
 
+  public Map<String, TaskDetails> getRunningTasks()
+  {
+    return runningTasks;

Review Comment:
   No need to expose the `TaskDetails` class, as no other field of the 
`TaskDetails` class is being used in the monitoring. Might as well just do the 
following:
   
   ```suggestion
       return CollectionUtils.mapValues(runningTasks, detail -> detail.task);
   ```



##########
indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerMonitorTest.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.easymock.EasyMock.createMock;
+
+public class WorkerTaskManagerMonitorTest
+{
+  private WorkerTaskManager workerTaskManager;
+  private TaskRunner taskRunner;
+  private ObjectMapper jsonMapper;
+  private TaskConfig taskConfig;
+  private OverlordClient overlordClient;
+  private Task task;
+  private WorkerTaskManager.TaskDetails taskDetails;
+  private TaskAnnouncement taskAnnouncement;
+
+  @Before
+  public void setUp()
+  {
+    task = createMock(Task.class);
+    EasyMock.expect(task.getDataSource()).andReturn("dummy_DS1");
+    EasyMock.replay(task);
+    taskDetails = createMock(WorkerTaskManager.TaskDetails.class);
+    EasyMock.expect(taskDetails.getDataSource()).andReturn("dummy_DS2");
+    EasyMock.replay(taskDetails);
+    taskAnnouncement = createMock(TaskAnnouncement.class);

Review Comment:
   Try to use concrete instances of `TaskAnnouncement` and `TaskDetails` (if 
needed) instead of mocks.



##########
indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManagerMonitor.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker;
+
+import com.google.inject.Inject;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.druid.query.DruidMetrics;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+public class WorkerTaskManagerMonitor extends AbstractMonitor

Review Comment:
   +1 on using the existing `WorkerTaskCountStatsMonitor` as the new metrics 
fall in the same bucket.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to