This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch 0.14.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.14.0-incubating by this push:
new 9058e68 Reduce default max # of subTasks to 1 for native parallel task (#7181) (#7200)
9058e68 is described below
commit 9058e683bbd23da30ac72845bc269b55ccd823e5
Author: Jonathan Wei <[email protected]>
AuthorDate: Wed Mar 6 17:12:42 2019 -0800
Reduce default max # of subTasks to 1 for native parallel task (#7181) (#7200)
* Reduce # of max subTasks to 2
* fix typo and add more doc
* add more doc and link
* change default and add warning
* fix doc
* add test
* fix it test
---
docs/content/ingestion/native_tasks.md | 46 +++++++--
.../druid/indexing/common/task/IndexTask.java | 11 ++-
.../parallel/ParallelIndexSupervisorTask.java | 37 ++++---
.../batch/parallel/ParallelIndexTuningConfig.java | 5 +-
.../AbstractParallelIndexSupervisorTaskTest.java | 16 ---
.../ParallelIndexSupervisorTaskResourceTest.java | 7 +-
.../parallel/ParallelIndexSupervisorTaskTest.java | 108 +++++++++++++++------
7 files changed, 159 insertions(+), 71 deletions(-)
diff --git a/docs/content/ingestion/native_tasks.md b/docs/content/ingestion/native_tasks.md
index 837574c..4ecaccf 100644
--- a/docs/content/ingestion/native_tasks.md
+++ b/docs/content/ingestion/native_tasks.md
@@ -54,7 +54,17 @@ which specifies a split and submits worker tasks using those specs. As a result,
the implementation of splittable firehoses. Please note that multiple tasks can be created for the same worker task spec
if one of them fails.
-Since this task doesn't shuffle intermediate data, it isn't available for [perfect rollup](../ingestion/index.html#roll-up-modes).
+Keep the following points in mind:
+- Since this task doesn't shuffle intermediate data, it isn't available for [perfect rollup](../ingestion/index.html#roll-up-modes).
+- The number of tasks for parallel ingestion is determined by `maxNumSubTasks` in the tuningConfig.
+ Since the supervisor task creates up to `maxNumSubTasks` worker tasks regardless of the available task slots,
+ it may affect the performance of other ingestion tasks. As a result, it's important to set `maxNumSubTasks` properly.
+ See the [Capacity Planning](#capacity-planning) section below for more details.
+- By default, batch ingestion replaces all data in any segment that it writes to. If you'd like to add to the segment
+ instead, set the `appendToExisting` flag in the ioConfig. Note that it only replaces data in segments where it actively adds
+ data: if there are segments in your granularitySpec's intervals that have no data written by this task, they will be
+ left alone.
+
An example ingestion spec is:
@@ -122,16 +132,15 @@ An example ingestion spec is:
"baseDir": "examples/indexing/",
"filter": "wikipedia_index_data*"
}
+ },
+ "tuningConfig": {
+ "type": "index_parallel",
+ "maxNumSubTasks": 2
}
}
}
```
-By default, batch ingestion replaces all data in any segment that it writes to. If you'd like to add to the segment
-instead, set the appendToExisting flag in ioConfig. Note that it only replaces data in segments where it actively adds
-data: if there are segments in your granularitySpec's intervals that have no data written by this task, they will be
-left alone.
-
#### Task Properties
|property|description|required?|
@@ -181,7 +190,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no|
|pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no|
|segmentWriteOutMediumFactory|Segment write-out medium to use when creating segments. See [SegmentWriteOutMediumFactory](#segmentWriteOutMediumFactory).|Not specified, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used|no|
-|maxNumSubTasks|Maximum number of tasks which can be run at the same time.|Integer.MAX_VALUE|no|
+|maxNumSubTasks|Maximum number of tasks which can be run at the same time. The supervisor task spawns up to `maxNumSubTasks` worker tasks regardless of the available task slots. If this value is set to 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set too large, too many worker tasks can be created, which might block other ingestion. Check [Capacity Planning](#capacity-planning) for more details.|1|no|
|maxRetry|Maximum number of retries on task failures.|3|no|
|taskStatusCheckPeriodMs|Polling period in milliseconds to check running task statuses.|1000|no|
|chatHandlerTimeout|Timeout for reporting the pushed segments in worker tasks.|PT10S|no|
@@ -372,7 +381,7 @@ An example of the result is
"reportParseExceptions": false,
"pushTimeout": 0,
"segmentWriteOutMediumFactory": null,
- "maxNumSubTasks": 2147483647,
+ "maxNumSubTasks": 4,
"maxRetry": 3,
"taskStatusCheckPeriodMs": 1000,
"chatHandlerTimeout": "PT10S",
@@ -408,6 +417,27 @@ An example of the result is
Returns the task attempt history of the worker task spec of the given id, or HTTP 404 Not Found error if the supervisor task is running in the sequential mode.
+### Capacity Planning
+
+The supervisor task can create up to `maxNumSubTasks` worker tasks no matter how many task slots are currently available.
+As a result, the total number of tasks a single Parallel Index Task can run at the same time is `(maxNumSubTasks + 1)` (including the supervisor task).
+Please note that this can be even larger than the total number of task slots (the sum of the capacity of all workers).
+If `maxNumSubTasks` is larger than `n (available task slots)`, then
+`maxNumSubTasks` tasks are created by the supervisor task, but only `n` tasks are started.
+The others wait in the pending state until a running task finishes.
+
+If you are using the Parallel Index Task together with stream ingestion,
+we recommend limiting the max capacity for batch ingestion to prevent
+stream ingestion from being blocked by batch ingestion. Suppose you have
+`t` Parallel Index Tasks to run at the same time, but want to limit
+the max number of tasks for batch ingestion to `b`. Then, (sum of `maxNumSubTasks`
+of all Parallel Index Tasks + `t` (for supervisor tasks)) must not exceed `b`.
+
+If you have some tasks of a higher priority than others, you may set their
+`maxNumSubTasks` to a higher value than that of lower priority tasks.
+This may help the higher priority tasks finish earlier than lower priority tasks
+by assigning more task slots to them.
+
Local Index Task
----------------
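The Capacity Planning arithmetic above can be checked with a few lines of Java. This is a minimal sketch, not Druid code: the class `CapacityPlanningSketch` and its methods `peakTasks`, `pendingTasks` and `fitsBatchBudget` are hypothetical names, and the sketch assumes the batch limit `b` is inclusive (a total equal to `b` is allowed).

```java
import java.util.List;

/** Hypothetical helper (not part of Druid) illustrating the Capacity Planning arithmetic. */
final class CapacityPlanningSketch {
  private CapacityPlanningSketch() {}

  /** Peak task count of one Parallel Index Task: its worker tasks plus the supervisor itself. */
  static int peakTasks(int maxNumSubTasks) {
    return maxNumSubTasks + 1;
  }

  /** Worker tasks left pending when `created` tasks compete for `availableSlots` free slots. */
  static int pendingTasks(int created, int availableSlots) {
    return Math.max(0, created - availableSlots);
  }

  /**
   * Checks (sum of maxNumSubTasks over all t tasks) + t <= b.
   * Assumption: b is treated as an inclusive limit.
   */
  static boolean fitsBatchBudget(List<Integer> maxNumSubTasksPerTask, int batchLimit) {
    long total = 0;
    for (int maxNumSubTasks : maxNumSubTasksPerTask) {
      total += peakTasks(maxNumSubTasks); // worker tasks + one supervisor task
    }
    return total <= batchLimit;
  }
}
```

For example, two Parallel Index Tasks with `maxNumSubTasks` of 2 each can occupy up to (2 + 1) + (2 + 1) = 6 slots, so they fit a batch limit of 6 but not of 5.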
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 6d5a0d8..2f62f0b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -407,7 +407,16 @@ public class IndexTask extends AbstractTask implements ChatHandler
try {
if (chatHandlerProvider.isPresent()) {
log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
- chatHandlerProvider.get().register(getId(), this, false);
+
+ if (chatHandlerProvider.get().get(getId()).isPresent()) {
+ // This is a workaround for ParallelIndexSupervisorTask to avoid double registering when it runs in the
+ // sequential mode. See ParallelIndexSupervisorTask.runSequential().
+ // Note that IndexTask's own HTTP endpoints are not available in this case. This works only for
+ // ParallelIndexSupervisorTask because it doesn't support APIs for live ingestion reports.
+ log.warn("Chat handler is already registered. Skipping chat handler registration.");
+ } else {
+ chatHandlerProvider.get().register(getId(), this, false);
+ }
} else {
log.warn("No chat handler detected");
}
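The IndexTask change is a "register unless already present" guard. The following minimal sketch shows the same shape outside Druid; `ChatHandlerRegistrySketch` and `registerUnlessPresent` are hypothetical names, and the sketch assumes a registry whose plain `register` rejects a duplicate id, which is why the sequential path has to skip the second registration.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a chat handler registry; not Druid's ChatHandlerProvider. */
final class ChatHandlerRegistrySketch {
  private final Map<String, Object> handlers = new ConcurrentHashMap<>();

  Optional<Object> get(String id) {
    return Optional.ofNullable(handlers.get(id));
  }

  /** Assumption: registering the same id twice is an error. */
  void register(String id, Object handler) {
    if (handlers.putIfAbsent(id, handler) != null) {
      throw new IllegalStateException("Handler already registered for id " + id);
    }
  }

  /** Mirrors the patch: check first, and skip registration if the id is taken. */
  boolean registerUnlessPresent(String id, Object handler) {
    if (get(id).isPresent()) {
      return false; // e.g. the supervisor task already registered under the same task id
    }
    register(id, handler);
    return true;
  }
}
```

Like the patch, this is a check-then-act sequence; if two threads could register the same id concurrently, a single atomic `putIfAbsent` would be the safer design.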
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 61f4517..f8eebd4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -247,13 +247,23 @@ public class ParallelIndexSupervisorTask extends AbstractTask implements ChatHan
chatHandlerProvider.register(getId(), this, false);
try {
- if (baseFirehoseFactory.isSplittable()) {
+ if (isParallelMode()) {
return runParallel(toolbox);
} else {
- log.warn(
- "firehoseFactory[%s] is not splittable. Running sequentially",
- baseFirehoseFactory.getClass().getSimpleName()
- );
+ if (!baseFirehoseFactory.isSplittable()) {
+ log.warn(
+ "firehoseFactory[%s] is not splittable. Running sequentially.",
+ baseFirehoseFactory.getClass().getSimpleName()
+ );
+ } else if (ingestionSchema.getTuningConfig().getMaxNumSubTasks() == 1) {
+ log.warn(
+ "maxNumSubTasks is 1. Running sequentially. "
+ + "Please set maxNumSubTasks to something higher than 1 if you want to run in parallel ingestion mode."
+ );
+ } else {
+ throw new ISE("Unknown reason for sequential mode. Failing this task.");
+ }
+
return runSequential(toolbox);
}
}
@@ -262,6 +272,15 @@ public class ParallelIndexSupervisorTask extends AbstractTask implements ChatHan
}
}
+ private boolean isParallelMode()
+ {
+ return baseFirehoseFactory.isSplittable() && ingestionSchema.getTuningConfig().getMaxNumSubTasks() > 1;
+ }
+
@VisibleForTesting
void setToolbox(TaskToolbox toolbox)
{
@@ -271,7 +290,7 @@ public class ParallelIndexSupervisorTask extends AbstractTask implements ChatHan
private TaskStatus runParallel(TaskToolbox toolbox) throws Exception
{
createRunner(toolbox);
- return TaskStatus.fromCode(getId(), runner.run());
+ return TaskStatus.fromCode(getId(), Preconditions.checkNotNull(runner, "runner").run());
}
private TaskStatus runSequential(TaskToolbox toolbox)
@@ -470,11 +489,7 @@ public class ParallelIndexSupervisorTask extends AbstractTask implements ChatHan
public Response getMode(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
- if (runner == null) {
- return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
- } else {
- return Response.ok(baseFirehoseFactory.isSplittable() ? "parallel" : "sequential").build();
- }
+ return Response.ok(isParallelMode() ? "parallel" : "sequential").build();
}
@GET
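The new mode decision in ParallelIndexSupervisorTask depends only on two inputs: whether the firehose is splittable and the value of `maxNumSubTasks`. The standalone sketch below restates that logic so it can be exercised without a task; `ModeDecisionSketch`, `mode` and `sequentialReason` are hypothetical names, and the reason strings are paraphrases of the patch's log messages rather than the exact text.

```java
/** Hypothetical standalone restatement of the mode decision in ParallelIndexSupervisorTask. */
final class ModeDecisionSketch {
  private ModeDecisionSketch() {}

  /** Parallel only when the firehose can be split and more than one sub-task is allowed. */
  static boolean isParallelMode(boolean splittable, int maxNumSubTasks) {
    return splittable && maxNumSubTasks > 1;
  }

  /** Same answer the getMode() endpoint now returns, without needing a runner. */
  static String mode(boolean splittable, int maxNumSubTasks) {
    return isParallelMode(splittable, maxNumSubTasks) ? "parallel" : "sequential";
  }

  /** Why the task runs sequentially, or null when it runs in parallel. */
  static String sequentialReason(boolean splittable, int maxNumSubTasks) {
    if (isParallelMode(splittable, maxNumSubTasks)) {
      return null;
    }
    if (!splittable) {
      return "firehose is not splittable";
    }
    if (maxNumSubTasks == 1) {
      return "maxNumSubTasks is 1";
    }
    // Only reachable for non-positive values, which the tuning config is meant to reject;
    // the patch fails the task in this branch.
    throw new IllegalStateException("Unknown reason for sequential mode");
  }
}
```

Because the decision no longer depends on `runner`, the `getMode()` endpoint can answer before the runner is created, which is why the patch drops the `SERVICE_UNAVAILABLE` branch.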
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
index 85929db..c0e9370 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -34,7 +35,7 @@ import java.util.Objects;
@JsonTypeName("index_parallel")
public class ParallelIndexTuningConfig extends IndexTuningConfig
{
- private static final int DEFAULT_MAX_NUM_BATCH_TASKS = Integer.MAX_VALUE; // unlimited
+ private static final int DEFAULT_MAX_NUM_BATCH_TASKS = 1;
private static final int DEFAULT_MAX_RETRY = 3;
private static final long DEFAULT_TASK_STATUS_CHECK_PERIOD_MS = 1000;
@@ -131,6 +132,8 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
this.chatHandlerTimeout = DEFAULT_CHAT_HANDLER_TIMEOUT;
this.chatHandlerNumRetries = DEFAULT_CHAT_HANDLER_NUM_RETRIES;
+
+ Preconditions.checkArgument(this.maxNumSubTasks > 0, "maxNumSubTasks must be positive");
}
@JsonProperty
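The tuning-config change lowers the default to 1 and adds a positivity check. The sketch below shows that combination in isolation. It is an assumption, suggested by the tests passing `null` for unset fields, that an omitted `maxNumSubTasks` falls back to the default; `MaxNumSubTasksSketch` and `resolve` are hypothetical names, and plain `IllegalArgumentException` stands in for Guava's `Preconditions.checkArgument` to keep the example dependency-free.

```java
/** Hypothetical, trimmed-down illustration of defaulting and validating maxNumSubTasks. */
final class MaxNumSubTasksSketch {
  /** New default from this patch (previously Integer.MAX_VALUE, i.e. unlimited). */
  static final int DEFAULT_MAX_NUM_SUB_TASKS = 1;

  private MaxNumSubTasksSketch() {}

  /** Resolves an optional (possibly null) config value and rejects non-positive values. */
  static int resolve(Integer configured) {
    int value = configured == null ? DEFAULT_MAX_NUM_SUB_TASKS : configured;
    if (value <= 0) {
      // Same condition and message as the Preconditions.checkArgument call in the patch.
      throw new IllegalArgumentException("maxNumSubTasks must be positive");
    }
    return value;
  }
}
```

With the default of 1, a spec that omits `maxNumSubTasks` runs sequentially; operators must opt into parallelism explicitly.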
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 5d42919..907903a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -295,22 +295,6 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
new DropwizardRowIngestionMetersFactory()
);
}
-
- @Override
- public TaskStatus run(TaskToolbox toolbox) throws Exception
- {
- return TaskStatus.fromCode(
- getId(),
- new TestParallelIndexTaskRunner(
- toolbox,
- getId(),
- getGroupId(),
- getIngestionSchema(),
- getContext(),
- new NoopIndexingServiceClient()
- ).run()
- );
- }
}
static class TestParallelIndexTaskRunner extends SinglePhaseParallelIndexTaskRunner
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index 668c8ec..04aa5a7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -500,7 +500,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
}
@Override
- public TaskStatus run(TaskToolbox toolbox) throws Exception
+ ParallelIndexTaskRunner createRunner(TaskToolbox toolbox)
{
setRunner(
new TestRunner(
@@ -509,10 +509,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
indexingServiceClient
)
);
- return TaskStatus.fromCode(
- getId(),
- getRunner().run()
- );
+ return getRunner();
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index 241e9f5..efc1fc4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -24,7 +24,6 @@ import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.indexer.TaskState;
-import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.TaskResource;
@@ -229,33 +228,55 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv
Assert.assertEquals(TaskState.SUCCESS, task.run(toolbox).getStatusCode());
}
+ @Test
+ public void testWith1MaxNumSubTasks() throws Exception
+ {
+ final ParallelIndexSupervisorTask task = newTask(
+ Intervals.of("2017/2018"),
+ new ParallelIndexIOConfig(
+ new LocalFirehoseFactory(inputDir, "test_*", null),
+ false
+ ),
+ new ParallelIndexTuningConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 1,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ )
+ );
+ actionClient = createActionClient(task);
+ toolbox = createTaskToolbox(task);
+
+ prepareTaskForLocking(task);
+ Assert.assertTrue(task.isReady(actionClient));
+ Assert.assertEquals(TaskState.SUCCESS, task.run(toolbox).getStatusCode());
+ Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getRunner());
+ }
+
private ParallelIndexSupervisorTask newTask(
Interval interval,
ParallelIndexIOConfig ioConfig
)
{
- // set up ingestion spec
- final ParallelIndexIngestionSpec ingestionSpec = new ParallelIndexIngestionSpec(
- new DataSchema(
- "dataSource",
- getObjectMapper().convertValue(
- new StringInputRowParser(
- DEFAULT_PARSE_SPEC,
- null
- ),
- Map.class
- ),
- new AggregatorFactory[]{
- new LongSumAggregatorFactory("val", "val")
- },
- new UniformGranularitySpec(
- Granularities.DAY,
- Granularities.MINUTE,
- interval == null ? null : Collections.singletonList(interval)
- ),
- null,
- getObjectMapper()
- ),
+ return newTask(
+ interval,
ioConfig,
new ParallelIndexTuningConfig(
null,
@@ -281,6 +302,39 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv
null
)
);
+ }
+
+ private ParallelIndexSupervisorTask newTask(
+ Interval interval,
+ ParallelIndexIOConfig ioConfig,
+ ParallelIndexTuningConfig tuningConfig
+ )
+ {
+ // set up ingestion spec
+ final ParallelIndexIngestionSpec ingestionSpec = new ParallelIndexIngestionSpec(
+ new DataSchema(
+ "dataSource",
+ getObjectMapper().convertValue(
+ new StringInputRowParser(
+ DEFAULT_PARSE_SPEC,
+ null
+ ),
+ Map.class
+ ),
+ new AggregatorFactory[]{
+ new LongSumAggregatorFactory("val", "val")
+ },
+ new UniformGranularitySpec(
+ Granularities.DAY,
+ Granularities.MINUTE,
+ interval == null ? null : Collections.singletonList(interval)
+ ),
+ null,
+ getObjectMapper()
+ ),
+ ioConfig,
+ tuningConfig
+ );
// set up test tools
return new TestSupervisorTask(
@@ -315,9 +369,8 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv
}
@Override
- public TaskStatus run(TaskToolbox toolbox) throws Exception
+ ParallelIndexTaskRunner createRunner(TaskToolbox toolbox)
{
- setToolbox(toolbox);
setRunner(
new TestRunner(
toolbox,
@@ -325,10 +378,7 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv
indexingServiceClient
)
);
- return TaskStatus.fromCode(
- getId(),
- getRunner().run()
- );
+ return getRunner();
}
}