gianm commented on code in PR #19074:
URL: https://github.com/apache/druid/pull/19074#discussion_r2885191869
##########
processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java:
##########
@@ -255,6 +257,8 @@ public static boolean isQueryResultFileActive(String path, Set<String> knownTask
if (!DurableStorageUtils.QUERY_RESULTS_DIR.equals(elements.get(0))) {
return false;
}
- return knownTasks.contains(elements.get(1));
+ Optional<DateTime> taskCreatedAt = taskCreationTimeFn.apply(elements.get(1));
Review Comment:
I think this isn't going to work, because the directory is computed using
`DurableStorageUtils.getControllerDirectory`, and isn't exactly the same as the
controller task ID. Please double check this and update the tests so they
confirm this part works.
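The mismatch described above can be illustrated with a minimal, self-contained sketch. The thread does not show the exact string that `DurableStorageUtils.getControllerDirectory` produces, so this example *assumes* a `controller_` prefix purely for illustration. `ControllerDirNames`, `toDirectory`, and `toTaskId` are hypothetical names. It shows why passing the raw directory name to a task lookup misses, and the kind of round-trip check a test could assert.

```java
import java.util.Optional;

// Hypothetical helper. ASSUMPTION: the controller directory is "controller_" + taskId.
// Verify the real format against DurableStorageUtils.getControllerDirectory before use.
class ControllerDirNames
{
  static final String ASSUMED_PREFIX = "controller_";

  // Builds a directory name from a controller task ID (assumed format).
  static String toDirectory(String controllerTaskId)
  {
    return ASSUMED_PREFIX + controllerTaskId;
  }

  // Recovers the task ID from a directory name, or empty if the name does not match.
  static Optional<String> toTaskId(String directoryName)
  {
    if (directoryName == null || !directoryName.startsWith(ASSUMED_PREFIX)) {
      return Optional.empty();
    }
    String taskId = directoryName.substring(ASSUMED_PREFIX.length());
    return taskId.isEmpty() ? Optional.empty() : Optional.of(taskId);
  }

  public static void main(String[] args)
  {
    String taskId = "query-abc123";
    String dir = toDirectory(taskId);
    // The raw directory name is not the task ID, so a direct lookup by it would miss.
    System.out.println(dir.equals(taskId));             // false
    System.out.println(toTaskId(dir).orElse("<none>")); // query-abc123
  }
}
```

A test that round-trips a real task ID through the production directory builder and back would confirm this part of the change, as requested.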
##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleanerConfig.java:
##########
@@ -39,6 +40,9 @@ public class DurableStorageCleanerConfig
@Min(1)
public long delaySeconds = 86400L;
+ @JsonProperty
+ public long durationToRetain = TimeUnit.HOURS.toMillis(6);
Review Comment:
Please add a config for this property and change its type to `Duration` so
it can be specified using ISO8601 duration strings, to make it consistent with
other duration parameters. Please also document it.
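A minimal sketch of what a `Duration`-typed setting could look like. It uses `java.time.Duration` so the example stays dependency-free. Druid configs are normally bound through Jackson (`@JsonProperty`) and often use Joda-Time types instead, and both are omitted here. `RetentionConfigSketch` is a hypothetical class name, and the positivity check is an illustrative choice that mirrors the `@Min(1)` on `delaySeconds`, not something the review asks for.

```java
import java.time.Duration;

// Hypothetical config sketch: retention expressed as an ISO-8601 duration, not raw millis.
class RetentionConfigSketch
{
  // Default of 6 hours, matching the original TimeUnit.HOURS.toMillis(6).
  private Duration durationToRetain = Duration.parse("PT6H");

  Duration getDurationToRetain()
  {
    return durationToRetain;
  }

  // Accepts strings such as "PT6H" or "P1D"; Duration.parse throws DateTimeParseException on bad input.
  void setDurationToRetain(String iso8601)
  {
    Duration parsed = Duration.parse(iso8601);
    if (parsed.isNegative() || parsed.isZero()) {
      throw new IllegalArgumentException("durationToRetain must be positive: " + iso8601);
    }
    this.durationToRetain = parsed;
  }

  public static void main(String[] args)
  {
    RetentionConfigSketch config = new RetentionConfigSketch();
    System.out.println(config.getDurationToRetain().toMillis()); // 21600000
    config.setDurationToRetain("P1D");
    System.out.println(config.getDurationToRetain().toHours());  // 24
  }
}
```

Operators could then write a value such as `PT6H` or `P1D` in the runtime properties instead of a millisecond count.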
##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleaner.java:
##########
@@ -102,9 +102,14 @@ public void run() throws Exception
if (nextDirName != null && !nextDirName.isEmpty()) {
if (runningTaskIds.contains(nextDirName)) {
// do nothing
- } else if (DurableStorageUtils.QUERY_RESULTS_DIR.equals(nextDirName)
&& DurableStorageUtils.isQueryResultFileActive(currentFile, knownTaskIds)) {
- // query results should not be cleaned even if the task has finished running
+ } else if (
+ DurableStorageUtils.QUERY_RESULTS_DIR.equals(nextDirName)
+ && DurableStorageUtils.isQueryResultFileActive(
+ currentFile,
taskId -> Optional.fromNullable(taskStorage.getTaskInfo(taskId)).transform(TaskInfo::getCreatedTime),
Review Comment:
This is going to be expensive; each usage of `taskStorage` is a call to the
metadata store. It's better to batch these, such as by using one call to get
all of the task IDs that have completed within the retention period. Then you
can delete anything that doesn't appear in that list.
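A minimal sketch of the batched approach. It assumes the set of retained task IDs (running tasks plus tasks completed within the retention period) has already been fetched with one metadata-store query. The exact `TaskStorage` call for that is not shown here. `BatchedRetentionSketch` and `dirsToDelete` are hypothetical names, and the `controller_` prefix used in the mapper is an assumption.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

class BatchedRetentionSketch
{
  // Returns every directory whose task ID is absent from retainedTaskIds.
  // retainedTaskIds is assumed to come from ONE batched query, so there is no
  // per-directory metadata-store call. Unparseable names (mapper returns null)
  // are treated as not retained, following "delete anything not in that list".
  static Set<String> dirsToDelete(
      List<String> queryResultDirs,
      Set<String> retainedTaskIds,
      Function<String, String> dirToTaskId
  )
  {
    Set<String> toDelete = new HashSet<>();
    for (String dir : queryResultDirs) {
      String taskId = dirToTaskId.apply(dir);
      // Null check first: Set.of(...).contains(null) would throw.
      if (taskId == null || !retainedTaskIds.contains(taskId)) {
        toDelete.add(dir);
      }
    }
    return toDelete;
  }

  public static void main(String[] args)
  {
    List<String> dirs = List.of("controller_a", "controller_b", "controller_c");
    // Stand-in for the result of a single batched lookup.
    Set<String> retained = Set.of("a", "c");
    // Hypothetical mapper; the "controller_" prefix is an assumption.
    Function<String, String> toTaskId =
        d -> d.startsWith("controller_") ? d.substring("controller_".length()) : null;
    System.out.println(dirsToDelete(dirs, retained, toTaskId)); // [controller_b]
  }
}
```

The cost becomes one query per cleaner run regardless of how many result directories exist, rather than one query per directory.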
##########
multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java:
##########
@@ -78,24 +88,52 @@ public void testRun() throws Exception
EasyMock.expect((Collection<TaskRunnerWorkItem>) TASK_RUNNER.getRunningTasks())
.andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
.anyTimes();
- EasyMock.expect((Collection<TaskRunnerWorkItem>) TASK_RUNNER.getKnownTasks())
- .andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
- .anyTimes();
EasyMock.expect(TASK_MASTER.getTaskRunner()).andReturn(Optional.of(TASK_RUNNER)).anyTimes();
Capture<Set<String>> capturedArguments = EasyMock.newCapture();
STORAGE_CONNECTOR.deleteFiles(EasyMock.capture(capturedArguments));
EasyMock.expectLastCall().once();
- EasyMock.replay(TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
+ EasyMock.replay(TASK_STORAGE, TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
durableStorageCleaner.run();
Assert.assertEquals(Sets.newHashSet(STRAY_DIR), capturedArguments.getValue());
}
+ @Test
+ public void testRunClearsStaleTask() throws Exception
Review Comment:
Please also include a test for the case where the task is not found.
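A framework-free sketch of the "task not found" scenario. In the real test, EasyMock would make `TaskStorage.getTaskInfo` return `null`, which the diff wraps with `Optional.fromNullable`. Here a `Map` lookup stands in for task storage. `TaskNotFoundSketch` and `isActive` are hypothetical, and treating an unknown task as *not* active (so its results become eligible for cleanup) is an assumption the real test should pin down explicitly.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

class TaskNotFoundSketch
{
  // Hypothetical check: active only if the task exists and was created within the window.
  static boolean isActive(
      String taskId,
      Function<String, Optional<Instant>> createdAt,
      Instant now,
      Duration retain
  )
  {
    return createdAt.apply(taskId)
        .map(created -> created.plus(retain).isAfter(now))
        .orElse(false); // ASSUMPTION: unknown task -> not active -> eligible for cleanup
  }

  public static void main(String[] args)
  {
    Instant now = Instant.parse("2024-01-01T12:00:00Z");
    Map<String, Instant> storage = Map.of(
        "recent", now.minus(Duration.ofHours(1)),
        "stale", now.minus(Duration.ofHours(10))
    );
    // Mirrors Optional.fromNullable(taskStorage.getTaskInfo(taskId)) for a missing task.
    Function<String, Optional<Instant>> lookup = id -> Optional.ofNullable(storage.get(id));
    Duration retain = Duration.ofHours(6);
    System.out.println(isActive("recent", lookup, now, retain));  // true
    System.out.println(isActive("stale", lookup, now, retain));   // false
    System.out.println(isActive("missing", lookup, now, retain)); // false
  }
}
```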
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]