kfaraz commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195899385
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -749,131 +651,69 @@ private void handleStatus(final TaskStatus status)
}
}
);
- return statusFuture;
}
/**
- * Resync the contents of this task queue with our storage facility. Useful to make sure our in-memory state
- * corresponds to the storage facility even if the latter is manually modified.
+ * Resync the contents of this task queue with our storage facility.
+ * Useful to make sure our in-memory state corresponds to the storage facility
+ * even if the latter is manually modified.
+ * <p>
+ * This method must be called only when queue is {@link #active}, except when
+ * starting up.
*/
- private void syncFromStorage()
+ private synchronized void syncFromStorage()
{
- giant.lock();
-
- try {
- if (active) {
- final Map<String, Task> newTasks = toTaskIDMap(taskStorage.getActiveTasks());
- final int tasksSynced = newTasks.size();
- final Map<String, Task> oldTasks = new HashMap<>(tasks);
-
- // Calculate differences on IDs instead of Task Objects.
- Set<String> commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
- for (String taskID : commonIds) {
- newTasks.remove(taskID);
- oldTasks.remove(taskID);
- }
- Collection<Task> addedTasks = newTasks.values();
- Collection<Task> removedTasks = oldTasks.values();
-
- // Clean up removed Tasks
- for (Task task : removedTasks) {
- removeTaskInternal(task.getId());
- }
-
- // Add newly Added tasks to the queue
- for (Task task : addedTasks) {
- addTaskInternal(task);
- }
-
- log.info(
- "Synced %d tasks from storage (%d tasks added, %d tasks removed).",
- tasksSynced,
- addedTasks.size(),
- removedTasks.size()
- );
- requestManagement();
- } else {
- log.info("Not active. Skipping storage sync.");
- }
- }
- catch (Exception e) {
- log.warn(e, "Failed to sync tasks from storage!");
- throw new RuntimeException(e);
- }
- finally {
- giant.unlock();
- }
- }
-
- private static Map<String, Task> toTaskIDMap(List<Task> taskList)
- {
- Map<String, Task> rv = new HashMap<>();
- for (Task task : taskList) {
- rv.put(task.getId(), task);
- }
- return rv;
- }
-
- private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev)
- {
- return total.entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
- }
+ final Map<String, Task> newTasks = taskStorage.getActiveTasks().stream().collect(
+ Collectors.toMap(Task::getId, Function.identity())
+ );
+ final Map<String, Task> oldTasks = new HashMap<>(tasks);
+
+ // Calculate differences on IDs instead of Task Objects.
+ Set<String> commonIds = Sets.intersection(newTasks.keySet(), oldTasks.keySet());
+ for (String taskId : commonIds) {
+ newTasks.remove(taskId);
+ oldTasks.remove(taskId);
+ }
+ Collection<Task> addedTasks = newTasks.values();
+ Collection<Task> removedTasks = oldTasks.values();
+
+ // Add new tasks and clean up removed tasks
+ addedTasks.forEach(this::addTaskInternal);
+ removedTasks.forEach(this::removeTaskInternal);
+ log.info(
+ "Synced [%d] tasks from storage. Added [%d] tasks, removed [%d] tasks.",
+ newTasks.size(), addedTasks.size(), removedTasks.size()
+ );
- public Map<String, Long> getSuccessfulTaskCount()
- {
- Map<String, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
- synchronized (totalSuccessfulTaskCount) {
- Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount);
- prevTotalSuccessfulTaskCount = total;
- return delta;
- }
+ requestManagement();
}
- public Map<String, Long> getFailedTaskCount()
+ public Map<String, Long> getAndResetSuccessfulTaskCounts()
{
- Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get);
- synchronized (totalFailedTaskCount) {
- Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
- prevTotalFailedTaskCount = total;
- return delta;
- }
+ Map<String, Long> total = new HashMap<>(datasourceToSuccessfulTaskCount);
+ datasourceToSuccessfulTaskCount.clear();
Review Comment:
Yeah, I didn't like this copy-then-`clear()` approach either. Updated to iterate
over the keys and atomically remove them one by one.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]