github-code-scanning[bot] commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195495474
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -749,131 +651,69 @@
}
}
);
- return statusFuture;
}
/**
- * Resync the contents of this task queue with our storage facility. Useful
to make sure our in-memory state
- * corresponds to the storage facility even if the latter is manually
modified.
+ * Resync the contents of this task queue with our storage facility.
+ * Useful to make sure our in-memory state corresponds to the storage
facility
+ * even if the latter is manually modified.
+ * <p>
+ * This method must be called only when queue is {@link #active}, except when
+ * starting up.
*/
- private void syncFromStorage()
+ private synchronized void syncFromStorage()
{
- giant.lock();
-
- try {
- if (active) {
- final Map<String, Task> newTasks =
toTaskIDMap(taskStorage.getActiveTasks());
- final int tasksSynced = newTasks.size();
- final Map<String, Task> oldTasks = new HashMap<>(tasks);
-
- // Calculate differences on IDs instead of Task Objects.
- Set<String> commonIds =
Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
- for (String taskID : commonIds) {
- newTasks.remove(taskID);
- oldTasks.remove(taskID);
- }
- Collection<Task> addedTasks = newTasks.values();
- Collection<Task> removedTasks = oldTasks.values();
-
- // Clean up removed Tasks
- for (Task task : removedTasks) {
- removeTaskInternal(task.getId());
- }
-
- // Add newly Added tasks to the queue
- for (Task task : addedTasks) {
- addTaskInternal(task);
- }
-
- log.info(
- "Synced %d tasks from storage (%d tasks added, %d tasks removed).",
- tasksSynced,
- addedTasks.size(),
- removedTasks.size()
- );
- requestManagement();
- } else {
- log.info("Not active. Skipping storage sync.");
- }
- }
- catch (Exception e) {
- log.warn(e, "Failed to sync tasks from storage!");
- throw new RuntimeException(e);
- }
- finally {
- giant.unlock();
- }
- }
-
- private static Map<String, Task> toTaskIDMap(List<Task> taskList)
- {
- Map<String, Task> rv = new HashMap<>();
- for (Task task : taskList) {
- rv.put(task.getId(), task);
- }
- return rv;
- }
-
- private Map<String, Long> getDeltaValues(Map<String, Long> total,
Map<String, Long> prev)
- {
- return total.entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
- }
+ final Map<String, Task> newTasks =
taskStorage.getActiveTasks().stream().collect(
+ Collectors.toMap(Task::getId, Function.identity())
+ );
+ final Map<String, Task> oldTasks = new HashMap<>(tasks);
+
+ // Calculate differences on IDs instead of Task Objects.
+ Set<String> commonIds = Sets.intersection(newTasks.keySet(),
oldTasks.keySet());
+ for (String taskId : commonIds) {
+ newTasks.remove(taskId);
+ oldTasks.remove(taskId);
+ }
+ Collection<Task> addedTasks = newTasks.values();
+ Collection<Task> removedTasks = oldTasks.values();
+
+ // Add new tasks and clean up removed tasks
+ addedTasks.forEach(this::addTaskInternal);
+ removedTasks.forEach(this::removeTaskInternal);
+ log.info(
+ "Synced [%d] tasks from storage. Added [%d] tasks, removed [%d]
tasks.",
+ newTasks.size(), addedTasks.size(), removedTasks.size()
+ );
- public Map<String, Long> getSuccessfulTaskCount()
- {
- Map<String, Long> total =
CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
- synchronized (totalSuccessfulTaskCount) {
- Map<String, Long> delta = getDeltaValues(total,
prevTotalSuccessfulTaskCount);
- prevTotalSuccessfulTaskCount = total;
- return delta;
- }
+ requestManagement();
}
- public Map<String, Long> getFailedTaskCount()
+ public Map<String, Long> getAndResetSuccessfulTaskCounts()
{
- Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount,
AtomicLong::get);
- synchronized (totalFailedTaskCount) {
- Map<String, Long> delta = getDeltaValues(total,
prevTotalFailedTaskCount);
- prevTotalFailedTaskCount = total;
- return delta;
- }
+ Map<String, Long> total = new HashMap<>(datasourceToSuccessfulTaskCount);
+ datasourceToSuccessfulTaskCount.clear();
+ return total;
}
- Map<String, String> getCurrentTaskDatasources()
+ public Map<String, Long> getAndResetFailedTaskCounts()
{
- giant.lock();
- try {
- return tasks.values().stream().collect(Collectors.toMap(Task::getId,
Task::getDataSource));
- }
- finally {
- giant.unlock();
- }
+ Map<String, Long> total = new HashMap<>(datasourceToFailedTaskCount);
+ datasourceToFailedTaskCount.clear();
+ return total;
}
public Map<String, Long> getRunningTaskCount()
{
- Map<String, String> taskDatasources = getCurrentTaskDatasources();
- return taskRunner.getRunningTasks()
- .stream()
- .collect(Collectors.toMap(
- e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
- e -> 1L,
- Long::sum
- ));
+ return taskRunner.getRunningTasks().stream().collect(
+ Collectors.toMap(TaskRunnerWorkItem::getDataSource, task -> 1L,
Long::sum)
+ );
}
public Map<String, Long> getPendingTaskCount()
{
- Map<String, String> taskDatasources = getCurrentTaskDatasources();
- return taskRunner.getPendingTasks()
- .stream()
- .collect(Collectors.toMap(
- e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
- e -> 1L,
- Long::sum
- ));
+ return taskRunner.getPendingTasks().stream().collect(
+ Collectors.toMap(TaskRunnerWorkItem::getDataSource, task -> 1L,
Long::sum)
Review Comment:
## Useless parameter
The parameter 'task' is never used.
[Show more
details](https://github.com/apache/druid/security/code-scanning/4227)
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -749,131 +651,69 @@
}
}
);
- return statusFuture;
}
/**
- * Resync the contents of this task queue with our storage facility. Useful
to make sure our in-memory state
- * corresponds to the storage facility even if the latter is manually
modified.
+ * Resync the contents of this task queue with our storage facility.
+ * Useful to make sure our in-memory state corresponds to the storage
facility
+ * even if the latter is manually modified.
+ * <p>
+ * This method must be called only when queue is {@link #active}, except when
+ * starting up.
*/
- private void syncFromStorage()
+ private synchronized void syncFromStorage()
{
- giant.lock();
-
- try {
- if (active) {
- final Map<String, Task> newTasks =
toTaskIDMap(taskStorage.getActiveTasks());
- final int tasksSynced = newTasks.size();
- final Map<String, Task> oldTasks = new HashMap<>(tasks);
-
- // Calculate differences on IDs instead of Task Objects.
- Set<String> commonIds =
Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
- for (String taskID : commonIds) {
- newTasks.remove(taskID);
- oldTasks.remove(taskID);
- }
- Collection<Task> addedTasks = newTasks.values();
- Collection<Task> removedTasks = oldTasks.values();
-
- // Clean up removed Tasks
- for (Task task : removedTasks) {
- removeTaskInternal(task.getId());
- }
-
- // Add newly Added tasks to the queue
- for (Task task : addedTasks) {
- addTaskInternal(task);
- }
-
- log.info(
- "Synced %d tasks from storage (%d tasks added, %d tasks removed).",
- tasksSynced,
- addedTasks.size(),
- removedTasks.size()
- );
- requestManagement();
- } else {
- log.info("Not active. Skipping storage sync.");
- }
- }
- catch (Exception e) {
- log.warn(e, "Failed to sync tasks from storage!");
- throw new RuntimeException(e);
- }
- finally {
- giant.unlock();
- }
- }
-
- private static Map<String, Task> toTaskIDMap(List<Task> taskList)
- {
- Map<String, Task> rv = new HashMap<>();
- for (Task task : taskList) {
- rv.put(task.getId(), task);
- }
- return rv;
- }
-
- private Map<String, Long> getDeltaValues(Map<String, Long> total,
Map<String, Long> prev)
- {
- return total.entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
- }
+ final Map<String, Task> newTasks =
taskStorage.getActiveTasks().stream().collect(
+ Collectors.toMap(Task::getId, Function.identity())
+ );
+ final Map<String, Task> oldTasks = new HashMap<>(tasks);
+
+ // Calculate differences on IDs instead of Task Objects.
+ Set<String> commonIds = Sets.intersection(newTasks.keySet(),
oldTasks.keySet());
+ for (String taskId : commonIds) {
+ newTasks.remove(taskId);
+ oldTasks.remove(taskId);
+ }
+ Collection<Task> addedTasks = newTasks.values();
+ Collection<Task> removedTasks = oldTasks.values();
+
+ // Add new tasks and clean up removed tasks
+ addedTasks.forEach(this::addTaskInternal);
+ removedTasks.forEach(this::removeTaskInternal);
+ log.info(
+ "Synced [%d] tasks from storage. Added [%d] tasks, removed [%d]
tasks.",
+ newTasks.size(), addedTasks.size(), removedTasks.size()
+ );
- public Map<String, Long> getSuccessfulTaskCount()
- {
- Map<String, Long> total =
CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
- synchronized (totalSuccessfulTaskCount) {
- Map<String, Long> delta = getDeltaValues(total,
prevTotalSuccessfulTaskCount);
- prevTotalSuccessfulTaskCount = total;
- return delta;
- }
+ requestManagement();
}
- public Map<String, Long> getFailedTaskCount()
+ public Map<String, Long> getAndResetSuccessfulTaskCounts()
{
- Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount,
AtomicLong::get);
- synchronized (totalFailedTaskCount) {
- Map<String, Long> delta = getDeltaValues(total,
prevTotalFailedTaskCount);
- prevTotalFailedTaskCount = total;
- return delta;
- }
+ Map<String, Long> total = new HashMap<>(datasourceToSuccessfulTaskCount);
+ datasourceToSuccessfulTaskCount.clear();
+ return total;
}
- Map<String, String> getCurrentTaskDatasources()
+ public Map<String, Long> getAndResetFailedTaskCounts()
{
- giant.lock();
- try {
- return tasks.values().stream().collect(Collectors.toMap(Task::getId,
Task::getDataSource));
- }
- finally {
- giant.unlock();
- }
+ Map<String, Long> total = new HashMap<>(datasourceToFailedTaskCount);
+ datasourceToFailedTaskCount.clear();
+ return total;
}
public Map<String, Long> getRunningTaskCount()
{
- Map<String, String> taskDatasources = getCurrentTaskDatasources();
- return taskRunner.getRunningTasks()
- .stream()
- .collect(Collectors.toMap(
- e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
- e -> 1L,
- Long::sum
- ));
+ return taskRunner.getRunningTasks().stream().collect(
+ Collectors.toMap(TaskRunnerWorkItem::getDataSource, task -> 1L,
Long::sum)
Review Comment:
## Useless parameter
The parameter 'task' is never used.
[Show more
details](https://github.com/apache/druid/security/code-scanning/4226)
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@
this.emitter = Preconditions.checkNotNull(emitter, "emitter");
}
- @VisibleForTesting
- void setActive(boolean active)
- {
- this.active = active;
- }
-
/**
* Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
*/
@LifecycleStart
- public void start()
+ public synchronized void start()
{
- giant.lock();
-
- try {
- Preconditions.checkState(!active, "queue must be stopped");
- active = true;
- syncFromStorage();
- // Mark these tasks as failed as they could not reacuire the lock
- // Clean up needs to happen after tasks have been synced from storage
- Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
- for (Task task : tasksToFail) {
- shutdown(task.getId(),
- "Shutting down forcefully as task failed to reacquire lock
while becoming leader");
+ Preconditions.checkState(!active, "queue must be stopped");
+
+ // Mark queue as active only after first sync is complete
+ syncFromStorage();
+ active = true;
+
+ // Mark these tasks as failed as they could not reacquire locks
+ Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+ for (Task task : tasksToFail) {
+ shutdown(
+ task.getId(),
+ "Shutting down forcefully as task failed to reacquire lock while
becoming leader"
+ );
+ }
+ requestManagement();
+ // Remove any unacquired locks from storage (shutdown only clears entries
for which a TaskLockPosse was acquired)
+ // This is called after requesting management as locks need to be cleared
after notifyStatus is processed
+ for (Task task : tasksToFail) {
+ for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+ taskStorage.removeLock(task.getId(), lock);
}
- managerExec.submit(
- new Runnable()
- {
- @Override
- public void run()
- {
- while (true) {
- try {
- manage();
- break;
- }
- catch (InterruptedException e) {
- log.info("Interrupted, exiting!");
- break;
- }
- catch (Exception e) {
- final long restartDelay =
config.getRestartDelay().getMillis();
- log.makeAlert(e, "Failed to manage").addData("restartDelay",
restartDelay).emit();
- try {
- Thread.sleep(restartDelay);
- }
- catch (InterruptedException e2) {
- log.info("Interrupted, exiting!");
- break;
- }
- }
- }
+ }
+ log.info("Cleaned up [%d] tasks which failed to reacquire locks.",
tasksToFail.size());
+
+ // Submit task management job
+ managerExec.submit(
+ () -> {
+ log.info("Beginning task management in [%s].",
config.getStartDelay());
+ long startDelayMillis = config.getStartDelay().getMillis();
+ while (active) {
+ try {
+ Thread.sleep(startDelayMillis);
+ runTaskManagement();
+ }
+ catch (InterruptedException e) {
+ log.info("Interrupted, stopping task management.");
+ break;
+ }
+ catch (Exception e) {
+ startDelayMillis = config.getRestartDelay().getMillis();
+ log.makeAlert(e, "Failed to manage").addData("restartDelay",
startDelayMillis).emit();
}
}
- );
- ScheduledExecutors.scheduleAtFixedRate(
- storageSyncExec,
- config.getStorageSyncRate(),
- new Callable<ScheduledExecutors.Signal>()
- {
- @Override
- public ScheduledExecutors.Signal call()
- {
- try {
- syncFromStorage();
- }
- catch (Exception e) {
- if (active) {
- log.makeAlert(e, "Failed to sync with storage").emit();
- }
- }
- if (active) {
- return ScheduledExecutors.Signal.REPEAT;
- } else {
- return ScheduledExecutors.Signal.STOP;
- }
+ }
+ );
+
+ // Schedule storage sync job
+ ScheduledExecutors.scheduleAtFixedRate(
+ storageSyncExec,
+ config.getStorageSyncRate(),
+ () -> {
+ if (!active) {
+ log.info("Stopping storage sync as TaskQueue has been stopped");
+ return ScheduledExecutors.Signal.STOP;
+ }
+
+ try {
+ syncFromStorage();
+ }
+ catch (Exception e) {
+ if (active) {
+ log.makeAlert(e, "Failed to sync with storage").emit();
}
}
- );
- requestManagement();
- // Remove any unacquired locks from storage (shutdown only clears
entries for which a TaskLockPosse was acquired)
- // This is called after requesting management as locks need to be
cleared after notifyStatus is processed
- for (Task task : tasksToFail) {
- for (TaskLock lock : taskStorage.getLocks(task.getId())) {
- taskStorage.removeLock(task.getId(), lock);
+
+ return active ? ScheduledExecutors.Signal.REPEAT :
ScheduledExecutors.Signal.STOP;
}
- }
- }
- finally {
- giant.unlock();
- }
+ );
}
/**
* Shuts down the queue.
*/
@LifecycleStop
- public void stop()
+ public synchronized void stop()
{
- giant.lock();
-
- try {
- tasks.clear();
- taskFutures.clear();
- active = false;
- managerExec.shutdownNow();
- storageSyncExec.shutdownNow();
- requestManagement();
- }
- finally {
- giant.unlock();
- }
- }
-
- public boolean isActive()
- {
- return active;
+ active = false;
+ tasks.clear();
+ submittedTaskIds.clear();
+ recentlyCompletedTaskIds.clear();
+ managerExec.shutdownNow();
+ storageSyncExec.shutdownNow();
+ requestManagement();
}
/**
* Request management from the management thread. Non-blocking.
- *
- * Other callers (such as notifyStatus) should trigger activity on the
- * TaskQueue thread by requesting management here.
+ * <p>
+ * Callers (such as notifyStatus) can trigger task management by calling
+ * this method.
*/
- void requestManagement()
+ private void requestManagement()
{
- // use a BlockingQueue since the offer/poll/wait behaviour is simple
- // and very easy to reason about
-
- // the request has to be offer (non blocking), since someone might request
- // while already holding giant lock
-
- // do not care if the item fits into the queue:
- // if the queue is already full, request has been triggered anyway
- managementMayBeNecessary.offer(this);
+ synchronized (managementRequested) {
+ managementRequested.set(true);
+ managementRequested.notify();
Review Comment:
## notify instead of notifyAll
Using notify rather than notifyAll.
[Show more
details](https://github.com/apache/druid/security/code-scanning/4228)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]