This is an automated email from the ASF dual-hosted git repository.
lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0335aaa279 Add query results directory and prevent the auto cleaner
from cleaning it (#14446)
0335aaa279 is described below
commit 0335aaa279d95063c6a172cf3f1ffdf6e46f592b
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Wed Jun 28 10:14:04 2023 +0530
Add query results directory and prevent the auto cleaner from cleaning it
(#14446)
Adds support for automatic cleaning of a "query-results" directory in
durable storage. Files in this directory are cleaned up only once the task ID is
no longer known to the Overlord, so query results are retained after the
task has finished running.
---
docs/multi-stage-query/reference.md | 2 +-
.../druid/msq/indexing/DurableStorageCleaner.java | 15 +++++--
.../msq/indexing/DurableStorageCleanerTest.java | 49 ++++++++++++++++++++--
.../druid/frame/util/DurableStorageUtils.java | 41 ++++++++++++++++--
.../druid/frame/util/DurableStorageUtilsTest.java | 39 ++++++++++++++---
5 files changed, 129 insertions(+), 17 deletions(-)
diff --git a/docs/multi-stage-query/reference.md
b/docs/multi-stage-query/reference.md
index ec6a5b1543..5bbe935f1e 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -365,7 +365,7 @@ For detailed information about the settings related to
durable storage, see [Dur
### Use durable storage for queries
-When you run a query, include the context parameter `durableShuffleStorage`
and set it to `true`.
+When you run a query, include the context parameter `durableShuffleStorage`
and set it to `true`.
For queries where you want to use fault tolerance for workers, set
`faultTolerance` to `true`, which automatically sets `durableShuffleStorage` to
`true`.
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java
index f2b5be699b..195b1e26f8 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java
@@ -88,13 +88,22 @@ public class DurableStorageCleaner implements OverlordDuty
.map(TaskRunnerWorkItem::getTaskId)
.map(DurableStorageUtils::getControllerDirectory)
.collect(Collectors.toSet());
+ Set<String> knownTaskIds = taskRunner.getKnownTasks()
+ .stream()
+ .map(TaskRunnerWorkItem::getTaskId)
+
.map(DurableStorageUtils::getControllerDirectory)
+ .collect(Collectors.toSet());
Set<String> filesToRemove = new HashSet<>();
while (allFiles.hasNext()) {
String currentFile = allFiles.next();
- String taskIdFromPathOrEmpty =
DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(currentFile);
- if (taskIdFromPathOrEmpty != null && !taskIdFromPathOrEmpty.isEmpty()) {
- if (runningTaskIds.contains(taskIdFromPathOrEmpty)) {
+ String nextDirName =
DurableStorageUtils.getNextDirNameWithPrefixFromPath(currentFile);
+ if (nextDirName != null && !nextDirName.isEmpty()) {
+ if (runningTaskIds.contains(nextDirName)) {
+ // do nothing
+ } else if (DurableStorageUtils.QUERY_RESULTS_DIR.equals(nextDirName)
+ && DurableStorageUtils.isQueryResultFileActive(currentFile,
knownTaskIds)) {
+ // query results should not be cleaned even if the task has finished
running
// do nothing
} else {
filesToRemove.add(currentFile);
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java
index 0f674739b2..7f7bf4fbfd 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.storage.StorageConnector;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import java.util.Collection;
@@ -45,19 +46,25 @@ public class DurableStorageCleanerTest
private static final TaskRunnerWorkItem TASK_RUNNER_WORK_ITEM =
EasyMock.mock(TaskRunnerWorkItem.class);
private static final String TASK_ID = "dummyTaskId";
private static final String STRAY_DIR = "strayDirectory";
+ private DurableStorageCleaner durableStorageCleaner;
- @Test
- public void testRun() throws Exception
+ @Before
+ public void setUp()
{
- EasyMock.reset(TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
+ EasyMock.reset(TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR,
TASK_MASTER);
DurableStorageCleanerConfig durableStorageCleanerConfig = new
DurableStorageCleanerConfig();
durableStorageCleanerConfig.delaySeconds = 1L;
durableStorageCleanerConfig.enabled = true;
- DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(
+ durableStorageCleaner = new DurableStorageCleaner(
durableStorageCleanerConfig,
STORAGE_CONNECTOR,
() -> TASK_MASTER
);
+ }
+
+ @Test
+ public void testRun() throws Exception
+ {
EasyMock.expect(STORAGE_CONNECTOR.listDir(EasyMock.anyString()))
.andReturn(ImmutableList.of(DurableStorageUtils.getControllerDirectory(TASK_ID),
STRAY_DIR)
.stream()
@@ -68,15 +75,49 @@ public class DurableStorageCleanerTest
EasyMock.expect((Collection<TaskRunnerWorkItem>)
TASK_RUNNER.getRunningTasks())
.andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
.anyTimes();
+ EasyMock.expect((Collection<TaskRunnerWorkItem>)
TASK_RUNNER.getKnownTasks())
+ .andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
+ .anyTimes();
EasyMock.expect(TASK_MASTER.getTaskRunner()).andReturn(Optional.of(TASK_RUNNER)).anyTimes();
Capture<Set<String>> capturedArguments = EasyMock.newCapture();
STORAGE_CONNECTOR.deleteFiles(EasyMock.capture(capturedArguments));
EasyMock.expectLastCall().once();
EasyMock.replay(TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM,
STORAGE_CONNECTOR);
+
durableStorageCleaner.run();
+
Assert.assertEquals(Sets.newHashSet(STRAY_DIR),
capturedArguments.getValue());
}
+ @Test
+ public void testRunExcludesQueryDirectory() throws Exception
+ {
+ final String resultPath = DurableStorageUtils.QUERY_RESULTS_DIR + "/" +
DurableStorageUtils.getControllerDirectory(TASK_ID) + "/results.json";
+ final String intermediateFilesPath =
DurableStorageUtils.getControllerDirectory(TASK_ID) + "/intermediate.frame";
+ EasyMock.expect(STORAGE_CONNECTOR.listDir(EasyMock.anyString()))
+ .andReturn(ImmutableList.of(resultPath, STRAY_DIR,
intermediateFilesPath)
+ .stream()
+ .iterator())
+ .anyTimes();
+
EasyMock.expect(TASK_MASTER.getTaskRunner()).andReturn(Optional.of(TASK_RUNNER)).anyTimes();
+ EasyMock.expect(TASK_RUNNER_WORK_ITEM.getTaskId()).andReturn(TASK_ID)
+ .anyTimes();
+ EasyMock.expect((Collection<TaskRunnerWorkItem>)
TASK_RUNNER.getRunningTasks())
+ .andReturn(ImmutableList.of())
+ .anyTimes();
+ EasyMock.expect((Collection<TaskRunnerWorkItem>)
TASK_RUNNER.getKnownTasks())
+ .andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
+ .anyTimes();
+ Capture<Set<String>> capturedArguments = EasyMock.newCapture();
+ STORAGE_CONNECTOR.deleteFiles(EasyMock.capture(capturedArguments));
+ EasyMock.expectLastCall().once();
+ EasyMock.replay(TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM,
STORAGE_CONNECTOR);
+
+ durableStorageCleaner.run();
+
+ Assert.assertEquals(Sets.newHashSet(STRAY_DIR, intermediateFilesPath),
capturedArguments.getValue());
+ }
+
@Test
public void testGetSchedule()
{
diff --git
a/processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java
b/processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java
index 8aea264bc3..1585b86515 100644
---
a/processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java
+++
b/processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java
@@ -20,11 +20,14 @@
package org.apache.druid.frame.util;
import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
/**
* Helper class that fetches the directory and file names corresponding to
file location
@@ -32,7 +35,8 @@ import java.util.Iterator;
public class DurableStorageUtils
{
public static final String SUCCESS_MARKER_FILENAME = "__success";
- public static final Splitter SPLITTER = Splitter.on("/").limit(2);
+ public static final Splitter SPLITTER = Splitter.on("/").limit(3);
+ public static final String QUERY_RESULTS_DIR = "query-results";
public static String getControllerDirectory(final String controllerTaskId)
{
@@ -127,7 +131,7 @@ public class DurableStorageUtils
}
/**
- * Tries to parse out the controller taskID from the input path.
+ * Tries to parse out the topmost directory from the path. Returns
null if there is no such directory.
* <br></br>
* For eg:
* <br/>
@@ -138,7 +142,7 @@ public class DurableStorageUtils
* </ul>
*/
@Nullable
- public static String getControllerTaskIdWithPrefixFromPath(String path)
+ public static String getNextDirNameWithPrefixFromPath(String path)
{
if (path == null) {
return null;
@@ -150,4 +154,35 @@ public class DurableStorageUtils
return null;
}
}
+
+ /**
+ * Tries to parse out the controller taskID from the query results path, and
checks if the taskID is present in the
+ * set of known tasks.
+ * Returns true if the set contains the taskId.
+ * Returns false if taskId could not be parsed or if the set does not
contain the taskId.
+ * <br></br>
+ * For eg:
+ * <br/>
+ * <ul>
+ * <li>for path <b>controller_query_id/task/123</b>, the function will
return <b>false</b></li>
+ * <li>for path <b>query-results/controller_query_id/results.json</b>, the
function will return <b>true</b> if controller_query_id is in the known
tasks</li>
+ * <li>for path <b>query-results/controller_query_id/results.json</b>, the
function will return <b>false</b> if controller_query_id is not in the
known tasks</li>
+ * <li>for path <b>null</b>, the function will return <b>false</b></li>
+ * </ul>
+ */
+ public static boolean isQueryResultFileActive(String path, Set<String>
knownTasks)
+ {
+ if (path == null) {
+ return false;
+ }
+ Iterator<String> elementsIterator = SPLITTER.split(path).iterator();
+ List<String> elements = ImmutableList.copyOf(elementsIterator);
+ if (elements.size() < 2) {
+ return false;
+ }
+ if (!DurableStorageUtils.QUERY_RESULTS_DIR.equals(elements.get(0))) {
+ return false;
+ }
+ return knownTasks.contains(elements.get(1));
+ }
}
diff --git
a/processing/src/test/java/org/apache/druid/frame/util/DurableStorageUtilsTest.java
b/processing/src/test/java/org/apache/druid/frame/util/DurableStorageUtilsTest.java
index 8e32038107..71b16633e9 100644
---
a/processing/src/test/java/org/apache/druid/frame/util/DurableStorageUtilsTest.java
+++
b/processing/src/test/java/org/apache/druid/frame/util/DurableStorageUtilsTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.frame.util;
+import com.google.common.collect.ImmutableSet;
import org.junit.Assert;
import org.junit.Test;
@@ -26,13 +27,39 @@ public class DurableStorageUtilsTest
{
@Test
- public void getControllerTaskIdWithPrefixFromPath()
+ public void getNextDirNameWithPrefixFromPath()
{
- Assert.assertEquals("",
DurableStorageUtils.getControllerTaskIdWithPrefixFromPath("/123/123"));
- Assert.assertEquals("123",
DurableStorageUtils.getControllerTaskIdWithPrefixFromPath("123"));
+ Assert.assertEquals("",
DurableStorageUtils.getNextDirNameWithPrefixFromPath("/123/123"));
+ Assert.assertEquals("123",
DurableStorageUtils.getNextDirNameWithPrefixFromPath("123"));
Assert.assertEquals("controller_query_123",
-
DurableStorageUtils.getControllerTaskIdWithPrefixFromPath("controller_query_123/123"));
- Assert.assertEquals("",
DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(""));
-
Assert.assertNull(DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(null));
+
DurableStorageUtils.getNextDirNameWithPrefixFromPath("controller_query_123/123"));
+ Assert.assertEquals("",
DurableStorageUtils.getNextDirNameWithPrefixFromPath(""));
+
Assert.assertNull(DurableStorageUtils.getNextDirNameWithPrefixFromPath(null));
+ }
+
+ @Test
+ public void isQueryResultFileActive()
+ {
+
+ Assert.assertTrue(DurableStorageUtils.isQueryResultFileActive(
+ DurableStorageUtils.QUERY_RESULTS_DIR + "/123/result",
+ ImmutableSet.of("123")
+ ));
+ Assert.assertFalse(DurableStorageUtils.isQueryResultFileActive(
+ DurableStorageUtils.QUERY_RESULTS_DIR + "/123/result",
+ ImmutableSet.of("")
+ ));
+ Assert.assertFalse(DurableStorageUtils.isQueryResultFileActive(
+ DurableStorageUtils.QUERY_RESULTS_DIR + "/",
+ ImmutableSet.of("123")
+ ));
+ Assert.assertFalse(DurableStorageUtils.isQueryResultFileActive(
+ null,
+ ImmutableSet.of("123")
+ ));
+ Assert.assertFalse(DurableStorageUtils.isQueryResultFileActive(
+ DurableStorageUtils.QUERY_RESULTS_DIR,
+ ImmutableSet.of("123")
+ ));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]