abhishekagarwal87 commented on code in PR #14239:
URL: https://github.com/apache/druid/pull/14239#discussion_r1191964565
##########
docs/ingestion/tasks.md:
##########
@@ -419,6 +419,19 @@ You can configure retention periods for logs in
milliseconds by setting `druid.i
> Automatic log file deletion typically works based on the log file's
> 'modified' timestamp in the back-end store. Large clock skews between Druid
> processes and the long-term store might result in unintended behavior.
+## Configuring task storage sizes
+
+Tasks sometimes need to use local disk for storage of things while the task is
active. For example, for realtime ingestion tasks to accept broadcast segments
for broadcast joins. Or intermediate data sets for Multi-stage Query jobs
+
+Task storage sizes are configured through a combination of three properties:
+1. `druid.worker.capacity` - i.e. the "number of task slots"
+2. `druid.worker.baseTaskDirs` - i.e. the list directories to use for task
storage
Review Comment:
```suggestion
2. `druid.worker.baseTaskDirs` - i.e. the list of directories to use for
task storage.
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -23,92 +23,240 @@
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.logger.Logger;
-import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
public class TaskStorageDirTracker
{
+ private static final Logger log = new Logger(TaskStorageDirTracker.class);
+
public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig,
TaskConfig taskConfig)
{
- if (workerConfig == null) {
- return new
TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+ final List<String> basePaths = workerConfig.getBaseTaskDirs();
+ final List<File> baseTaskDirs;
+
+ if (basePaths == null) {
+ baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
} else {
- final List<String> basePaths = workerConfig.getBaseTaskDirs();
- if (basePaths == null) {
- return new
TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
- }
- return new TaskStorageDirTracker(
- basePaths.stream().map(File::new).collect(Collectors.toList())
- );
+ baseTaskDirs =
basePaths.stream().map(File::new).collect(Collectors.toList());
+ }
+
+ return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(),
workerConfig.getBaseTaskDirSize());
+ }
+
+ public static TaskStorageDirTracker fromBaseDirs(List<File> baseTaskDirs,
int numSlots, long dirSize)
+ {
+ int slotsPerBaseTaskDir = numSlots / baseTaskDirs.size();
+ if (slotsPerBaseTaskDir == 0) {
+ slotsPerBaseTaskDir = 1;
+ } else if (numSlots % baseTaskDirs.size() > 0) {
+ // We have to add an extra slot per location if they do not evenly divide
+ ++slotsPerBaseTaskDir;
}
+ long sizePerSlot = dirSize / slotsPerBaseTaskDir;
+
+ File[] slotDirs = new File[numSlots];
+ for (int i = 0; i < numSlots; ++i) {
+ final int whichDir = i % baseTaskDirs.size();
+ final int dirUsageCount = i / baseTaskDirs.size();
+ slotDirs[i] = new File(baseTaskDirs.get(whichDir),
StringUtils.format("slot%d", dirUsageCount));
+ }
+
+ return new TaskStorageDirTracker(baseTaskDirs, slotDirs, sizePerSlot);
}
+ /**
+ * The base task dirs, this field exists primarily for compatibility with
scheduling that was done
+ * before TaskStorageDirTracker was introduced. All of the tasks were just
splatted together
+ * into one directory. If we want to be able to restore the tasks, we need
to be able to find them
+ * at the old locations and that is why this exists.
+ */
private final File[] baseTaskDirs;
- // Initialize to a negative number because it ensures that we can handle the
overflow-rollover case
+
+ /**
+ * These are slots pre-divided to keep disk sizing considerations aligned.
The normal operation of this
+ * class is to round-robin across these slots.
+ */
+ private final StorageSlot[] slots;
+
+ /**
+ * A counter used to simplify round-robin allocation. We initialize it to a
negative value because it
+ * simplifies testing/ensuring that we can handle overflow-rollover of the
integer
+ */
private final AtomicInteger iterationCounter = new
AtomicInteger(Integer.MIN_VALUE);
- public TaskStorageDirTracker(List<File> baseTaskDirs)
+ private TaskStorageDirTracker(List<File> baseTaskDirs, File[] slotDirs, long
sizePerSlot)
{
this.baseTaskDirs = baseTaskDirs.toArray(new File[0]);
+ this.slots = new StorageSlot[slotDirs.length];
+ for (int i = 0; i < slotDirs.length; ++i) {
+ slots[i] = new StorageSlot(slotDirs[i], sizePerSlot);
+ }
}
@LifecycleStart
public void ensureDirectories()
{
- for (File baseTaskDir : baseTaskDirs) {
- if (!baseTaskDir.exists()) {
+ for (StorageSlot slot : slots) {
+ if (!slot.getDirectory().exists()) {
try {
- FileUtils.mkdirp(baseTaskDir);
+ FileUtils.mkdirp(slot.getDirectory());
}
catch (IOException e) {
throw new ISE(
e,
- "base task directory [%s] likely does not exist, please ensure
it exists and the user has permissions.",
- baseTaskDir
+ "directory for slot [%s] likely does not exist, please ensure it
exists and the user has permissions.",
Review Comment:
Hmm. It's not necessary for the directory to exist beforehand, since we are
creating it at JVM startup. The error message should just say that the
directory could not be created and that the user should have the relevant
permissions.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -23,92 +23,240 @@
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.logger.Logger;
-import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
public class TaskStorageDirTracker
{
+ private static final Logger log = new Logger(TaskStorageDirTracker.class);
+
public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig,
TaskConfig taskConfig)
{
- if (workerConfig == null) {
- return new
TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+ final List<String> basePaths = workerConfig.getBaseTaskDirs();
+ final List<File> baseTaskDirs;
+
+ if (basePaths == null) {
+ baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
} else {
- final List<String> basePaths = workerConfig.getBaseTaskDirs();
- if (basePaths == null) {
- return new
TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
- }
- return new TaskStorageDirTracker(
- basePaths.stream().map(File::new).collect(Collectors.toList())
- );
+ baseTaskDirs =
basePaths.stream().map(File::new).collect(Collectors.toList());
+ }
+
+ return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(),
workerConfig.getBaseTaskDirSize());
+ }
+
+ public static TaskStorageDirTracker fromBaseDirs(List<File> baseTaskDirs,
int numSlots, long dirSize)
+ {
+ int slotsPerBaseTaskDir = numSlots / baseTaskDirs.size();
+ if (slotsPerBaseTaskDir == 0) {
+ slotsPerBaseTaskDir = 1;
+ } else if (numSlots % baseTaskDirs.size() > 0) {
+ // We have to add an extra slot per location if they do not evenly divide
+ ++slotsPerBaseTaskDir;
}
+ long sizePerSlot = dirSize / slotsPerBaseTaskDir;
+
+ File[] slotDirs = new File[numSlots];
+ for (int i = 0; i < numSlots; ++i) {
+ final int whichDir = i % baseTaskDirs.size();
+ final int dirUsageCount = i / baseTaskDirs.size();
+ slotDirs[i] = new File(baseTaskDirs.get(whichDir),
StringUtils.format("slot%d", dirUsageCount));
+ }
+
+ return new TaskStorageDirTracker(baseTaskDirs, slotDirs, sizePerSlot);
}
+ /**
+ * The base task dirs, this field exists primarily for compatibility with
scheduling that was done
+ * before TaskStorageDirTracker was introduced. All of the tasks were just
splatted together
+ * into one directory. If we want to be able to restore the tasks, we need
to be able to find them
+ * at the old locations and that is why this exists.
+ */
private final File[] baseTaskDirs;
- // Initialize to a negative number because it ensures that we can handle the
overflow-rollover case
+
+ /**
+ * These are slots pre-divided to keep disk sizing considerations aligned.
The normal operation of this
+ * class is to round-robin across these slots.
+ */
+ private final StorageSlot[] slots;
+
+ /**
+ * A counter used to simplify round-robin allocation. We initialize it to a
negative value because it
+ * simplifies testing/ensuring that we can handle overflow-rollover of the
integer
+ */
private final AtomicInteger iterationCounter = new
AtomicInteger(Integer.MIN_VALUE);
- public TaskStorageDirTracker(List<File> baseTaskDirs)
+ private TaskStorageDirTracker(List<File> baseTaskDirs, File[] slotDirs, long
sizePerSlot)
{
this.baseTaskDirs = baseTaskDirs.toArray(new File[0]);
+ this.slots = new StorageSlot[slotDirs.length];
+ for (int i = 0; i < slotDirs.length; ++i) {
+ slots[i] = new StorageSlot(slotDirs[i], sizePerSlot);
+ }
}
@LifecycleStart
public void ensureDirectories()
{
- for (File baseTaskDir : baseTaskDirs) {
- if (!baseTaskDir.exists()) {
+ for (StorageSlot slot : slots) {
+ if (!slot.getDirectory().exists()) {
try {
- FileUtils.mkdirp(baseTaskDir);
+ FileUtils.mkdirp(slot.getDirectory());
}
catch (IOException e) {
throw new ISE(
e,
- "base task directory [%s] likely does not exist, please ensure
it exists and the user has permissions.",
- baseTaskDir
+ "directory for slot [%s] likely does not exist, please ensure it
exists and the user has permissions.",
+ slot
);
}
}
}
}
- public File pickBaseDir(String taskId)
+ public synchronized StorageSlot pickStorageSlot(String taskId)
{
- if (baseTaskDirs.length == 1) {
- return baseTaskDirs[0];
+ // if the task directory already exists, we want to give it precedence, so
check.
+ for (StorageSlot slot : slots) {
+ if (slot.runningTaskId != null && slot.runningTaskId.equals(taskId)) {
+ return slot;
+ }
}
- // if the task directory already exists, we want to give it precedence, so
check.
- for (File baseTaskDir : baseTaskDirs) {
- if (new File(baseTaskDir, taskId).exists()) {
- return baseTaskDir;
+ // if it doesn't exist, pick one round-robin and ensure it is unused.
+ for (int i = 0; i < slots.length; ++i) {
+ // This can be negative, so abs() it.
+ final int currIncrement = Math.abs(iterationCounter.getAndIncrement() %
slots.length);
+ final StorageSlot candidateSlot = slots[currIncrement % slots.length];
+ if (candidateSlot.runningTaskId == null) {
+ candidateSlot.runningTaskId = taskId;
+ return candidateSlot;
}
}
+ throw new ISE("Unable to pick a free slot, this should never happen, slot
status [%s].", Arrays.toString(slots));
+ }
- // if it doesn't exist, pick one round-robin and return. This can be
negative, so abs() it
- final int currIncrement = Math.abs(iterationCounter.getAndIncrement() %
baseTaskDirs.length);
- return baseTaskDirs[currIncrement % baseTaskDirs.length];
+ public synchronized void returnStorageSlot(StorageSlot slot)
+ {
+ if (slot.getParentRef() == this) {
Review Comment:
Interesting. Can there be multiple TaskStorageDirTracker instances in one JVM?
##########
server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java:
##########
@@ -80,11 +83,35 @@ public int getCapacity()
return capacity;
}
+ public WorkerConfig setCapacity(int capacity)
+ {
+ this.capacity = capacity;
+ return this;
+ }
+
+ public long getBaseTaskDirSize()
+ {
+ return baseTaskDirSize;
+ }
+
+ public WorkerConfig setBaseTaskDirSize(long baseTaskDirSize)
+ {
+ this.baseTaskDirSize = baseTaskDirSize;
+ return this;
+ }
+
public List<String> getBaseTaskDirs()
{
return baseTaskDirs;
}
+ public WorkerConfig setBaseTaskDirs(List<String> baseTaskDirs)
Review Comment:
I haven't seen setters in config classes in the Druid code before. But then,
I also don't know whether that is intentional.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -24,91 +24,202 @@
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
public class TaskStorageDirTracker
{
public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig,
TaskConfig taskConfig)
{
+ final List<File> baseTaskDirs;
if (workerConfig == null) {
- return new
TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+ baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
} else {
final List<String> basePaths = workerConfig.getBaseTaskDirs();
if (basePaths == null) {
- return new
TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+ baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
+ } else {
+ baseTaskDirs =
basePaths.stream().map(File::new).collect(Collectors.toList());
}
- return new TaskStorageDirTracker(
- basePaths.stream().map(File::new).collect(Collectors.toList())
- );
}
+
+ return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(),
workerConfig.getBaseTaskDirSize());
+ }
+
+ public static TaskStorageDirTracker fromBaseDirs(List<File> baseTaskDirs,
int numSlots, long dirSize)
+ {
+ int slotsPerBaseTaskDir = Math.max(1, numSlots / baseTaskDirs.size());
+ if (numSlots % baseTaskDirs.size() > 0) {
+ // We have to add an extra slot per location if they do not evenly divide
+ ++slotsPerBaseTaskDir;
+ }
+ long sizePerSlot = dirSize / slotsPerBaseTaskDir;
Review Comment:
If we have
three directories - A, B, C
dir size - 200 GB (per directory)
worker capacity - 10
we get slotsPerBaseTaskDir - 4 (10 / 3, rounded up)
sizePerSlot - 50 GB (200 GB / 4)
Only 10 slots are created (4 + 3 + 3 across the directories), so 500 GB of the
600 GB is allotted and 100 GB will be left unused. Is that calculation correct?
I think it's ok. But maybe we can call out this nuance in the docs? Users
can always adjust the worker capacity or volume count if they want the whole
space to be used.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java:
##########
@@ -258,6 +265,8 @@ public TaskStatus call()
}
}
+ getTracker().returnStorageSlot(storageSlot);
Review Comment:
Similar comment here re sequencing.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java:
##########
@@ -924,5 +870,50 @@ static int getNextAttemptID(File taskDir)
}
return maxAttempt + 1;
}
+
+ public static class CommandListBuilder
Review Comment:
Neat 👍
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java:
##########
@@ -477,6 +429,8 @@ public TaskStatus call()
portFinder.markPortUnused(tlsChildPort);
}
+ getTracker().returnStorageSlot(storageSlot);
Review Comment:
Shouldn't the directories be deleted before the slot is returned, similar to
how it is done in BaseRestorableTaskRunner? In that class, if deleting the
directory fails for any reason, the slot does not get unreserved.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java:
##########
@@ -304,82 +290,48 @@ public TaskStatus call()
if (context != null) {
for (String propName : context.keySet()) {
if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
- command.add(
- StringUtils.format(
- "-D%s=%s",
+ command.addSystemProperty(
propName.substring(CHILD_PROPERTY_PREFIX.length()),
task.getContextValue(propName)
- )
);
}
}
}
// add the attemptId as a system property
- command.add(
- StringUtils.format(
- "-D%s=%s",
- "attemptId",
- "1"
- )
- );
+ command.addSystemProperty("attemptId", "1");
// Add dataSource, taskId and taskType for metrics or
logging
- command.add(
- StringUtils.format(
- "-D%s%s=%s",
- MonitorsConfig.METRIC_DIMENSION_PREFIX,
- DruidMetrics.DATASOURCE,
+ command.addSystemProperty(
+ MonitorsConfig.METRIC_DIMENSION_PREFIX +
DruidMetrics.DATASOURCE,
task.getDataSource()
- )
);
- command.add(
- StringUtils.format(
- "-D%s%s=%s",
- MonitorsConfig.METRIC_DIMENSION_PREFIX,
- DruidMetrics.TASK_ID,
+ command.addSystemProperty(
+ MonitorsConfig.METRIC_DIMENSION_PREFIX +
DruidMetrics.TASK_ID,
task.getId()
- )
);
- command.add(
- StringUtils.format(
- "-D%s%s=%s",
- MonitorsConfig.METRIC_DIMENSION_PREFIX,
- DruidMetrics.TASK_TYPE,
+ command.addSystemProperty(
+ MonitorsConfig.METRIC_DIMENSION_PREFIX +
DruidMetrics.TASK_TYPE,
task.getType()
- )
);
- command.add(StringUtils.format("-Ddruid.host=%s",
childHost));
-
command.add(StringUtils.format("-Ddruid.plaintextPort=%d", childPort));
- command.add(StringUtils.format("-Ddruid.tlsPort=%d",
tlsChildPort));
+
+ command.addSystemProperty("druid.host", childHost);
+ command.addSystemProperty("druid.plaintextPort",
childPort);
+ command.addSystemProperty("druid.tlsPort",
tlsChildPort);
// Let tasks know where they are running on.
// This information is used in native parallel
indexing with shuffle.
-
command.add(StringUtils.format("-Ddruid.task.executor.service=%s",
node.getServiceName()));
-
command.add(StringUtils.format("-Ddruid.task.executor.host=%s",
node.getHost()));
- command.add(
-
StringUtils.format("-Ddruid.task.executor.plaintextPort=%d",
node.getPlaintextPort())
- );
- command.add(
- StringUtils.format(
- "-Ddruid.task.executor.enablePlaintextPort=%s",
- node.isEnablePlaintextPort()
- )
- );
-
command.add(StringUtils.format("-Ddruid.task.executor.tlsPort=%d",
node.getTlsPort()));
- command.add(
-
StringUtils.format("-Ddruid.task.executor.enableTlsPort=%s",
node.isEnableTlsPort())
- );
-
command.add(StringUtils.format("-Dlog4j2.configurationFactory=%s",
ConsoleLoggingEnforcementConfigurationFactory.class.getName()));
+
command.addSystemProperty("druid.task.executor.service", node.getServiceName());
+ command.addSystemProperty("druid.task.executor.host",
node.getHost());
+
command.addSystemProperty("druid.task.executor.plaintextPort",
node.getPlaintextPort());
+
command.addSystemProperty("druid.task.executor.enablePlaintextPort",
node.isEnablePlaintextPort());
+
command.addSystemProperty("druid.task.executor.tlsPort", node.getTlsPort());
+
command.addSystemProperty("druid.task.executor.enableTlsPort",
node.isEnableTlsPort());
+
command.addSystemProperty("log4j2.configurationFactory",
ConsoleLoggingEnforcementConfigurationFactory.class.getName());
- // These are not enabled per default to allow the user
to either set or not set them
- // Users are highly suggested to be set in
druid.indexer.runner.javaOpts
- // See
org.apache.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int)
- // for more information
- // command.add("-XX:+UseThreadPriorities");
- // command.add("-XX:ThreadPriorityPolicy=42");
Review Comment:
Are these comments (about the `-XX:+UseThreadPriorities` and
`-XX:ThreadPriorityPolicy` options) not required anymore?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]