This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0335aaa279 Add query results directory and prevent the auto cleaner 
from cleaning it (#14446)
0335aaa279 is described below

commit 0335aaa279d95063c6a172cf3f1ffdf6e46f592b
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Wed Jun 28 10:14:04 2023 +0530

    Add query results directory and prevent the auto cleaner from cleaning it 
(#14446)
    
    Adds support for automatic cleaning of a "query-results" directory in 
durable storage. This directory will be cleaned up only if the task id is not 
known to the overlord. This will allow the storage of query results after the 
task has finished running.
---
 docs/multi-stage-query/reference.md                |  2 +-
 .../druid/msq/indexing/DurableStorageCleaner.java  | 15 +++++--
 .../msq/indexing/DurableStorageCleanerTest.java    | 49 ++++++++++++++++++++--
 .../druid/frame/util/DurableStorageUtils.java      | 41 ++++++++++++++++--
 .../druid/frame/util/DurableStorageUtilsTest.java  | 39 ++++++++++++++---
 5 files changed, 129 insertions(+), 17 deletions(-)

diff --git a/docs/multi-stage-query/reference.md 
b/docs/multi-stage-query/reference.md
index ec6a5b1543..5bbe935f1e 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -365,7 +365,7 @@ For detailed information about the settings related to 
durable storage, see [Dur
 
 ### Use durable storage for queries
 
-When you run a query,  include the context parameter `durableShuffleStorage` 
and set it to `true`. 
+When you run a query, include the context parameter `durableShuffleStorage` 
and set it to `true`.
 
 For queries where you want to use fault tolerance for workers,  set 
`faultTolerance` to `true`, which automatically sets `durableShuffleStorage` to 
`true`.
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java
index f2b5be699b..195b1e26f8 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java
@@ -88,13 +88,22 @@ public class DurableStorageCleaner implements OverlordDuty
                                            .map(TaskRunnerWorkItem::getTaskId)
                                            
.map(DurableStorageUtils::getControllerDirectory)
                                            .collect(Collectors.toSet());
+    Set<String> knownTaskIds = taskRunner.getKnownTasks()
+                                         .stream()
+                                         .map(TaskRunnerWorkItem::getTaskId)
+                                         
.map(DurableStorageUtils::getControllerDirectory)
+                                         .collect(Collectors.toSet());
 
     Set<String> filesToRemove = new HashSet<>();
     while (allFiles.hasNext()) {
       String currentFile = allFiles.next();
-      String taskIdFromPathOrEmpty = 
DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(currentFile);
-      if (taskIdFromPathOrEmpty != null && !taskIdFromPathOrEmpty.isEmpty()) {
-        if (runningTaskIds.contains(taskIdFromPathOrEmpty)) {
+      String nextDirName = 
DurableStorageUtils.getNextDirNameWithPrefixFromPath(currentFile);
+      if (nextDirName != null && !nextDirName.isEmpty()) {
+        if (runningTaskIds.contains(nextDirName)) {
+          // do nothing
+        } else if (DurableStorageUtils.QUERY_RESULTS_DIR.equals(nextDirName)
+                   && DurableStorageUtils.isQueryResultFileActive(currentFile, 
knownTaskIds)) {
+          // query results should not be cleaned even if the task has finished 
running
           // do nothing
         } else {
           filesToRemove.add(currentFile);
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java
index 0f674739b2..7f7bf4fbfd 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.storage.StorageConnector;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collection;
@@ -45,19 +46,25 @@ public class DurableStorageCleanerTest
   private static final TaskRunnerWorkItem TASK_RUNNER_WORK_ITEM = 
EasyMock.mock(TaskRunnerWorkItem.class);
   private static final String TASK_ID = "dummyTaskId";
   private static final String STRAY_DIR = "strayDirectory";
+  private DurableStorageCleaner durableStorageCleaner;
 
-  @Test
-  public void testRun() throws Exception
+  @Before
+  public void setUp()
   {
-    EasyMock.reset(TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
+    EasyMock.reset(TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR, 
TASK_MASTER);
     DurableStorageCleanerConfig durableStorageCleanerConfig = new 
DurableStorageCleanerConfig();
     durableStorageCleanerConfig.delaySeconds = 1L;
     durableStorageCleanerConfig.enabled = true;
-    DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(
+    durableStorageCleaner = new DurableStorageCleaner(
         durableStorageCleanerConfig,
         STORAGE_CONNECTOR,
         () -> TASK_MASTER
     );
+  }
+
+  @Test
+  public void testRun() throws Exception
+  {
     EasyMock.expect(STORAGE_CONNECTOR.listDir(EasyMock.anyString()))
             
.andReturn(ImmutableList.of(DurableStorageUtils.getControllerDirectory(TASK_ID),
 STRAY_DIR)
                                     .stream()
@@ -68,15 +75,49 @@ public class DurableStorageCleanerTest
     EasyMock.expect((Collection<TaskRunnerWorkItem>) 
TASK_RUNNER.getRunningTasks())
             .andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
             .anyTimes();
+    EasyMock.expect((Collection<TaskRunnerWorkItem>) 
TASK_RUNNER.getKnownTasks())
+            .andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
+            .anyTimes();
     
EasyMock.expect(TASK_MASTER.getTaskRunner()).andReturn(Optional.of(TASK_RUNNER)).anyTimes();
     Capture<Set<String>> capturedArguments = EasyMock.newCapture();
     STORAGE_CONNECTOR.deleteFiles(EasyMock.capture(capturedArguments));
     EasyMock.expectLastCall().once();
     EasyMock.replay(TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM, 
STORAGE_CONNECTOR);
+
     durableStorageCleaner.run();
+
     Assert.assertEquals(Sets.newHashSet(STRAY_DIR), 
capturedArguments.getValue());
   }
 
+  @Test
+  public void testRunExcludesQueryDirectory() throws Exception
+  {
+    final String resultPath = DurableStorageUtils.QUERY_RESULTS_DIR + "/" + 
DurableStorageUtils.getControllerDirectory(TASK_ID) + "/results.json";
+    final String intermediateFilesPath = 
DurableStorageUtils.getControllerDirectory(TASK_ID) + "/intermediate.frame";
+    EasyMock.expect(STORAGE_CONNECTOR.listDir(EasyMock.anyString()))
+            .andReturn(ImmutableList.of(resultPath, STRAY_DIR, 
intermediateFilesPath)
+                                    .stream()
+                                    .iterator())
+            .anyTimes();
+    
EasyMock.expect(TASK_MASTER.getTaskRunner()).andReturn(Optional.of(TASK_RUNNER)).anyTimes();
+    EasyMock.expect(TASK_RUNNER_WORK_ITEM.getTaskId()).andReturn(TASK_ID)
+            .anyTimes();
+    EasyMock.expect((Collection<TaskRunnerWorkItem>) 
TASK_RUNNER.getRunningTasks())
+            .andReturn(ImmutableList.of())
+            .anyTimes();
+    EasyMock.expect((Collection<TaskRunnerWorkItem>) 
TASK_RUNNER.getKnownTasks())
+            .andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
+            .anyTimes();
+    Capture<Set<String>> capturedArguments = EasyMock.newCapture();
+    STORAGE_CONNECTOR.deleteFiles(EasyMock.capture(capturedArguments));
+    EasyMock.expectLastCall().once();
+    EasyMock.replay(TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM, 
STORAGE_CONNECTOR);
+
+    durableStorageCleaner.run();
+
+    Assert.assertEquals(Sets.newHashSet(STRAY_DIR, intermediateFilesPath), 
capturedArguments.getValue());
+  }
+
   @Test
   public void testGetSchedule()
   {
diff --git 
a/processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java 
b/processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java
index 8aea264bc3..1585b86515 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java
@@ -20,11 +20,14 @@
 package org.apache.druid.frame.util;
 
 import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.java.util.common.StringUtils;
 
 import javax.annotation.Nullable;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 
 /**
  * Helper class that fetches the directory and file names corresponding to 
file location
@@ -32,7 +35,8 @@ import java.util.Iterator;
 public class DurableStorageUtils
 {
   public static final String SUCCESS_MARKER_FILENAME = "__success";
-  public static final Splitter SPLITTER = Splitter.on("/").limit(2);
+  public static final Splitter SPLITTER = Splitter.on("/").limit(3);
+  public static final String QUERY_RESULTS_DIR = "query-results";
 
   public static String getControllerDirectory(final String controllerTaskId)
   {
@@ -127,7 +131,7 @@ public class DurableStorageUtils
   }
 
   /**
-   * Tries to parse out the controller taskID from the input path.
+   * Tries to parse out the top-level directory from the path. Returns 
null if there is no such directory.
    * <br></br>
    * For eg:
    * <br/>
@@ -138,7 +142,7 @@ public class DurableStorageUtils
    * </ul>
    */
   @Nullable
-  public static String getControllerTaskIdWithPrefixFromPath(String path)
+  public static String getNextDirNameWithPrefixFromPath(String path)
   {
     if (path == null) {
       return null;
@@ -150,4 +154,35 @@ public class DurableStorageUtils
       return null;
     }
   }
+
+  /**
+   * Tries to parse out the controller taskID from the query results path, and 
checks if the taskID is present in the
+   * set of known tasks.
+   * Returns true if the set contains the taskId.
+   * Returns false if taskId could not be parsed or if the set does not 
contain the taskId.
+   * <br></br>
+   * For eg:
+   * <br/>
+   * <ul>
+   *   <li>for path <b>controller_query_id/task/123</b> the function will 
return <b>false</b></li>
+   *   <li>for path <b>query-results/controller_query_id/results.json</b>, the 
function will return <b>true</b> if the controller_query_id is in known 
tasks</li>
+   *   <li>for path <b>query-results/controller_query_id/results.json</b>, the 
function will return <b>false</b> if the controller_query_id is not in 
known tasks</li>
+   *   <li>for path <b>null</b>, the function will return <b>false</b></li>
+   * </ul>
+   */
+  public static boolean isQueryResultFileActive(String path, Set<String> 
knownTasks)
+  {
+    if (path == null) {
+      return false;
+    }
+    Iterator<String> elementsIterator = SPLITTER.split(path).iterator();
+    List<String> elements = ImmutableList.copyOf(elementsIterator);
+    if (elements.size() < 2) {
+      return false;
+    }
+    if (!DurableStorageUtils.QUERY_RESULTS_DIR.equals(elements.get(0))) {
+      return false;
+    }
+    return knownTasks.contains(elements.get(1));
+  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/frame/util/DurableStorageUtilsTest.java
 
b/processing/src/test/java/org/apache/druid/frame/util/DurableStorageUtilsTest.java
index 8e32038107..71b16633e9 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/util/DurableStorageUtilsTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/util/DurableStorageUtilsTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.frame.util;
 
+import com.google.common.collect.ImmutableSet;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -26,13 +27,39 @@ public class DurableStorageUtilsTest
 {
 
   @Test
-  public void getControllerTaskIdWithPrefixFromPath()
+  public void getNextDirNameWithPrefixFromPath()
   {
-    Assert.assertEquals("", 
DurableStorageUtils.getControllerTaskIdWithPrefixFromPath("/123/123"));
-    Assert.assertEquals("123", 
DurableStorageUtils.getControllerTaskIdWithPrefixFromPath("123"));
+    Assert.assertEquals("", 
DurableStorageUtils.getNextDirNameWithPrefixFromPath("/123/123"));
+    Assert.assertEquals("123", 
DurableStorageUtils.getNextDirNameWithPrefixFromPath("123"));
     Assert.assertEquals("controller_query_123",
-                        
DurableStorageUtils.getControllerTaskIdWithPrefixFromPath("controller_query_123/123"));
-    Assert.assertEquals("", 
DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(""));
-    
Assert.assertNull(DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(null));
+                        
DurableStorageUtils.getNextDirNameWithPrefixFromPath("controller_query_123/123"));
+    Assert.assertEquals("", 
DurableStorageUtils.getNextDirNameWithPrefixFromPath(""));
+    
Assert.assertNull(DurableStorageUtils.getNextDirNameWithPrefixFromPath(null));
+  }
+
+  @Test
+  public void isQueryResultFileActive()
+  {
+
+    Assert.assertTrue(DurableStorageUtils.isQueryResultFileActive(
+        DurableStorageUtils.QUERY_RESULTS_DIR + "/123/result",
+        ImmutableSet.of("123")
+    ));
+    Assert.assertFalse(DurableStorageUtils.isQueryResultFileActive(
+        DurableStorageUtils.QUERY_RESULTS_DIR + "/123/result",
+        ImmutableSet.of("")
+    ));
+    Assert.assertFalse(DurableStorageUtils.isQueryResultFileActive(
+        DurableStorageUtils.QUERY_RESULTS_DIR + "/",
+        ImmutableSet.of("123")
+    ));
+    Assert.assertFalse(DurableStorageUtils.isQueryResultFileActive(
+        null,
+        ImmutableSet.of("123")
+    ));
+    Assert.assertFalse(DurableStorageUtils.isQueryResultFileActive(
+        DurableStorageUtils.QUERY_RESULTS_DIR,
+        ImmutableSet.of("123")
+    ));
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to