This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 0.15.1-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
commit b88dbf15c64c662563f15b0359eddaf68ccec2e9 Author: Himanshu <[email protected]> AuthorDate: Tue Jun 18 09:18:43 2019 -0700 WorkerTaskManager to create disk files atomically and ignore task file corruption (#7917) * WorkerTaskManager to create disk files atomically and ignore task file corruptions * fixing weird checkstyle lambda indentation issues --- .../apache/druid/java/util/common/FileUtils.java | 16 +++++--- .../druid/indexing/worker/WorkerTaskManager.java | 46 +++++++++++++++++++--- 2 files changed, 50 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java index 7be41a8..17f0ed0 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java @@ -176,6 +176,15 @@ public class FileUtils /** * Write to a file atomically, by first writing to a temporary file in the same directory and then moving it to + * the target location. More docs at {@link FileUtils#writeAtomically(File, File, OutputStreamConsumer)} . + */ + public static <T> T writeAtomically(final File file, OutputStreamConsumer<T> f) throws IOException + { + return writeAtomically(file, file.getParentFile(), f); + } + + /** + * Write to a file atomically, by first writing to a temporary file in given tmpDir directory and then moving it to * the target location. This function attempts to clean up its temporary files when possible, but they may stick * around (for example, if the JVM crashes partway through executing the function). In any case, the target file * should be unharmed. @@ -186,12 +195,7 @@ public class FileUtils * * This method is not just thread-safe, but is also safe to use from multiple processes on the same machine. 
*/ - public static <T> T writeAtomically(final File file, OutputStreamConsumer<T> f) throws IOException - { - return writeAtomically(file, file.getParentFile(), f); - } - - private static <T> T writeAtomically(final File file, final File tmpDir, OutputStreamConsumer<T> f) throws IOException + public static <T> T writeAtomically(final File file, final File tmpDir, OutputStreamConsumer<T> f) throws IOException { final File tmpFile = new File(tmpDir, StringUtils.format(".%s.%s", file.getName(), UUID.randomUUID())); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java index 613c69b..ac5c15c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java @@ -39,6 +39,7 @@ import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerListener; +import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; @@ -129,6 +130,7 @@ public abstract class WorkerTaskManager synchronized (lock) { try { log.info("Starting..."); + cleanupAndMakeTmpTaskDir(); registerLocationListener(); restoreRestorableTasks(); initAssignedTasks(); @@ -264,7 +266,12 @@ public abstract class WorkerTaskManager } try { - jsonMapper.writeValue(new File(getAssignedTaskDir(), task.getId()), task); + FileUtils.writeAtomically(new File(getAssignedTaskDir(), task.getId()), getTmpTaskDir(), + os -> { + jsonMapper.writeValue(os, task); + return null; + } + ); assignedTasks.put(task.getId(), task); } catch (IOException ex) { @@ -286,6 +293,28 @@ public abstract 
class WorkerTaskManager submitNoticeToExec(new RunNotice(task)); } + private File getTmpTaskDir() + { + return new File(taskConfig.getBaseTaskDir(), "workerTaskManagerTmp"); + } + + private void cleanupAndMakeTmpTaskDir() + { + File tmpDir = getTmpTaskDir(); + tmpDir.mkdirs(); + if (!tmpDir.isDirectory()) { + throw new ISE("Tmp Tasks Dir [%s] does not exist/not-a-directory.", tmpDir); + } + + // Delete any tmp files left out from before due to jvm crash. + try { + org.apache.commons.io.FileUtils.cleanDirectory(tmpDir); + } + catch (IOException ex) { + log.warn("Failed to cleanup tmp dir [%s].", tmpDir.getAbsolutePath()); + } + } + public File getAssignedTaskDir() { return new File(taskConfig.getBaseTaskDir(), "assignedTasks"); @@ -311,11 +340,11 @@ public abstract class WorkerTaskManager assignedTasks.put(taskId, task); log.info("Found assigned task[%s].", taskId); } else { - throw new ISE("Corrupted assigned task on disk[%s].", taskFile.getAbsoluteFile()); + throw new ISE("WTF! Corrupted assigned task on disk[%s].", taskFile.getAbsoluteFile()); } } catch (IOException ex) { - throw new ISE(ex, "Failed to read assigned task from disk at [%s]. Ignored.", taskFile.getAbsoluteFile()); + log.error(ex, "Failed to read assigned task from disk at [%s]. 
Ignored.", taskFile.getAbsoluteFile()); } } @@ -395,7 +424,12 @@ public abstract class WorkerTaskManager completedTasks.put(taskId, taskAnnouncement); try { - jsonMapper.writeValue(new File(getCompletedTaskDir(), taskId), taskAnnouncement); + FileUtils.writeAtomically(new File(getCompletedTaskDir(), taskId), getTmpTaskDir(), + os -> { + jsonMapper.writeValue(os, taskAnnouncement); + return null; + } + ); } catch (IOException ex) { log.error(ex, "Error while trying to persist completed task[%s] announcement.", taskId); @@ -423,11 +457,11 @@ public abstract class WorkerTaskManager completedTasks.put(taskId, taskAnnouncement); log.info("Found completed task[%s] with status[%s].", taskId, taskAnnouncement.getStatus()); } else { - throw new ISE("Corrupted completed task on disk[%s].", taskFile.getAbsoluteFile()); + throw new ISE("WTF! Corrupted completed task on disk[%s].", taskFile.getAbsoluteFile()); } } catch (IOException ex) { - throw new ISE(ex, "Failed to read completed task from disk at [%s]. Ignored.", taskFile.getAbsoluteFile()); + log.error(ex, "Failed to read completed task from disk at [%s]. Ignored.", taskFile.getAbsoluteFile()); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
