kfaraz commented on code in PR #17353:
URL: https://github.com/apache/druid/pull/17353#discussion_r1948560429
##########
indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorStatus.java:
##########
@@ -21,97 +21,137 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.indexer.TaskStatus;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import javax.annotation.Nullable;
-import java.util.Map;
+import java.util.List;
-public class ScheduledBatchSupervisorSnapshot
+/**
+ * Represents the status of a scheduled batch supervisor, including its state,
+ * scheduler details, task execution aggregate counts, and recent task
activity.
+ */
+public class ScheduledBatchSupervisorStatus
{
- public enum BatchSupervisorStatus
- {
- SCHEDULER_RUNNING,
- SCHEDULER_SHUTDOWN
- }
-
- @JsonProperty
private final String supervisorId;
- @JsonProperty
- private final BatchSupervisorStatus status;
+ private final ScheduledBatchSupervisor.State state;
@Nullable
- @JsonProperty
private final DateTime lastTaskSubmittedTime;
@Nullable
- @JsonProperty
private final DateTime nextTaskSubmissionTime;
@Nullable
- @JsonProperty
private final Duration timeUntilNextTaskSubmission;
- @JsonProperty
- private final Map<String, TaskStatus> activeTasks;
+ private final Integer totalSubmittedTasks;
- @JsonProperty
- private final Map<String, TaskStatus> completedTasks;
+ private final Integer totalSuccessfulTasks;
+
+ private final Integer totalFailedTasks;
+
+ private final List<BatchSupervisorTaskStatus> recentActiveTasks;
+
+ private final List<BatchSupervisorTaskStatus> recentSuccessfulTasks;
+
+ private final List<BatchSupervisorTaskStatus> recentFailedTasks;
Review Comment:
Since the individual objects already contain the task status, do we need
these to be separate lists?
Can we just have a single list `recentTasks`?
Alternatively, the individual lists could contain just the task ID and
updated time, without the task status?
##########
indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorStatus.java:
##########
@@ -21,97 +21,137 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.indexer.TaskStatus;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import javax.annotation.Nullable;
-import java.util.Map;
+import java.util.List;
-public class ScheduledBatchSupervisorSnapshot
+/**
+ * Represents the status of a scheduled batch supervisor, including its state,
+ * scheduler details, task execution aggregate counts, and recent task
activity.
+ */
+public class ScheduledBatchSupervisorStatus
{
- public enum BatchSupervisorStatus
- {
- SCHEDULER_RUNNING,
- SCHEDULER_SHUTDOWN
- }
-
- @JsonProperty
private final String supervisorId;
- @JsonProperty
- private final BatchSupervisorStatus status;
+ private final ScheduledBatchSupervisor.State state;
@Nullable
- @JsonProperty
private final DateTime lastTaskSubmittedTime;
@Nullable
- @JsonProperty
private final DateTime nextTaskSubmissionTime;
@Nullable
- @JsonProperty
private final Duration timeUntilNextTaskSubmission;
- @JsonProperty
- private final Map<String, TaskStatus> activeTasks;
+ private final Integer totalSubmittedTasks;
- @JsonProperty
- private final Map<String, TaskStatus> completedTasks;
+ private final Integer totalSuccessfulTasks;
+
+ private final Integer totalFailedTasks;
+
+ private final List<BatchSupervisorTaskStatus> recentActiveTasks;
+
+ private final List<BatchSupervisorTaskStatus> recentSuccessfulTasks;
+
+ private final List<BatchSupervisorTaskStatus> recentFailedTasks;
@JsonCreator
- public ScheduledBatchSupervisorSnapshot(
+ public ScheduledBatchSupervisorStatus(
@JsonProperty("supervisorId") String supervisorId,
- @JsonProperty("status") BatchSupervisorStatus status,
+ @JsonProperty("status") ScheduledBatchSupervisor.State state,
Review Comment:
Please align the JSON property name with the field name `state` for
consistency with the getter.
##########
indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/BatchSupervisorTaskStatus.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.scheduledbatch;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.TaskStatus;
+import org.joda.time.DateTime;
+
+/**
+ * Represents the status of a scheduled batch supervisor task and the
timestamp of its last update.
+ */
+public class BatchSupervisorTaskStatus
+{
+ private final TaskStatus taskStatus;
+ private final DateTime updatedTime;
+
+ public BatchSupervisorTaskStatus(
+ @JsonProperty("taskStatus") TaskStatus taskStatus,
+ @JsonProperty("updatedTime") DateTime updatedTime
+ )
+ {
+ this.taskStatus = taskStatus;
+ this.updatedTime = updatedTime;
+ }
+
+ @JsonProperty("taskStatus")
+ public TaskStatus getStatus()
+ {
+ return taskStatus;
+ }
+
+ @JsonProperty
+ public DateTime getUpdatedTime()
+ {
+ return updatedTime;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "BatchSupervisorTaskStatusV2{" +
Review Comment:
Do we need the `V2` suffix? The class itself is named `BatchSupervisorTaskStatus`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchStatusTracker.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.scheduledbatch;
+
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.query.http.SqlTaskStatus;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tracks task statuses upon submission and completion for scheduled batch
supervisors.
+ * <p>
+ * This class maintains per-supervisor mappings of submitted tasks and their
statuses.
+ * It also keeps track of total task counts (active, successful, failed) and
ensures that
+ * recently tracked task statuses are retained for a limited duration ({@link
#MAX_STATUS_RETAIN_DURATION}).
+ * </p>
+ */
+public class ScheduledBatchStatusTracker
+{
+ private static final Duration MAX_STATUS_RETAIN_DURATION =
Duration.standardDays(2);
+
+ /**
+ * Track supervisor ID -> task IDs.
+ */
+ private final ConcurrentHashMap<String, List<String>> supervisorToTaskIds =
new ConcurrentHashMap<>();
+
+ /**
+ * Track the task ID -> supervisor ID for reverse lookup to update the total
counts.
+ */
+ private final ConcurrentHashMap<String, String> taskIdToSupervisorId = new
ConcurrentHashMap<>();
+
+ /**
+ * Tracks the recent set of task ID -> task status for all tasks younger
than {@link #MAX_STATUS_RETAIN_DURATION}.
+ */
+ private final ConcurrentHashMap<String, BatchSupervisorTaskStatus>
recentTaskStatusMap = new ConcurrentHashMap<>();
+
+
+ private final ConcurrentHashMap<String, AtomicInteger>
supervisorTotalSubmittedTasks = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, AtomicInteger>
supervisorTotalSuccessfulTasks = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, AtomicInteger>
supervisorTotalFailedTasks = new ConcurrentHashMap<>();
Review Comment:
Rather than these 3 maps and a separate `supervisorToTaskIds`, maybe just
keep a single map from supervisor ID to a `SupervisorStatus` object that
contains the required task counts and task IDs.
##########
indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorStatus.java:
##########
@@ -21,97 +21,137 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.indexer.TaskStatus;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import javax.annotation.Nullable;
-import java.util.Map;
+import java.util.List;
-public class ScheduledBatchSupervisorSnapshot
+/**
+ * Represents the status of a scheduled batch supervisor, including its state,
+ * scheduler details, task execution aggregate counts, and recent task
activity.
+ */
+public class ScheduledBatchSupervisorStatus
{
- public enum BatchSupervisorStatus
- {
- SCHEDULER_RUNNING,
- SCHEDULER_SHUTDOWN
- }
-
- @JsonProperty
private final String supervisorId;
- @JsonProperty
- private final BatchSupervisorStatus status;
+ private final ScheduledBatchSupervisor.State state;
@Nullable
- @JsonProperty
private final DateTime lastTaskSubmittedTime;
@Nullable
- @JsonProperty
private final DateTime nextTaskSubmissionTime;
@Nullable
- @JsonProperty
private final Duration timeUntilNextTaskSubmission;
- @JsonProperty
- private final Map<String, TaskStatus> activeTasks;
+ private final Integer totalSubmittedTasks;
- @JsonProperty
- private final Map<String, TaskStatus> completedTasks;
+ private final Integer totalSuccessfulTasks;
+
+ private final Integer totalFailedTasks;
+
+ private final List<BatchSupervisorTaskStatus> recentActiveTasks;
+
+ private final List<BatchSupervisorTaskStatus> recentSuccessfulTasks;
+
+ private final List<BatchSupervisorTaskStatus> recentFailedTasks;
@JsonCreator
- public ScheduledBatchSupervisorSnapshot(
+ public ScheduledBatchSupervisorStatus(
@JsonProperty("supervisorId") String supervisorId,
- @JsonProperty("status") BatchSupervisorStatus status,
+ @JsonProperty("status") ScheduledBatchSupervisor.State state,
@JsonProperty("lastTaskSubmittedTime") @Nullable DateTime
lastTaskSubmittedTime,
@JsonProperty("nextTaskSubmissionTime") @Nullable DateTime
nextTaskSubmissionTime,
@JsonProperty("timeUntilNextTaskSubmission") @Nullable Duration
timeUntilNextTaskSubmission,
- @JsonProperty("activeTasks") Map<String, TaskStatus> activeTasks,
- @JsonProperty("completedTasks") Map<String, TaskStatus> completedTasks
+ @JsonProperty("totalSubmittedTasks") Integer totalSubmittedTasks,
+ @JsonProperty("totalSuccessfulTasks") Integer totalSuccessfulTasks,
+ @JsonProperty("totalFailedTasks") Integer totalFailedTasks,
Review Comment:
Should these be primitive `int`s, since they are not marked `@Nullable`?
##########
indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchTaskManager.java:
##########
@@ -134,72 +144,101 @@ public void startScheduledIngestion(
public void stopScheduledIngestion(final String supervisorId)
{
log.info("Stopping scheduled batch ingestion for supervisorId[%s].",
supervisorId);
- final SchedulerManager manager = supervisorToManager.get(supervisorId);
- if (manager != null) {
+ final ScheduledBatchTask taskScheduler =
supervisorToTaskScheduler.get(supervisorId);
+ if (taskScheduler != null) {
// Don't remove the supervisorId from supervisorToManager. We want to be
able to track the status
// for suspended supervisors to track any in-flight tasks. stop() will
clean up all state completely.
- manager.stopScheduling();
+ taskScheduler.stopScheduling();
}
}
+ /**
+ * Starts the scheduled batch task manager by registering the {@link
TaskRunnerListener}.
+ * This allows tracking of any tasks submitted by the batch supervisor.
+ * <p>
+ * Should be invoked when the Overlord service starts or during leadership
transitions.
+ * </p>
+ */
public void start()
{
- log.info("Starting scheduled batch scheduler.");
+ log.info("Starting scheduled batch task manager.");
final Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
if (taskRunnerOptional.isPresent()) {
taskRunnerOptional.get().registerListener(taskRunnerListener,
Execs.directExecutor());
} else {
- log.warn("Task runner not registered for scheduled batch scheduler.");
+ log.warn("Task runner not registered for scheduled batch task manager.");
}
}
+ /**
+ * Stops the scheduled batch task manager by unregistering the previously
registered {@link TaskRunnerListener}.
+ * <p>
+ * Should be invoked when the Overlord service stops or during leadership
transitions.
+ * </p>
+ */
public void stop()
{
- log.info("Stopping scheduled batch scheduler.");
+ log.info("Stopping scheduled batch task manager.");
final Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
if (taskRunnerOptional.isPresent()) {
taskRunnerOptional.get().unregisterListener(taskRunnerListener.getListenerId());
}
- supervisorToManager.clear();
+ supervisorToTaskScheduler.clear();
}
@Nullable
- public ScheduledBatchSupervisorSnapshot getSchedulerSnapshot(final String
supervisorId)
+ public ScheduledBatchSupervisorStatus getTaskManagerStatus(final String
supervisorId)
Review Comment:
```suggestion
public ScheduledBatchSupervisorStatus getSupervisorStatus(final String
supervisorId)
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchTaskManager.java:
##########
@@ -290,9 +334,9 @@ private Duration getTimeUntilNextTaskSubmission()
return
cronSchedulerConfig.getDurationUntilNextTaskStartTimeAfter(DateTimes.nowUtc());
}
- private ScheduledBatchSupervisorSnapshot.BatchSupervisorStatus
getSchedulerStatus()
+ private ScheduledBatchSupervisor.State getSchedulerState()
Review Comment:
Since we are no longer using the `scheduler` name, this can be renamed:
```suggestion
private ScheduledBatchSupervisor.State getState()
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchTaskManager.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.scheduledbatch;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import org.apache.druid.client.broker.BrokerClient;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.indexing.overlord.TaskRunnerListener;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.http.ClientSqlQuery;
+import org.apache.druid.query.http.SqlTaskStatus;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manages the task execution and tracking for the scheduled batch supervisor.
+ * <p>
+ * It submits MSQ tasks to the Broker and tracks their execution status. It
listens for task state changes and updates
+ * the {@link ScheduledBatchStatusTracker} accordingly.
+ * </p>
+ * <p>
+ * The manager maintains a mapping of scheduled batch supervisors and their
associated task scheduler,
+ * so each supervisor can schedule its tasks based on the {@link
CronSchedulerConfig}.
+ * It uses a shared single-threaded {@link ScheduledExecutorService} to
schedule tasks across all batch supervisors.
+ * </p>
+ * <p>
+ * Note that all task state tracking by the batch supervisor is currently
maintained in memory
+ * and is not persisted in the metadata store.
+ * </p>
+ */
+public class ScheduledBatchTaskManager
+{
+ private static final Logger log = new
EmittingLogger(ScheduledBatchTaskManager.class);
+
+ private final TaskRunnerListener taskRunnerListener;
+ private final TaskMaster taskMaster;
+ private final ScheduledBatchStatusTracker statusTracker;
+ private final BrokerClient brokerClient;
+ private final ServiceEmitter emitter;
+
+ private final ScheduledExecutorService tasksExecutor;
+
+ private final ConcurrentHashMap<String, ScheduledBatchTask>
supervisorToTaskScheduler = new ConcurrentHashMap<>();
+
+ @Inject
+ public ScheduledBatchTaskManager(
+ final TaskMaster taskMaster,
+ final ScheduledExecutorFactory executorFactory,
+ final BrokerClient brokerClient,
+ final ServiceEmitter emitter,
+ final ScheduledBatchStatusTracker statusTracker
+ )
+ {
+ this.taskMaster = taskMaster;
+ this.brokerClient = brokerClient;
+ this.emitter = emitter;
+ this.statusTracker = statusTracker;
+
+ this.tasksExecutor = executorFactory.create(1,
"ScheduledBatchTasksExecutor-%s");
Review Comment:
Maybe name this executor after the class itself (`ScheduledBatchTaskManager`)
so that the thread names are easier to interpret.
##########
indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorStatus.java:
##########
@@ -21,97 +21,137 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.indexer.TaskStatus;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import javax.annotation.Nullable;
-import java.util.Map;
+import java.util.List;
-public class ScheduledBatchSupervisorSnapshot
+/**
+ * Represents the status of a scheduled batch supervisor, including its state,
+ * scheduler details, task execution aggregate counts, and recent task
activity.
+ */
+public class ScheduledBatchSupervisorStatus
{
- public enum BatchSupervisorStatus
- {
- SCHEDULER_RUNNING,
- SCHEDULER_SHUTDOWN
- }
-
- @JsonProperty
private final String supervisorId;
- @JsonProperty
- private final BatchSupervisorStatus status;
+ private final ScheduledBatchSupervisor.State state;
@Nullable
- @JsonProperty
private final DateTime lastTaskSubmittedTime;
@Nullable
- @JsonProperty
private final DateTime nextTaskSubmissionTime;
@Nullable
- @JsonProperty
private final Duration timeUntilNextTaskSubmission;
- @JsonProperty
- private final Map<String, TaskStatus> activeTasks;
+ private final Integer totalSubmittedTasks;
- @JsonProperty
- private final Map<String, TaskStatus> completedTasks;
+ private final Integer totalSuccessfulTasks;
+
+ private final Integer totalFailedTasks;
+
+ private final List<BatchSupervisorTaskStatus> recentActiveTasks;
+
+ private final List<BatchSupervisorTaskStatus> recentSuccessfulTasks;
+
+ private final List<BatchSupervisorTaskStatus> recentFailedTasks;
@JsonCreator
- public ScheduledBatchSupervisorSnapshot(
+ public ScheduledBatchSupervisorStatus(
@JsonProperty("supervisorId") String supervisorId,
- @JsonProperty("status") BatchSupervisorStatus status,
+ @JsonProperty("status") ScheduledBatchSupervisor.State state,
@JsonProperty("lastTaskSubmittedTime") @Nullable DateTime
lastTaskSubmittedTime,
@JsonProperty("nextTaskSubmissionTime") @Nullable DateTime
nextTaskSubmissionTime,
@JsonProperty("timeUntilNextTaskSubmission") @Nullable Duration
timeUntilNextTaskSubmission,
- @JsonProperty("activeTasks") Map<String, TaskStatus> activeTasks,
- @JsonProperty("completedTasks") Map<String, TaskStatus> completedTasks
+ @JsonProperty("totalSubmittedTasks") Integer totalSubmittedTasks,
+ @JsonProperty("totalSuccessfulTasks") Integer totalSuccessfulTasks,
+ @JsonProperty("totalFailedTasks") Integer totalFailedTasks,
+ @JsonProperty("recentActiveTasks") List<BatchSupervisorTaskStatus>
recentActiveTasks,
+ @JsonProperty("recentSuccessfulTasks") List<BatchSupervisorTaskStatus>
recentSuccessfulTasks,
+ @JsonProperty("recentFailedTasks") List<BatchSupervisorTaskStatus>
recentFailedTasks
)
{
this.supervisorId = supervisorId;
- this.status = status;
+ this.state = state;
this.lastTaskSubmittedTime = lastTaskSubmittedTime;
this.nextTaskSubmissionTime = nextTaskSubmissionTime;
this.timeUntilNextTaskSubmission = timeUntilNextTaskSubmission;
- this.activeTasks = activeTasks;
- this.completedTasks = completedTasks;
+ this.totalSubmittedTasks = totalSubmittedTasks;
+ this.totalSuccessfulTasks = totalSuccessfulTasks;
+ this.totalFailedTasks = totalFailedTasks;
+ this.recentActiveTasks = recentActiveTasks;
+ this.recentSuccessfulTasks = recentSuccessfulTasks;
+ this.recentFailedTasks = recentFailedTasks;
}
+ @JsonProperty
public String getSupervisorId()
{
return supervisorId;
}
- public ScheduledBatchSupervisorSnapshot.BatchSupervisorStatus getStatus()
+ @JsonProperty
+ public ScheduledBatchSupervisor.State getState()
{
- return status;
+ return state;
}
+ @Nullable
+ @JsonProperty
public DateTime getLastTaskSubmittedTime()
{
return lastTaskSubmittedTime;
}
+ @Nullable
+ @JsonProperty
public DateTime getNextTaskSubmissionTime()
{
return nextTaskSubmissionTime;
}
+ @Nullable
+ @JsonProperty
public Duration getTimeUntilNextTaskSubmission()
Review Comment:
Is serializing this field necessary? I think `getNextTaskSubmissionTime`
should be sufficient, since the time remaining can be derived from it.
##########
indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchTaskManager.java:
##########
@@ -134,72 +144,101 @@ public void startScheduledIngestion(
public void stopScheduledIngestion(final String supervisorId)
{
log.info("Stopping scheduled batch ingestion for supervisorId[%s].",
supervisorId);
- final SchedulerManager manager = supervisorToManager.get(supervisorId);
- if (manager != null) {
+ final ScheduledBatchTask taskScheduler =
supervisorToTaskScheduler.get(supervisorId);
+ if (taskScheduler != null) {
// Don't remove the supervisorId from supervisorToManager. We want to be
able to track the status
// for suspended supervisors to track any in-flight tasks. stop() will
clean up all state completely.
- manager.stopScheduling();
+ taskScheduler.stopScheduling();
}
}
+ /**
+ * Starts the scheduled batch task manager by registering the {@link
TaskRunnerListener}.
+ * This allows tracking of any tasks submitted by the batch supervisor.
+ * <p>
+ * Should be invoked when the Overlord service starts or during leadership
transitions.
+ * </p>
+ */
public void start()
{
- log.info("Starting scheduled batch scheduler.");
+ log.info("Starting scheduled batch task manager.");
final Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
if (taskRunnerOptional.isPresent()) {
taskRunnerOptional.get().registerListener(taskRunnerListener,
Execs.directExecutor());
} else {
- log.warn("Task runner not registered for scheduled batch scheduler.");
+ log.warn("Task runner not registered for scheduled batch task manager.");
}
}
+ /**
+ * Stops the scheduled batch task manager by unregistering the previously
registered {@link TaskRunnerListener}.
+ * <p>
+ * Should be invoked when the Overlord service stops or during leadership
transitions.
+ * </p>
+ */
public void stop()
{
- log.info("Stopping scheduled batch scheduler.");
+ log.info("Stopping scheduled batch task manager.");
final Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
if (taskRunnerOptional.isPresent()) {
taskRunnerOptional.get().unregisterListener(taskRunnerListener.getListenerId());
}
- supervisorToManager.clear();
+ supervisorToTaskScheduler.clear();
}
@Nullable
- public ScheduledBatchSupervisorSnapshot getSchedulerSnapshot(final String
supervisorId)
+ public ScheduledBatchSupervisorStatus getTaskManagerStatus(final String
supervisorId)
{
- final SchedulerManager manager = supervisorToManager.get(supervisorId);
+ final ScheduledBatchTask manager =
supervisorToTaskScheduler.get(supervisorId);
if (manager == null) {
return null;
}
- final ScheduledBatchStatusTracker.BatchSupervisorTaskStatus tasks =
statusTracker.getSupervisorTasks(supervisorId);
- return new ScheduledBatchSupervisorSnapshot(
+ final BatchSupervisorTaskReport taskStatus =
statusTracker.getSupervisorTaskStatus(supervisorId);
+ return new ScheduledBatchSupervisorStatus(
supervisorId,
- manager.getSchedulerStatus(),
+ manager.getSchedulerState(),
manager.getLastTaskSubmittedTime(),
manager.getNextTaskSubmissionTime(),
manager.getTimeUntilNextTaskSubmission(),
- tasks.getSubmittedTasks(),
- tasks.getCompletedTasks()
+ taskStatus.getTotalSubmittedTasks(),
+ taskStatus.getTotalSuccessfulTasks(),
+ taskStatus.getTotalFailedTasks(),
+ taskStatus.getRecentActiveTasks(),
+ taskStatus.getRecentSuccessfulTasks(),
+ taskStatus.getRecentFailedTasks()
);
}
private void submitSqlTask(final String supervisorId, final ClientSqlQuery
spec)
throws ExecutionException, InterruptedException
{
- log.debug("Submitting a new task with spec[%s] for supervisor[%s].", spec,
supervisorId);
final SqlTaskStatus taskStatus =
FutureUtils.get(brokerClient.submitSqlTask(spec), true);
statusTracker.onTaskSubmitted(supervisorId, taskStatus);
+ log.info("Submitted a new task[%s] with spec[%s] for supervisor[%s].",
taskStatus.getTaskId(), spec, supervisorId);
Review Comment:
Maybe omit the spec, as it might bloat the log. It can always be obtained
from the task log itself.
##########
indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchStatusTracker.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.scheduledbatch;
+
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.query.http.SqlTaskStatus;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tracks task statuses upon submission and completion for scheduled batch
supervisors.
+ * <p>
+ * This class maintains per-supervisor mappings of submitted tasks and their
statuses.
+ * It also keeps track of total task counts (active, successful, failed) and
ensures that
+ * recently tracked task statuses are retained for a limited duration ({@link
#MAX_STATUS_RETAIN_DURATION}).
+ * </p>
+ */
+public class ScheduledBatchStatusTracker
+{
+ private static final Duration MAX_STATUS_RETAIN_DURATION =
Duration.standardDays(2);
+
+ /**
+ * Track supervisor ID -> task IDs.
+ */
+ private final ConcurrentHashMap<String, List<String>> supervisorToTaskIds =
new ConcurrentHashMap<>();
+
+ /**
+ * Track the task ID -> supervisor ID for reverse lookup to update the total
counts.
+ */
+ private final ConcurrentHashMap<String, String> taskIdToSupervisorId = new
ConcurrentHashMap<>();
+
+ /**
+ * Tracks the recent set of task ID -> task status for all tasks younger
than {@link #MAX_STATUS_RETAIN_DURATION}.
+ */
+ private final ConcurrentHashMap<String, BatchSupervisorTaskStatus>
recentTaskStatusMap = new ConcurrentHashMap<>();
Review Comment:
These two maps can be merged into one by adding a `supervisorId` field to
`BatchSupervisorTaskStatus`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchTaskManager.java:
##########
@@ -134,72 +144,101 @@ public void startScheduledIngestion(
public void stopScheduledIngestion(final String supervisorId)
{
log.info("Stopping scheduled batch ingestion for supervisorId[%s].",
supervisorId);
- final SchedulerManager manager = supervisorToManager.get(supervisorId);
- if (manager != null) {
+ final ScheduledBatchTask taskScheduler =
supervisorToTaskScheduler.get(supervisorId);
+ if (taskScheduler != null) {
// Don't remove the supervisorId from supervisorToManager. We want to be
able to track the status
// for suspended supervisors to track any in-flight tasks. stop() will
clean up all state completely.
- manager.stopScheduling();
+ taskScheduler.stopScheduling();
}
}
+ /**
+ * Starts the scheduled batch task manager by registering the {@link
TaskRunnerListener}.
+ * This allows tracking of any tasks submitted by the batch supervisor.
+ * <p>
+ * Should be invoked when the Overlord service starts or during leadership
transitions.
+ * </p>
+ */
public void start()
{
- log.info("Starting scheduled batch scheduler.");
+ log.info("Starting scheduled batch task manager.");
final Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
if (taskRunnerOptional.isPresent()) {
taskRunnerOptional.get().registerListener(taskRunnerListener,
Execs.directExecutor());
} else {
- log.warn("Task runner not registered for scheduled batch scheduler.");
+ log.warn("Task runner not registered for scheduled batch task manager.");
}
}
+ /**
+ * Stops the scheduled batch task manager by unregistering the previously
registered {@link TaskRunnerListener}.
+ * <p>
+ * Should be invoked when the Overlord service stops or during leadership
transitions.
+ * </p>
+ */
public void stop()
{
- log.info("Stopping scheduled batch scheduler.");
+ log.info("Stopping scheduled batch task manager.");
final Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
if (taskRunnerOptional.isPresent()) {
taskRunnerOptional.get().unregisterListener(taskRunnerListener.getListenerId());
}
- supervisorToManager.clear();
+ supervisorToTaskScheduler.clear();
}
@Nullable
- public ScheduledBatchSupervisorSnapshot getSchedulerSnapshot(final String
supervisorId)
+ public ScheduledBatchSupervisorStatus getTaskManagerStatus(final String
supervisorId)
{
- final SchedulerManager manager = supervisorToManager.get(supervisorId);
+ final ScheduledBatchTask manager =
supervisorToTaskScheduler.get(supervisorId);
if (manager == null) {
return null;
}
- final ScheduledBatchStatusTracker.BatchSupervisorTaskStatus tasks =
statusTracker.getSupervisorTasks(supervisorId);
- return new ScheduledBatchSupervisorSnapshot(
+ final BatchSupervisorTaskReport taskStatus =
statusTracker.getSupervisorTaskStatus(supervisorId);
+ return new ScheduledBatchSupervisorStatus(
Review Comment:
Rather than duplicating fields between `BatchSupervisorTaskReport` and
`ScheduledBatchSupervisorStatus`, just add a `BatchSupervisorTaskReport taskReport`
field to `ScheduledBatchSupervisorStatus`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchTaskManager.java:
##########
@@ -259,15 +302,16 @@ private synchronized void startScheduling()
private synchronized void stopScheduling()
{
- status =
ScheduledBatchSupervisorSnapshot.BatchSupervisorStatus.SCHEDULER_SHUTDOWN;
+ state = ScheduledBatchSupervisor.State.SUSPENDED;
+ statusTracker.cleanupStaleTaskStatuses();
Review Comment:
Since this is being done inside the task manager of a single supervisor,
`cleanupStaleTaskStatuses()` should accept the supervisor ID and clean up entries only for that
supervisor.
Alternatively, don't call this method from inside the `ScheduledBatchTask`
and instead schedule a periodic cleanup job on the `tasksExecutor`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchTaskManager.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.scheduledbatch;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import org.apache.druid.client.broker.BrokerClient;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.indexing.overlord.TaskRunnerListener;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.http.ClientSqlQuery;
+import org.apache.druid.query.http.SqlTaskStatus;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manages the task execution and tracking for the scheduled batch supervisor.
+ * <p>
+ * It submits MSQ tasks to the Broker and tracks their execution status. It
listens for task state changes and updates
+ * the {@link ScheduledBatchStatusTracker} accordingly.
+ * </p>
+ * <p>
+ * The manager maintains a mapping of scheduled batch supervisors and their
associated task scheduler,
+ * so each supervisor can schedule its tasks based on the {@link
CronSchedulerConfig}.
+ * It uses a shared single-threaded {@link ScheduledExecutorService} to
schedule tasks across all batch supervisors.
+ * </p>
+ * <p>
+ * Note that all task state tracking by the batch supervisor is currently
maintained in memory
+ * and is not persisted in the metadata store.
+ * </p>
+ */
+public class ScheduledBatchTaskManager
+{
+ private static final Logger log = new
EmittingLogger(ScheduledBatchTaskManager.class);
+
+ private final TaskRunnerListener taskRunnerListener;
+ private final TaskMaster taskMaster;
+ private final ScheduledBatchStatusTracker statusTracker;
+ private final BrokerClient brokerClient;
+ private final ServiceEmitter emitter;
+
+ private final ScheduledExecutorService tasksExecutor;
+
+ private final ConcurrentHashMap<String, ScheduledBatchTask>
supervisorToTaskScheduler = new ConcurrentHashMap<>();
+
+ @Inject
+ public ScheduledBatchTaskManager(
+ final TaskMaster taskMaster,
+ final ScheduledExecutorFactory executorFactory,
+ final BrokerClient brokerClient,
+ final ServiceEmitter emitter,
+ final ScheduledBatchStatusTracker statusTracker
+ )
+ {
+ this.taskMaster = taskMaster;
+ this.brokerClient = brokerClient;
+ this.emitter = emitter;
+ this.statusTracker = statusTracker;
+
+ this.tasksExecutor = executorFactory.create(1,
"ScheduledBatchTasksExecutor-%s");
+
+ this.taskRunnerListener = new TaskRunnerListener()
+ {
+ @Override
+ public String getListenerId()
+ {
+ return "ScheduledBatchTaskManager";
+ }
+
+ @Override
+ public void locationChanged(String taskId, TaskLocation newLocation)
+ {
+ // Do nothing
+ }
+
+ @Override
+ public void statusChanged(final String taskId, final TaskStatus
taskStatus)
+ {
+ if (taskStatus.isComplete()) {
+ statusTracker.onTaskCompleted(taskId, taskStatus);
+ }
+ }
+ };
+ }
+
+ /**
+ * Initializes and starts scheduled ingestion for the specified {@code
supervisorId}, scheduling tasks
+ * using the provided {@code spec} and {@code cronSchedulerConfig}.
+ */
+ public void startScheduledIngestion(
+ final String supervisorId,
+ final String dataSource,
+ final CronSchedulerConfig cronSchedulerConfig,
+ final ClientSqlQuery spec
+ )
+ {
+ log.info(
+ "Starting scheduled batch ingestion into datasource[%s] with
supervisorId[%s], cronSchedule[%s] and spec[%s].",
+ supervisorId, dataSource, cronSchedulerConfig, spec
+ );
+ final ScheduledBatchTask taskScheduler = new
ScheduledBatchTask(supervisorId, dataSource, cronSchedulerConfig, spec);
+ taskScheduler.startScheduling();
+ supervisorToTaskScheduler.put(supervisorId, taskScheduler);
+ }
+
+ /**
+ * Stops the scheduler for the specified {@code supervisorId}, pausing task
submissions
+ * while retaining the ability to track the supervisor's status.
+ */
+ public void stopScheduledIngestion(final String supervisorId)
+ {
+ log.info("Stopping scheduled batch ingestion for supervisorId[%s].",
supervisorId);
+ final ScheduledBatchTask taskScheduler =
supervisorToTaskScheduler.get(supervisorId);
+ if (taskScheduler != null) {
+ // Don't remove the supervisorId from supervisorToManager. We want to be
able to track the status
+ // for suspended supervisors to track any in-flight tasks. stop() will
clean up all state completely.
+ taskScheduler.stopScheduling();
+ }
+ }
+
+ /**
+ * Starts the scheduled batch task manager by registering the {@link
TaskRunnerListener}.
+ * This allows tracking of any tasks submitted by the batch supervisor.
+ * <p>
+ * Should be invoked when the Overlord service starts or during leadership
transitions.
+ * </p>
+ */
+ public void start()
+ {
+ log.info("Starting scheduled batch task manager.");
+ final Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
+ if (taskRunnerOptional.isPresent()) {
+ taskRunnerOptional.get().registerListener(taskRunnerListener,
Execs.directExecutor());
+ } else {
+ log.warn("Task runner not registered for scheduled batch task manager.");
+ }
+ }
+
+ /**
+ * Stops the scheduled batch task manager by unregistering the previously
registered {@link TaskRunnerListener}.
+ * <p>
+ * Should be invoked when the Overlord service stops or during leadership
transitions.
+ * </p>
+ */
+ public void stop()
+ {
+ log.info("Stopping scheduled batch task manager.");
+ final Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
+ if (taskRunnerOptional.isPresent()) {
+
taskRunnerOptional.get().unregisterListener(taskRunnerListener.getListenerId());
+ }
+
+ supervisorToTaskScheduler.clear();
Review Comment:
Before clearing `supervisorToTaskScheduler`, we should also tear down each of the
`ScheduledBatchTask` objects.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]