This is an automated email from the ASF dual-hosted git repository.

yqm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new bfcc0e5377e Use time-based retention for MSQ query result file cleanup 
(#19074)
bfcc0e5377e is described below

commit bfcc0e5377e9cd7752e7e36474d7558c66e014b3
Author: Cece Mei <[email protected]>
AuthorDate: Tue Mar 10 10:48:40 2026 -0700

    Use time-based retention for MSQ query result file cleanup (#19074)
    
    * retention
    
    * final
    
    * format
    
    * review
    
    * review
---
 .../indexing/overlord/HeapMemoryTaskStorage.java   | 39 +++---------
 .../indexing/overlord/MetadataTaskStorage.java     |  6 ++
 .../druid/indexing/overlord/TaskStorage.java       | 11 ++++
 .../metadata/MetadataStorageActionHandler.java     | 19 +++---
 .../indexing/cleaner/DurableStorageCleaner.java    | 26 ++++----
 .../cleaner/DurableStorageCleanerConfig.java       | 16 +++++
 .../msq/indexing/DurableStorageCleanerTest.java    | 70 ++++++++++++++++++----
 .../druid/frame/util/DurableStorageUtils.java      | 50 +++++++++-------
 .../druid/frame/util/DurableStorageUtilsTest.java  | 18 +++---
 9 files changed, 162 insertions(+), 93 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
index b3201ea9314..3c7376b68af 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
@@ -24,7 +24,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Ordering;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import com.google.inject.Inject;
 import org.apache.druid.error.EntryAlreadyExists;
@@ -43,6 +42,7 @@ import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -187,33 +187,12 @@ public class HeapMemoryTaskStorage implements TaskStorage
     return listBuilder.build();
   }
 
-  public List<TaskInfo> 
getRecentlyCreatedAlreadyFinishedTaskInfo(CompleteTaskLookup taskLookup)
-  {
-    final Ordering<TaskInfo> createdDateDesc = new Ordering<TaskInfo>()
-    {
-      @Override
-      public int compare(TaskInfo a, TaskInfo b)
-      {
-        return a.getCreatedTime().compareTo(b.getCreatedTime());
-      }
-    }.reverse();
-
-    return getRecentlyCreatedAlreadyFinishedTaskInfoSince(
-        taskLookup.getTasksCreatedPriorTo(),
-        taskLookup.getMaxTaskStatuses(),
-        createdDateDesc
-    );
-  }
-
   /**
    * NOTE: This method is racy as it searches for complete tasks and active 
tasks separately outside a lock.
    * This method should be used only for testing.
    */
   @Override
-  public List<TaskInfo> getTaskInfos(
-      Map<TaskLookupType, TaskLookup> taskLookups,
-      @Nullable String datasource
-  )
+  public List<TaskInfo> getTaskInfos(Map<TaskLookupType, TaskLookup> 
taskLookups, @Nullable String datasource)
   {
     final List<TaskInfo> tasks = new ArrayList<>();
     final Map<TaskLookupType, TaskLookup> processedTaskLookups =
@@ -224,7 +203,8 @@ public class HeapMemoryTaskStorage implements TaskStorage
 
     processedTaskLookups.forEach((type, lookup) -> {
       if (type == TaskLookupType.COMPLETE) {
-        
tasks.addAll(getRecentlyCreatedAlreadyFinishedTaskInfo((CompleteTaskLookup) 
lookup));
+        CompleteTaskLookup completed = (CompleteTaskLookup) lookup;
+        tasks.addAll(getCompletedTasksInfo(completed.getTasksCreatedPriorTo(), 
completed.getMaxTaskStatuses()));
       } else {
         tasks.addAll(getActiveTaskInfo(datasource));
       }
@@ -244,21 +224,18 @@ public class HeapMemoryTaskStorage implements TaskStorage
                                                 .collect(Collectors.toList());
   }
 
-  private List<TaskInfo> getRecentlyCreatedAlreadyFinishedTaskInfoSince(
-      DateTime start,
-      @Nullable Integer n,
-      Ordering<TaskInfo> createdDateDesc
-  )
+  @Override
+  public List<TaskInfo> getCompletedTasksInfo(DateTime start, @Nullable 
Integer n)
   {
     Stream<TaskInfo> stream = tasks
         .values()
         .stream()
         .filter(taskInfo -> taskInfo.getStatus().isComplete() && 
taskInfo.getCreatedTime().isAfter(start))
-        .sorted(createdDateDesc);
+        .sorted(Comparator.comparing(TaskInfo::getCreatedTime).reversed());
     if (n != null) {
       stream = stream.limit(n);
     }
-    return stream.collect(Collectors.toUnmodifiableList());
+    return stream.toList();
   }
 
   @Override
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
index 9dba47c6278..35858fdc734 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
@@ -159,6 +159,12 @@ public class MetadataTaskStorage implements TaskStorage
     return handler.getTaskInfo(taskId);
   }
 
+  @Override
+  public List<TaskInfo> getCompletedTasksInfo(DateTime start, @Nullable 
Integer n)
+  {
+    return handler.getTaskInfos(Map.of(TaskLookupType.COMPLETE, new 
TaskLookup.CompleteTaskLookup(n, start)), null);
+  }
+
   @Override
   public List<Task> getActiveTasks()
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
index 49a248ec4b8..3aac918b5ef 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
@@ -27,6 +27,7 @@ import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.metadata.TaskLookup;
 import org.apache.druid.metadata.TaskLookup.TaskLookupType;
+import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
@@ -107,6 +108,16 @@ public interface TaskStorage
    */
   Optional<TaskStatus> getStatus(String taskid);
 
+  /**
+   * Returns completed tasks (succeeded or failed) created after the specified 
time, sorted by
+   * creation time descending (most recent first).
+   *
+   * @param start minimum creation time (exclusive); only tasks created after 
this time are included
+   * @param n     optional limit on number of tasks to return; if null, 
returns all matching tasks
+   * @return list of completed task infos matching the criteria
+   */
+  List<TaskInfo> getCompletedTasksInfo(DateTime start, @Nullable Integer n);
+
   @Nullable
   TaskInfo getTaskInfo(String taskId);
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java
 
b/indexing-service/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java
index 23e315aac0c..e1ec480243f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java
@@ -95,15 +95,18 @@ public interface MetadataStorageActionHandler
   TaskInfo getTaskInfo(String entryId);
 
   /**
-   * Returns a list of {@link TaskInfo} from metadata store that matches to 
the given filters.
-   * <p>
-   * If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns 
all active tasks in the metadata store.
-   * If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it 
returns all complete tasks in the metadata
-   * store. For complete tasks, additional filters in {@code 
CompleteTaskLookup} can be applied.
-   * All lookups should be processed atomically if there are more than one 
lookup is given.
+   * Returns tasks from the metadata store matching the given filters.
+   * <ul>
+   *   <li>{@link TaskLookupType#ACTIVE} returns all active tasks (not yet 
complete).</li>
+   *   <li>{@link TaskLookupType#COMPLETE} returns completed tasks sorted by 
creation time descending
+   *       (most recent first). For complete tasks, {@link 
TaskLookup.CompleteTaskLookup} provides
+   *       optional filters for maximum count and creation time cutoff.</li>
+   * </ul>
+   * Multiple lookups are processed atomically.
    *
-   * @param taskLookups task lookup type and filters.
-   * @param datasource  datasource filter
+   * @param taskLookups map of task lookup types to their filters
+   * @param datasource  optional datasource filter; if null, includes all 
datasources
+   * @return list of {@link TaskInfo} objects (task, status, and creation 
time) matching the criteria
    */
   List<TaskInfo> getTaskInfos(
       Map<TaskLookupType, TaskLookup> taskLookups,
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleaner.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleaner.java
index 6b45273fa72..525da3c63fb 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleaner.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleaner.java
@@ -19,16 +19,18 @@
 
 package org.apache.druid.msq.indexing.cleaner;
 
-import com.fasterxml.jackson.annotation.JacksonInject;
 import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import org.apache.druid.frame.util.DurableStorageUtils;
+import org.apache.druid.indexer.TaskInfo;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskRunner;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
+import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.duty.DutySchedule;
 import org.apache.druid.indexing.overlord.duty.OverlordDuty;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.guice.MultiStageQuery;
 import org.apache.druid.storage.StorageConnector;
@@ -52,17 +54,20 @@ public class DurableStorageCleaner implements OverlordDuty
   private final DurableStorageCleanerConfig config;
   private final StorageConnector storageConnector;
   private final Provider<TaskMaster> taskMasterProvider;
+  private final TaskStorage taskStorage;
 
   @Inject
   public DurableStorageCleaner(
       final DurableStorageCleanerConfig config,
       final @MultiStageQuery StorageConnectorProvider storageConnectorProvider,
-      @JacksonInject final Provider<TaskMaster> taskMasterProvider
+      final Provider<TaskMaster> taskMasterProvider,
+      final TaskStorage taskStorage
   )
   {
     this.config = config;
     this.storageConnector = 
storageConnectorProvider.createStorageConnector(null);
     this.taskMasterProvider = taskMasterProvider;
+    this.taskStorage = taskStorage;
   }
 
   @Override
@@ -89,11 +94,11 @@ public class DurableStorageCleaner implements OverlordDuty
                                            .map(TaskRunnerWorkItem::getTaskId)
                                            
.map(DurableStorageUtils::getControllerDirectory)
                                            .collect(Collectors.toSet());
-    Set<String> knownTaskIds = taskRunner.getKnownTasks()
-                                         .stream()
-                                         .map(TaskRunnerWorkItem::getTaskId)
-                                         
.map(DurableStorageUtils::getControllerDirectory)
-                                         .collect(Collectors.toSet());
+    Set<String> recentlyCompletedTaskIds =
+        
taskStorage.getCompletedTasksInfo(DateTimes.nowUtc().minus(config.getDurationToRetain().getMillis()),
 null)
+                   .stream()
+                   .map(TaskInfo::getId)
+                   .collect(Collectors.toSet());
 
     Set<String> filesToRemove = new HashSet<>();
     while (allFiles.hasNext()) {
@@ -101,11 +106,10 @@ public class DurableStorageCleaner implements OverlordDuty
       String nextDirName = 
DurableStorageUtils.getNextDirNameWithPrefixFromPath(currentFile);
       if (nextDirName != null && !nextDirName.isEmpty()) {
         if (runningTaskIds.contains(nextDirName)) {
-          // do nothing
+          // do nothing, files under the controller directory of a still-running task must not be cleaned
         } else if (DurableStorageUtils.QUERY_RESULTS_DIR.equals(nextDirName)
-                   && DurableStorageUtils.isQueryResultFileActive(currentFile, 
knownTaskIds)) {
-          // query results should not be cleaned even if the task has finished 
running
-          // do nothing
+                   && DurableStorageUtils.isQueryResultFileActive(currentFile, 
recentlyCompletedTaskIds)) {
+          // do nothing, keep results of tasks that completed and were created within the retention period
         } else {
           filesToRemove.add(currentFile);
         }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleanerConfig.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleanerConfig.java
index d45950f37ed..71a960fef15 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleanerConfig.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleanerConfig.java
@@ -20,6 +20,8 @@
 package org.apache.druid.msq.indexing.cleaner;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.joda.time.Duration;
+import org.joda.time.Period;
 
 import javax.validation.constraints.Min;
 
@@ -39,6 +41,14 @@ public class DurableStorageCleanerConfig
   @Min(1)
   public long delaySeconds = 86400L;
 
+  /**
+   * How long to retain query results of completed tasks in durable storage, measured from task creation time.
+   * Query result files are deleted unless their task has completed and was created within this duration.
+   * Defaults to 6 hours.
+   */
+  @JsonProperty
+  public Duration durationToRetain = new Period("PT6H").toStandardDuration();
+
   public boolean isEnabled()
   {
     return enabled;
@@ -49,12 +59,18 @@ public class DurableStorageCleanerConfig
     return delaySeconds;
   }
 
+  public Duration getDurationToRetain()
+  {
+    return durationToRetain;
+  }
+
   @Override
   public String toString()
   {
     return "DurableStorageCleanerConfig{" +
            "enabled=" + enabled +
            ", delaySeconds=" + delaySeconds +
+           ", durationToRetain=" + durationToRetain +
            '}';
   }
 }
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java
index 0ddf807d707..4f77af7c2a4 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java
@@ -23,26 +23,34 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import org.apache.druid.frame.util.DurableStorageUtils;
+import org.apache.druid.indexer.TaskInfo;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskRunner;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
+import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.duty.DutySchedule;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.msq.indexing.cleaner.DurableStorageCleaner;
 import org.apache.druid.msq.indexing.cleaner.DurableStorageCleanerConfig;
 import org.apache.druid.storage.NilStorageConnector;
 import org.apache.druid.storage.StorageConnector;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
+import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Set;
 
 public class DurableStorageCleanerTest
 {
-
+  private static final TaskStorage TASK_STORAGE = 
EasyMock.mock(TaskStorage.class);
   private static final TaskMaster TASK_MASTER = 
EasyMock.mock(TaskMaster.class);
   private static final TaskRunner TASK_RUNNER = 
EasyMock.mock(TaskRunner.class);
   private static final StorageConnector STORAGE_CONNECTOR = 
EasyMock.mock(StorageConnector.class);
@@ -54,20 +62,25 @@ public class DurableStorageCleanerTest
   @Before
   public void setUp()
   {
-    EasyMock.reset(TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR, 
TASK_MASTER);
+    EasyMock.reset(TASK_STORAGE, TASK_RUNNER, TASK_RUNNER_WORK_ITEM, 
STORAGE_CONNECTOR, TASK_MASTER);
     DurableStorageCleanerConfig durableStorageCleanerConfig = new 
DurableStorageCleanerConfig();
     durableStorageCleanerConfig.delaySeconds = 1L;
     durableStorageCleanerConfig.enabled = true;
+    durableStorageCleanerConfig.durationToRetain = new Duration(5_000L);
     durableStorageCleaner = new DurableStorageCleaner(
         durableStorageCleanerConfig,
         s -> STORAGE_CONNECTOR,
-        () -> TASK_MASTER
+        () -> TASK_MASTER,
+        TASK_STORAGE
     );
   }
 
   @Test
   public void testRun() throws Exception
   {
+    EasyMock.expect(TASK_STORAGE.getCompletedTasksInfo(EasyMock.anyObject(), 
EasyMock.anyInt()))
+            .andReturn(List.of())
+            .anyTimes();
     EasyMock.expect(STORAGE_CONNECTOR.listDir(EasyMock.anyString()))
             
.andReturn(ImmutableList.of(DurableStorageUtils.getControllerDirectory(TASK_ID),
 STRAY_DIR)
                                     .stream()
@@ -78,24 +91,53 @@ public class DurableStorageCleanerTest
     EasyMock.expect((Collection<TaskRunnerWorkItem>) 
TASK_RUNNER.getRunningTasks())
             .andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
             .anyTimes();
-    EasyMock.expect((Collection<TaskRunnerWorkItem>) 
TASK_RUNNER.getKnownTasks())
-            .andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
-            .anyTimes();
     
EasyMock.expect(TASK_MASTER.getTaskRunner()).andReturn(Optional.of(TASK_RUNNER)).anyTimes();
     Capture<Set<String>> capturedArguments = EasyMock.newCapture();
     STORAGE_CONNECTOR.deleteFiles(EasyMock.capture(capturedArguments));
     EasyMock.expectLastCall().once();
-    EasyMock.replay(TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM, 
STORAGE_CONNECTOR);
+    EasyMock.replay(TASK_STORAGE, TASK_MASTER, TASK_RUNNER, 
TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
 
     durableStorageCleaner.run();
 
     Assert.assertEquals(Sets.newHashSet(STRAY_DIR), 
capturedArguments.getValue());
   }
 
+  @Test
+  public void testRunClearsStaleOrNotFoundTask() throws Exception
+  {
+    EasyMock.expect(TASK_STORAGE.getCompletedTasksInfo(EasyMock.anyObject(), 
EasyMock.anyInt()))
+            .andReturn(List.of())
+            .anyTimes();
+    EasyMock.expect(STORAGE_CONNECTOR.listDir(EasyMock.anyString()))
+            
.andReturn(ImmutableList.of(DurableStorageUtils.getControllerDirectory(TASK_ID))
+                                    .stream()
+                                    .iterator())
+            .anyTimes();
+    EasyMock.expect(TASK_RUNNER_WORK_ITEM.getTaskId()).andReturn(TASK_ID)
+            .anyTimes();
+    EasyMock.expect((Collection<TaskRunnerWorkItem>) 
TASK_RUNNER.getRunningTasks())
+            .andReturn(ImmutableList.of())
+            .anyTimes();
+    
EasyMock.expect(TASK_MASTER.getTaskRunner()).andReturn(Optional.of(TASK_RUNNER)).anyTimes();
+    Capture<Set<String>> capturedArguments = EasyMock.newCapture();
+    STORAGE_CONNECTOR.deleteFiles(EasyMock.capture(capturedArguments));
+    EasyMock.expectLastCall().once();
+    EasyMock.replay(TASK_STORAGE, TASK_MASTER, TASK_RUNNER, 
TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
+
+    durableStorageCleaner.run();
+
+    
Assert.assertEquals(Set.of(DurableStorageUtils.getControllerDirectory(TASK_ID)),
 capturedArguments.getValue());
+  }
+
   @Test
   public void testRunExcludesQueryDirectory() throws Exception
   {
-    final String resultPath = DurableStorageUtils.QUERY_RESULTS_DIR + "/" + 
DurableStorageUtils.getControllerDirectory(TASK_ID) + "/results.json";
+    Task completedTask = new NoopTask(TASK_ID, null, null, 1, 0, null);
+    EasyMock.expect(TASK_STORAGE.getCompletedTasksInfo(EasyMock.anyObject(), 
EasyMock.anyInt()))
+            .andReturn(List.of(new TaskInfo(DateTimes.of("2020-01-01"), 
TaskStatus.success("not-used"), completedTask)))
+            .anyTimes();
+    final String resultPath = DurableStorageUtils.QUERY_RESULTS_DIR + "/" + 
DurableStorageUtils.getControllerDirectory(
+        TASK_ID) + "/results.json";
     final String intermediateFilesPath = 
DurableStorageUtils.getControllerDirectory(TASK_ID) + "/intermediate.frame";
     EasyMock.expect(STORAGE_CONNECTOR.listDir(EasyMock.anyString()))
             .andReturn(ImmutableList.of(resultPath, STRAY_DIR, 
intermediateFilesPath)
@@ -108,13 +150,10 @@ public class DurableStorageCleanerTest
     EasyMock.expect((Collection<TaskRunnerWorkItem>) 
TASK_RUNNER.getRunningTasks())
             .andReturn(ImmutableList.of())
             .anyTimes();
-    EasyMock.expect((Collection<TaskRunnerWorkItem>) 
TASK_RUNNER.getKnownTasks())
-            .andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
-            .anyTimes();
     Capture<Set<String>> capturedArguments = EasyMock.newCapture();
     STORAGE_CONNECTOR.deleteFiles(EasyMock.capture(capturedArguments));
     EasyMock.expectLastCall().once();
-    EasyMock.replay(TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM, 
STORAGE_CONNECTOR);
+    EasyMock.replay(TASK_STORAGE, TASK_MASTER, TASK_RUNNER, 
TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
 
     durableStorageCleaner.run();
 
@@ -127,7 +166,12 @@ public class DurableStorageCleanerTest
     DurableStorageCleanerConfig cleanerConfig = new 
DurableStorageCleanerConfig();
     cleanerConfig.delaySeconds = 10L;
     cleanerConfig.enabled = true;
-    DurableStorageCleaner durableStorageCleaner = new 
DurableStorageCleaner(cleanerConfig, (temp) -> 
NilStorageConnector.getInstance(), null);
+    DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(
+        cleanerConfig,
+        (temp) -> NilStorageConnector.getInstance(),
+        null,
+        null
+    );
 
     DutySchedule schedule = durableStorageCleaner.getSchedule();
     Assert.assertEquals(cleanerConfig.delaySeconds * 1000, 
schedule.getPeriodMillis());
diff --git 
a/processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java 
b/processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java
index 168d96fc20a..6f3f1686475 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java
@@ -52,6 +52,23 @@ public class DurableStorageUtils
     );
   }
 
+  @Nullable
+  private static String getControllerTaskId(final String path)
+  {
+    Iterator<String> elementsIterator = SPLITTER.split(path).iterator();
+    List<String> elements = ImmutableList.copyOf(elementsIterator);
+    if (elements.size() < 2) {
+      return null;
+    }
+    if (!DurableStorageUtils.QUERY_RESULTS_DIR.equals(elements.get(0))) {
+      return null;
+    }
+    if (!elements.get(1).startsWith("controller_")) {
+      return null;
+    }
+    return elements.get(1).substring("controller_".length());
+  }
+
 
   public static String getWorkerOutputSuccessFilePath(
       final String controllerTaskId,
@@ -149,6 +166,7 @@ public class DurableStorageUtils
         taskId
     );
   }
+
   /**
    * Fetches the file location where a particular worker writes the data 
corresponding to a particular stage
    * and partition
@@ -228,33 +246,23 @@ public class DurableStorageUtils
   }
 
   /**
-   * Tries to parse out the controller taskID from the query results path, and 
checks if the taskID is present in the
-   * set of known tasks.
-   * Returns true if the set contains the taskId.
-   * Returns false if taskId could not be parsed or if the set does not 
contain the taskId.
-   * <br></br>
-   * For eg:
-   * <br/>
-   * <ul>
-   *   <li>for path <b>controller_query_id/task/123</b> the function will 
return <b>false</b></li>
-   *   <li>for path <b>query-result/controller_query_id/results.json</b>, the 
function will return <b>true</b></li> if the controller_query_id is in known 
tasks
-   *   <li>for path <b>query-result/controller_query_id/results.json</b>, the 
function will return <b>false</b></li> if the controller_query_id is not in 
known tasks
-   *   <li>for path <b>null</b>, the function will return <b>false</b></li>
-   * </ul>
+   * Checks if a query result file should be retained by checking if its task 
ID is recently completed.
+   * Parses the controller task ID from paths under {@link #QUERY_RESULTS_DIR} 
and checks membership
+   * in the provided set.
+   *
+   * @param path                      the file path to check
+   * @param recentlyCompletedTaskIds  set of task IDs that completed recently 
and whose files should be retained
+   * @return {@code true} if the file belongs to a recently completed task; 
{@code false} otherwise
    */
-  public static boolean isQueryResultFileActive(String path, Set<String> 
knownTasks)
+  public static boolean isQueryResultFileActive(String path, Set<String> 
recentlyCompletedTaskIds)
   {
     if (path == null) {
       return false;
     }
-    Iterator<String> elementsIterator = SPLITTER.split(path).iterator();
-    List<String> elements = ImmutableList.copyOf(elementsIterator);
-    if (elements.size() < 2) {
-      return false;
-    }
-    if (!DurableStorageUtils.QUERY_RESULTS_DIR.equals(elements.get(0))) {
+    final String taskId = getControllerTaskId(path);
+    if (taskId == null) {
       return false;
     }
-    return knownTasks.contains(elements.get(1));
+    return recentlyCompletedTaskIds.contains(taskId);
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/frame/util/DurableStorageUtilsTest.java
 
b/processing/src/test/java/org/apache/druid/frame/util/DurableStorageUtilsTest.java
index 2f01a402db4..e26d47dd137 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/util/DurableStorageUtilsTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/util/DurableStorageUtilsTest.java
@@ -19,10 +19,11 @@
 
 package org.apache.druid.frame.util;
 
-import com.google.common.collect.ImmutableSet;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Set;
+
 public class DurableStorageUtilsTest
 {
 
@@ -52,26 +53,25 @@ public class DurableStorageUtilsTest
   @Test
   public void isQueryResultFileActive()
   {
-
     Assert.assertTrue(DurableStorageUtils.isQueryResultFileActive(
-        DurableStorageUtils.QUERY_RESULTS_DIR + "/123/result",
-        ImmutableSet.of("123")
+        DurableStorageUtils.QUERY_RESULTS_DIR + "/controller_123/result",
+        Set.of("123")
     ));
     Assert.assertFalse(DurableStorageUtils.isQueryResultFileActive(
-        DurableStorageUtils.QUERY_RESULTS_DIR + "/123/result",
-        ImmutableSet.of("")
+        DurableStorageUtils.QUERY_RESULTS_DIR + "/controller_456/result",
+        Set.of("123")
     ));
     Assert.assertFalse(DurableStorageUtils.isQueryResultFileActive(
         DurableStorageUtils.QUERY_RESULTS_DIR + "/",
-        ImmutableSet.of("123")
+        Set.of("123")
     ));
     Assert.assertFalse(DurableStorageUtils.isQueryResultFileActive(
         null,
-        ImmutableSet.of("123")
+        Set.of("123")
     ));
     Assert.assertFalse(DurableStorageUtils.isQueryResultFileActive(
         DurableStorageUtils.QUERY_RESULTS_DIR,
-        ImmutableSet.of("123")
+        Set.of("123")
     ));
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to