imply-cheddar commented on code in PR #14239:
URL: https://github.com/apache/druid/pull/14239#discussion_r1192097495
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -23,92 +23,240 @@
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.logger.Logger;
-import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
public class TaskStorageDirTracker
{
+ private static final Logger log = new Logger(TaskStorageDirTracker.class);
+
public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig,
TaskConfig taskConfig)
{
- if (workerConfig == null) {
- return new
TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+ final List<String> basePaths = workerConfig.getBaseTaskDirs();
+ final List<File> baseTaskDirs;
+
+ if (basePaths == null) {
+ baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
} else {
- final List<String> basePaths = workerConfig.getBaseTaskDirs();
- if (basePaths == null) {
- return new
TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
- }
- return new TaskStorageDirTracker(
- basePaths.stream().map(File::new).collect(Collectors.toList())
- );
+ baseTaskDirs =
basePaths.stream().map(File::new).collect(Collectors.toList());
+ }
+
+ return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(),
workerConfig.getBaseTaskDirSize());
+ }
+
+ public static TaskStorageDirTracker fromBaseDirs(List<File> baseTaskDirs,
int numSlots, long dirSize)
+ {
+ int slotsPerBaseTaskDir = numSlots / baseTaskDirs.size();
+ if (slotsPerBaseTaskDir == 0) {
+ slotsPerBaseTaskDir = 1;
+ } else if (numSlots % baseTaskDirs.size() > 0) {
+ // We have to add an extra slot per location if they do not evenly divide
+ ++slotsPerBaseTaskDir;
}
+ long sizePerSlot = dirSize / slotsPerBaseTaskDir;
+
+ File[] slotDirs = new File[numSlots];
+ for (int i = 0; i < numSlots; ++i) {
+ final int whichDir = i % baseTaskDirs.size();
+ final int dirUsageCount = i / baseTaskDirs.size();
+ slotDirs[i] = new File(baseTaskDirs.get(whichDir),
StringUtils.format("slot%d", dirUsageCount));
+ }
+
+ return new TaskStorageDirTracker(baseTaskDirs, slotDirs, sizePerSlot);
}
+ /**
+ * The base task dirs, this field exists primarily for compatibility with
scheduling that was done
+ * before TaskStorageDirTracker was introduced. All of the tasks were just
splatted together
+ * into one directory. If we want to be able to restore the tasks, we need
to be able to find them
+ * at the old locations and that is why this exists.
+ */
private final File[] baseTaskDirs;
- // Initialize to a negative number because it ensures that we can handle the
overflow-rollover case
+
+ /**
+ * These are slots pre-divided to keep disk sizing considerations aligned.
The normal operation of this
+ * class is to round-robin across these slots.
+ */
+ private final StorageSlot[] slots;
+
+ /**
+ * A counter used to simplify round-robin allocation. We initialize it to a
negative value because it
+ * simplifies testing/ensuring that we can handle overflow-rollover of the
integer
+ */
private final AtomicInteger iterationCounter = new
AtomicInteger(Integer.MIN_VALUE);
- public TaskStorageDirTracker(List<File> baseTaskDirs)
+ private TaskStorageDirTracker(List<File> baseTaskDirs, File[] slotDirs, long
sizePerSlot)
{
this.baseTaskDirs = baseTaskDirs.toArray(new File[0]);
+ this.slots = new StorageSlot[slotDirs.length];
+ for (int i = 0; i < slotDirs.length; ++i) {
+ slots[i] = new StorageSlot(slotDirs[i], sizePerSlot);
+ }
}
@LifecycleStart
public void ensureDirectories()
{
- for (File baseTaskDir : baseTaskDirs) {
- if (!baseTaskDir.exists()) {
+ for (StorageSlot slot : slots) {
+ if (!slot.getDirectory().exists()) {
try {
- FileUtils.mkdirp(baseTaskDir);
+ FileUtils.mkdirp(slot.getDirectory());
}
catch (IOException e) {
throw new ISE(
e,
- "base task directory [%s] likely does not exist, please ensure
it exists and the user has permissions.",
- baseTaskDir
+ "directory for slot [%s] likely does not exist, please ensure it
exists and the user has permissions.",
Review Comment:
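If the directory already existed, the `mkdirp` call would have succeeded
regardless of permissions, so reaching this exception means the directory most
likely does not exist. The reason for saying "likely does not exist" is to tell
whoever sees this message that they shouldn't expect the directory to already
be there; they need to create it themselves.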
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]