This is an automated email from the ASF dual-hosted git repository.
brile pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a43ffbdf2bc [Docs] Improvements to JSON-based batch Ingestion page
(#15286)
a43ffbdf2bc is described below
commit a43ffbdf2bce41ca1d44b991cf02449f2e2c865d
Author: Katya Macedo <[email protected]>
AuthorDate: Tue Oct 31 16:50:45 2023 -0500
[Docs] Improvements to JSON-based batch Ingestion page (#15286)
---
docs/ingestion/native-batch.md | 289 +++++++++++++++++++++++------------------
1 file changed, 159 insertions(+), 130 deletions(-)
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 8af3f6c1ca9..bbdbb171a5c 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -27,7 +27,7 @@ sidebar_label: JSON-based batch
This page describes JSON-based batch ingestion using [ingestion
specs](ingestion-spec.md). For SQL-based batch ingestion using the
[`druid-multi-stage-query`](../multi-stage-query/index.md) extension, see
[SQL-based ingestion](../multi-stage-query/index.md). Refer to the [ingestion
methods](../ingestion/index.md#batch) table to determine which ingestion method
is right for you.
:::
-Apache Druid supports the following types of native batch indexing tasks:
+Apache Druid supports the following types of JSON-based batch indexing tasks:
- Parallel task indexing (`index_parallel`) that can run multiple indexing
tasks concurrently. Parallel task indexing works well for production ingestion tasks.
- Simple task indexing (`index`) that runs a single indexing task at a time.
Simple task indexing is suitable for development and test environments.
@@ -35,72 +35,92 @@ This topic covers the configuration for `index_parallel`
ingestion specs.
For related information on batch indexing, see:
- [Batch ingestion method comparison table](./index.md#batch) for a comparison
of batch ingestion methods.
-- [Tutorial: Loading a file](../tutorials/tutorial-batch.md) for a tutorial on
native batch ingestion.
+- [Tutorial: Loading a file](../tutorials/tutorial-batch.md) for a tutorial on
JSON-based batch ingestion.
- [Input sources](./input-sources.md) for possible input sources.
- [Source input formats](./data-formats.md#input-format) for possible input
formats.
## Submit an indexing task
-To run either kind of native batch indexing task you can:
+To run either kind of JSON-based batch indexing task, you can:
+
- Use the **Load Data** UI in the web console to define and submit an
ingestion spec.
-- Define an ingestion spec in JSON based upon the
[examples](#parallel-indexing-example) and reference topics for batch indexing.
Then POST the ingestion spec to the [Tasks API
endpoint](../api-reference/tasks-api.md),
-`/druid/indexer/v1/task`, the Overlord service. Alternatively you can use the
indexing script included with Druid at `bin/post-index-task`.
+- Define an ingestion spec in JSON based upon the
[examples](#parallel-indexing-example) and reference topics for batch indexing.
Then POST the ingestion spec to the [Tasks API
endpoint](../api-reference/tasks-api.md), `/druid/indexer/v1/task`, on the
Overlord service. Alternatively, you can use the indexing script included with
Druid at `bin/post-index-task`.
## Parallel task indexing
-The parallel task type `index_parallel` is a task for multi-threaded batch
indexing. Parallel task indexing only relies on Druid resources. It does not
depend on other external systems like Hadoop.
+The parallel task type `index_parallel` is a task for multi-threaded batch
indexing. Parallel task indexing only relies on Druid resources. It doesn't
depend on other external systems like Hadoop.
The `index_parallel` task is a supervisor task that orchestrates
the whole indexing process. The supervisor task splits the input data and
creates worker tasks to process the individual portions of data.
-Druid issues the worker tasks to the Overlord. The overlord schedules and runs
the workers on MiddleManagers or Indexers. After a worker task successfully
processes the assigned input portion, it reports the resulting segment list to
the supervisor task.
+Druid issues the worker tasks to the Overlord. The Overlord schedules and runs
the workers on MiddleManagers or Indexers. After a worker task successfully
processes the assigned input portion, it reports the resulting segment list to
the Supervisor task.
-The supervisor task periodically checks the status of worker tasks. If a task
fails, the supervisor retries the task until the number of retries reaches the
configured limit. If all worker tasks succeed, it publishes the reported
segments at once and finalizes ingestion.
+The Supervisor task periodically checks the status of worker tasks. If a task
fails, the Supervisor retries the task until the number of retries reaches the
configured limit. If all worker tasks succeed, it publishes the reported
segments at once and finalizes ingestion.
The detailed behavior of the parallel task is different depending on the
`partitionsSpec`. See [`partitionsSpec`](#partitionsspec) for more details.
-Parallel tasks require:
-- a splittable [`inputSource`](#splittable-input-sources) in the `ioConfig`.
For a list of supported splittable input formats, see [Splittable input
sources](#splittable-input-sources).
-- the `maxNumConcurrentSubTasks` greater than 1 in the `tuningConfig`.
Otherwise tasks run sequentially. The `index_parallel` task reads each input
file one by one and creates segments by itself.
+Parallel tasks require the following:
+
+- A splittable [`inputSource`](#splittable-input-sources) in the `ioConfig`.
For a list of supported splittable input formats, see [Splittable input
sources](#splittable-input-sources).
+- A `maxNumConcurrentSubTasks` value greater than 1 in the `tuningConfig`.
Otherwise, tasks run sequentially: the `index_parallel` task reads each input
file one by one and creates segments by itself.
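As an illustrative sketch only (the `local` input source, `baseDir` path, and filter below are hypothetical, and the required `dataSchema` section is omitted for brevity), a spec that satisfies both requirements might look like:

```json
{
  "type": "index_parallel",
  "spec": {
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "local",
        "baseDir": "/tmp/data",
        "filter": "*.json"
      },
      "inputFormat": { "type": "json" }
    },
    "tuningConfig": {
      "type": "index_parallel",
      "maxNumConcurrentSubTasks": 4
    }
  }
}
```

Because the `local` input source is splittable and `maxNumConcurrentSubTasks` is greater than 1, the supervisor task can split the input and run worker tasks in parallel.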
### Supported compression formats
-Native batch ingestion supports the following compression formats:
+
+JSON-based batch ingestion supports the following compression formats:
+
- `bz2`
- `gz`
- `xz`
- `zip`
- `sz` (Snappy)
-- `zst` (ZSTD).
+- `zst` (ZSTD)
### Implementation considerations
+
This section covers implementation details to consider when you implement
parallel task ingestion.
+
#### Volume control for worker tasks
+
You can control the amount of input data each worker task processes using
different configurations depending on the phase in parallel ingestion.
-- See [`partitionsSpec`](#partitionsspec) for details about how partitioning
affects data volume for tasks.
-- For the tasks that read data from the `inputSource`, you can set the [Split
hint spec](#split-hint-spec) in the `tuningConfig`.
-- For the task that merge shuffled segments, you can set the
`totalNumMergeTasks` in the `tuningConfig`.
+See [`partitionsSpec`](#partitionsspec) for details about how partitioning
affects data volume for tasks.
+
+For the tasks that read data from the `inputSource`, you can set the [Split
hint spec](#split-hint-spec) in the `tuningConfig`.
+For the tasks that merge shuffled segments, you can set the
`totalNumMergeTasks` in the `tuningConfig`.
+
#### Number of running tasks
-The `maxNumConcurrentSubTasks` in the `tuningConfig` determines the number of
concurrent worker tasks that run in parallel. The supervisor task checks the
number of current running worker tasks and creates more if it's smaller than
`maxNumConcurrentSubTasks` regardless of the number of available task slots.
This may affect to other ingestion performance. See [Capacity
planning](#capacity-planning) section for more details.
+
+The `maxNumConcurrentSubTasks` in the `tuningConfig` determines the number of
concurrent worker tasks that run in parallel. The Supervisor task checks the
number of currently running worker tasks and creates more if the count is
smaller than `maxNumConcurrentSubTasks`, regardless of the number of available
task slots. This may affect the performance of other ingestion tasks. See the
[Capacity planning](#capacity-planning) section for more details.
+
#### Replacing or appending data
-By default, batch ingestion replaces all data in the intervals in your
`granularitySpec` for any segment that it writes to. If you want to add to the
segment instead, set the `appendToExisting` flag in the `ioConfig`. Batch
ingestion only replaces data in segments where it actively adds data. If there
are segments in the intervals for your `granularitySpec` that have do not have
data from a task, they remain unchanged. If any existing segments partially
overlap with the intervals in the [...]
+
+By default, JSON-based batch ingestion replaces all data in the intervals in
your `granularitySpec` for any segment that it writes to. If you want to add to
the segment instead, set the `appendToExisting` flag in the `ioConfig`.
JSON-based batch ingestion only replaces data in segments where it actively
adds data. If there are segments in the intervals for your `granularitySpec`
that don't have data from a task, they remain unchanged. If any existing
segments partially overlap with the i [...]
#### Fully replacing existing segments using tombstones
-You can set `dropExisting` flag in the `ioConfig` to true if you want the
ingestion task to replace all existing segments that start and end within the
intervals for your `granularitySpec`. This applies whether or not the new data
covers all existing segments. `dropExisting` only applies when
`appendToExisting` is false and the `granularitySpec` contains an `interval`.
WARNING: this functionality is still in beta.
+
+:::info
+This feature is still experimental.
+:::
+
+You can set the `dropExisting` flag in the `ioConfig` to `true` if you want the
ingestion task to replace all existing segments that start and end within the
intervals for your `granularitySpec`. This applies whether or not the new data
covers all existing segments. `dropExisting` only applies when
`appendToExisting` is false and the `granularitySpec` contains an `interval`.
The following examples demonstrate when to set the `dropExisting` property to
true in the `ioConfig`:
-Consider an existing segment with an interval of 2020-01-01 to 2021-01-01 and
`YEAR` `segmentGranularity`. You want to overwrite the whole interval of
2020-01-01 to 2021-01-01 with new data using the finer segmentGranularity of
`MONTH`. If the replacement data does not have a record within every months
from 2020-01-01 to 2021-01-01 Druid cannot drop the original `YEAR` segment
even if it does include all the replacement data. Set `dropExisting` to true in
this case to replace the origina [...]
-Imagine you want to re-ingest or overwrite a datasource and the new data does
not contain some time intervals that exist in the datasource. For example, a
datasource contains the following data at `MONTH` segmentGranularity:
+Consider an existing segment with an interval of 2020-01-01 to 2021-01-01 and
`YEAR` `segmentGranularity`. You want to overwrite the whole interval of
2020-01-01 to 2021-01-01 with new data using the finer `segmentGranularity` of
`MONTH`. If the replacement data does not have a record within every month
from 2020-01-01 to 2021-01-01, Druid cannot drop the original `YEAR` segment
even if it does include all the replacement data. Set `dropExisting` to true in
this case to replace the origina [...]
+
+Imagine you want to re-ingest or overwrite a datasource and the new data does
not contain some time intervals that exist in the datasource. For example, a
datasource contains the following data at `MONTH` `segmentGranularity`:
+
- **January**: 1 record
- **February**: 10 records
- **March**: 10 records
-You want to re-ingest and overwrite with new data as follows:
+You want to re-ingest and overwrite with new data as follows:
+
- **January**: 0 records
- **February**: 10 records
- **March**: 9 records
-Unless you set `dropExisting` to true, the result after ingestion with
overwrite using the same MONTH segmentGranularity would be:
+Unless you set `dropExisting` to true, the result after ingestion with
overwrite using the same `MONTH` `segmentGranularity` would be:
+
* **January**: 1 record
* **February**: 10 records
* **March**: 9 records
@@ -109,7 +129,10 @@ This may not be what you expected since the new data has
0 records for January
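As a hedged sketch of the overwrite scenario above (the year, interval, and input format shown are assumptions for illustration, and other required spec fields are omitted), the relevant `ioConfig` and `granularitySpec` fragments might look like:

```json
"ioConfig": {
  "type": "index_parallel",
  "inputFormat": { "type": "json" },
  "appendToExisting": false,
  "dropExisting": true
},
"dataSchema": {
  "granularitySpec": {
    "segmentGranularity": "MONTH",
    "intervals": ["2020-01-01/2020-04-01"]
  }
}
```

Note that `dropExisting` only takes effect when `appendToExisting` is false and the `granularitySpec` contains an `interval`, as described above.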
## Parallel indexing example
-The following example illustrates the configuration for a parallel indexing
task:
+The following example illustrates the configuration for a parallel indexing
task.
+
+<details>
+<summary>Click to view the example</summary>
```json
{
@@ -188,65 +211,70 @@ The following example illustrates the configuration for a
parallel indexing task
}
}
```
+</details>
## Parallel indexing configuration
The following table defines the primary sections of the input spec:
-|property|description|required?|
-|--------|-----------|---------|
-|type|The task type. For parallel task indexing, set the value to
`index_parallel`.|yes|
-|id|The task ID. If omitted, Druid generates the task ID using the task type,
data source name, interval, and date-time stamp. |no|
-|spec|The ingestion spec that defines the [data schema](#dataschema), [IO
config](#ioconfig), and [tuning config](#tuningconfig).|yes|
-|context|Context to specify various task configuration parameters. See [Task
context parameters](../ingestion/tasks.md#context-parameters) for more
details.|no|
+
+|Property|Description|Required|
+|--------|-----------|--------|
+|`type`|The task type. For parallel task indexing, set the value to
`index_parallel`.|yes|
+|`id`|The task ID. If omitted, Druid generates the task ID using the task
type, data source name, interval, and date-time stamp.|no|
+|`spec`|The ingestion spec that defines the [data schema](#dataschema), [IO
config](#ioconfig), and [tuning config](#tuningconfig).|yes|
+|`context`|Context to specify various task configuration parameters. See [Task
context parameters](../ingestion/tasks.md#context-parameters) for more
details.|no|
### `dataSchema`
-This field is required. In general, it defines the way that Druid will store
your data: the primary timestamp column, the dimensions, metrics, and any
transformations. For an overview, see [Ingestion Spec
DataSchema](../ingestion/ingestion-spec.md#dataschema).
+This field is required. In general, it defines the way that Druid stores your
data: the primary timestamp column, the dimensions, metrics, and any
transformations. For an overview, see [Ingestion Spec
DataSchema](../ingestion/ingestion-spec.md#dataschema).
When defining the `granularitySpec` for `index_parallel`, consider defining
`intervals` explicitly if you know the time range of the data. This way, locking
failures happen faster, and Druid won't accidentally replace data outside the
interval range if some rows contain unexpected timestamps. The reasoning is as
follows:
-- If you explicitly define `intervals`, batch ingestion locks all intervals
specified when it starts up. Problems with locking become evident quickly when
multiple ingestion or indexing tasks try to obtain a lock on the same interval.
For example, if a Kafka ingestion task tries to obtain a lock on a locked
interval causing the ingestion task fail. Furthermore, if there are rows
outside the specified intervals, Druid drops them, avoiding conflict with
unexpected intervals.
-- If you do not define `intervals`, batch ingestion locks each interval when
the interval is discovered. In this case if the task overlaps with a
higher-priority task, issues with conflicting locks occur later in the
ingestion process. Also if the source data includes rows with unexpected
timestamps, they may caused unexpected locking of intervals.
-
+- If you explicitly define `intervals`, JSON-based batch ingestion locks all
intervals specified when it starts up. Problems with locking become evident
quickly when multiple ingestion or indexing tasks try to obtain a lock on the
same interval. For example, a Kafka ingestion task fails if it tries to obtain
a lock on a locked interval. Furthermore, if there are
rows outside the specified intervals, Druid drops them, avoiding conflict with
unexpected intervals.
+- If you don't define `intervals`, JSON-based batch ingestion locks each
interval when the interval is discovered. In this case, if the task overlaps
with a higher-priority task, issues with conflicting locks occur later in the
ingestion process. If the source data includes rows with unexpected timestamps,
they may cause unexpected locking of intervals.
### `ioConfig`
-|property|description|default|required?|
+The following table lists the properties of an `ioConfig` object:
+
+|Property|Description|Default|Required|
|--------|-----------|-------|---------|
-|type|The task type. Set to the value to `index_parallel`.|none|yes|
-|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to
parse input data.|none|yes|
-|appendToExisting|Creates segments as additional shards of the latest version,
effectively appending to the segment set instead of replacing it. This means
that you can append new segments to any datasource regardless of its original
partitioning scheme. You must use the `dynamic` partitioning type for the
appended segments. If you specify a different partitioning type, the task fails
with an error.|false|no|
-|dropExisting|If `true` and `appendToExisting` is `false` and the
`granularitySpec` contains an`interval`, then the ingestion task replaces all
existing segments fully contained by the specified `interval` when the task
publishes new segments. If ingestion fails, Druid does not change any existing
segment. In the case of misconfiguration where either `appendToExisting` is
`true` or `interval` is not specified in `granularitySpec`, Druid does not
replace any segments even if `dropExisting [...]
+|`type`|The task type. Set the value to `index_parallel`.|none|yes|
+|`inputFormat`|[`inputFormat`](./data-formats.md#input-format) to specify how
to parse input data.|none|yes|
+|`appendToExisting`|Creates segments as additional shards of the latest
version, effectively appending to the segment set instead of replacing it. This
means that you can append new segments to any datasource regardless of its
original partitioning scheme. You must use the `dynamic` partitioning type for
the appended segments. If you specify a different partitioning type, the task
fails with an error.|false|no|
+|`dropExisting`|If `true` and `appendToExisting` is `false` and the
`granularitySpec` contains an `interval`, then the ingestion task replaces all
existing segments fully contained by the specified `interval` when the task
publishes new segments. If ingestion fails, Druid doesn't change any existing
segments. In the case of misconfiguration where either `appendToExisting` is
`true` or `interval` isn't specified in `granularitySpec`, Druid doesn't
replace any segments even if `dropExisting [...]
### `tuningConfig`
-The tuningConfig is optional and default parameters will be used if no
tuningConfig is specified. See below for more details.
+The `tuningConfig` is optional. Druid uses default parameters if
`tuningConfig` is not specified.
+
+The following table lists the properties of a `tuningConfig` object:
-|property|description|default|required?|
+|Property|Description|Default|Required|
|--------|-----------|-------|---------|
-|type|The task type. Set the value to`index_parallel`.|none|yes|
-|maxRowsInMemory|Determines when Druid should perform intermediate persists to
disk. Normally you do not need to set this. Depending on the nature of your
data, if rows are short in terms of bytes. For example, you may not want to
store a million rows in memory. In this case, set this value.|1000000|no|
-|maxBytesInMemory|Use to determine when Druid should perform intermediate
persists to disk. Normally Druid computes this internally and you do not need
to set it. This value represents number of bytes to aggregate in heap memory
before persisting. This is based on a rough estimate of memory usage and not
actual usage. The maximum heap memory usage for indexing is maxBytesInMemory *
(2 + maxPendingPersists). Note that `maxBytesInMemory` also includes heap usage
of artifacts created from i [...]
-|maxColumnsToMerge|Limit of the number of segments to merge in a single phase
when merging segments for publishing. This limit affects the total number of
columns present in a set of segments to merge. If the limit is exceeded,
segment merging occurs in multiple phases. Druid merges at least 2 segments per
phase, regardless of this setting.|-1 (unlimited)|no|
-|maxTotalRows|Deprecated. Use `partitionsSpec` instead. Total number of rows
in segments waiting to be pushed. Used to determine when intermediate pushing
should occur.|20000000|no|
-|numShards|Deprecated. Use `partitionsSpec` instead. Directly specify the
number of shards to create when using a `hashed` `partitionsSpec`. If this is
specified and `intervals` is specified in the `granularitySpec`, the index task
can skip the determine intervals/partitions pass through the data.|null|no|
-|splitHintSpec|Hint to control the amount of data that each first phase task
reads. Druid may ignore the hint depending on the implementation of the input
source. See [Split hint spec](#split-hint-spec) for more details.|size-based
split hint spec|no|
-|partitionsSpec|Defines how to partition data in each timeChunk, see
[PartitionsSpec](#partitionsspec)|`dynamic` if `forceGuaranteedRollup` = false,
`hashed` or `single_dim` if `forceGuaranteedRollup` = true|no|
-|indexSpec|Defines segment storage format options to be used at indexing time,
see [IndexSpec](ingestion-spec.md#indexspec)|null|no|
-|indexSpecForIntermediatePersists|Defines segment storage format options to
use at indexing time for intermediate persisted temporary segments. You can use
this configuration to disable dimension/metric compression on intermediate
segments to reduce memory required for final merging. However, if you disable
compression on intermediate segments, page cache use my increase while
intermediate segments are used before Druid merges them to the final published
segment published. See [IndexSpec [...]
-|maxPendingPersists|Maximum number of pending persists that remain not
started. If a new intermediate persist exceeds this limit, ingestion blocks
until the currently-running persist finishes. Maximum heap memory usage for
indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one
persist can be running concurrently with ingestion, and none can be queued
up)|no|
-|forceGuaranteedRollup|Forces [perfect rollup](rollup.md). The perfect rollup
optimizes the total size of generated segments and querying time but increases
indexing time. If true, specify `intervals` in the `granularitySpec` and use
either `hashed` or `single_dim` for the `partitionsSpec`. You cannot use this
flag in conjunction with `appendToExisting` of IOConfig. For more details, see
[Segment pushing modes](#segment-pushing-modes).|false|no|
-|reportParseExceptions|If true, Druid throws exceptions encountered during
parsing and halts ingestion. If false, Druid skips unparseable rows and
fields.|false|no|
-|pushTimeout|Milliseconds to wait to push segments. Must be >= 0, where 0
means to wait forever.|0|no|
-|segmentWriteOutMediumFactory|Segment write-out medium to use when creating
segments. See [SegmentWriteOutMediumFactory](#segmentwriteoutmediumfactory).|If
not specified, uses the value from
`druid.peon.defaultSegmentWriteOutMediumFactory.type` |no|
-|maxNumConcurrentSubTasks|Maximum number of worker tasks that can be run in
parallel at the same time. The supervisor task spawns worker tasks up to
`maxNumConcurrentSubTasks` regardless of the current available task slots. If
this value is 1, the supervisor task processes data ingestion on its own
instead of spawning worker tasks. If this value is set to too large, the
supervisor may create too many worker tasks that block other ingestion tasks.
See [Capacity planning](#capacity-plannin [...]
-|maxRetry|Maximum number of retries on task failures.|3|no|
-|maxNumSegmentsToMerge|Max limit for the number of segments that a single task
can merge at the same time in the second phase. Used only when
`forceGuaranteedRollup` is true.|100|no|
-|totalNumMergeTasks|Total number of tasks that merge segments in the merge
phase when `partitionsSpec` is set to `hashed` or `single_dim`.|10|no|
-|taskStatusCheckPeriodMs|Polling period in milliseconds to check running task
statuses.|1000|no|
-|chatHandlerTimeout|Timeout for reporting the pushed segments in worker
tasks.|PT10S|no|
-|chatHandlerNumRetries|Retries for reporting the pushed segments in worker
tasks.|5|no|
-|awaitSegmentAvailabilityTimeoutMillis|Long|Milliseconds to wait for the newly
indexed segments to become available for query after ingestion completes. If
`<= 0`, no wait occurs. If `> 0`, the task waits for the Coordinator to
indicate that the new segments are available for querying. If the timeout
expires, the task exits as successful, but the segments are not confirmed as
available for query.|no (default = 0)|
+|`type`|The task type. Set the value to `index_parallel`.|none|yes|
+|`maxRowsInMemory`|Determines when Druid should perform intermediate persists
to disk. Normally you don't need to set this, but depending on the nature of
your data, if rows are short in terms of bytes, you may not want to store a
million rows in memory. In this case, set this value.|1000000|no|
+|`maxBytesInMemory`|Use to determine when Druid should perform intermediate
persists to disk. Normally Druid computes this internally and you don't need to
set it. This value represents the number of bytes to aggregate in heap memory
before persisting. This is based on a rough estimate of memory usage and not
actual usage. The maximum heap memory usage for indexing is `maxBytesInMemory *
(2 + maxPendingPersists)`. Note that `maxBytesInMemory` also includes heap
usage of artifacts created fro [...]
+|`maxColumnsToMerge`|Limit of the number of segments to merge in a single
phase when merging segments for publishing. This limit affects the total number
of columns present in a set of segments to merge. If the limit is exceeded,
segment merging occurs in multiple phases. Druid merges at least 2 segments per
phase, regardless of this setting.|-1 (unlimited)|no|
+|`maxTotalRows`|Deprecated. Use `partitionsSpec` instead. Total number of rows
in segments waiting to be pushed. Used to determine when intermediate pushing
should occur.|20000000|no|
+|`numShards`|Deprecated. Use `partitionsSpec` instead. Directly specify the
number of shards to create when using a `hashed` `partitionsSpec`. If this is
specified and `intervals` is specified in the `granularitySpec`, the index task
can skip the determine intervals/partitions pass through the data.|null|no|
+|`splitHintSpec`|Hint to control the amount of data that each first phase task
reads. Druid may ignore the hint depending on the implementation of the input
source. See [Split hint spec](#split-hint-spec) for more details.|size-based
split hint spec|no|
+|`partitionsSpec`|Defines how to partition data in each time chunk. See
[PartitionsSpec](#partitionsspec).|`dynamic` if `forceGuaranteedRollup` =
false, `hashed` or `single_dim` if `forceGuaranteedRollup` = true|no|
+|`indexSpec`|Defines segment storage format options to be used at indexing
time. See [IndexSpec](ingestion-spec.md#indexspec).|null|no|
+|`indexSpecForIntermediatePersists`|Defines segment storage format options to
use at indexing time for intermediate persisted temporary segments. You can use
this configuration to disable dimension/metric compression on intermediate
segments to reduce memory required for final merging. However, if you disable
compression on intermediate segments, page cache use may increase while
intermediate segments are in use before Druid merges them into the final
published segment. See [IndexSp [...]
+|`maxPendingPersists`|Maximum number of pending persists that remain not
started. If a new intermediate persist exceeds this limit, ingestion blocks
until the currently-running persist finishes. Maximum heap memory usage for
indexing scales with `maxRowsInMemory * (2 + maxPendingPersists)`.|0 (meaning
one persist can be running concurrently with ingestion, and none can be queued
up)|no|
+|`forceGuaranteedRollup`|Forces [perfect rollup](rollup.md). The perfect
rollup optimizes the total size of generated segments and querying time but
increases indexing time. If true, specify `intervals` in the `granularitySpec`
and use either `hashed` or `single_dim` for the `partitionsSpec`. You cannot
use this flag in conjunction with `appendToExisting` of `IOConfig`. For more
details, see [Segment pushing modes](#segment-pushing-modes).|false|no|
+|`reportParseExceptions`|If true, Druid throws exceptions encountered during
parsing and halts ingestion. If false, Druid skips unparseable rows and
fields.|false|no|
+|`pushTimeout`|Milliseconds to wait to push segments. Must be >= 0, where 0
means to wait forever.|0|no|
+|`segmentWriteOutMediumFactory`|Segment write-out medium to use when creating
segments. See [SegmentWriteOutMediumFactory](#segmentwriteoutmediumfactory).|If
not specified, uses the value from
`druid.peon.defaultSegmentWriteOutMediumFactory.type` |no|
+|`maxNumConcurrentSubTasks`|Maximum number of worker tasks that can be run in
parallel at the same time. The supervisor task spawns worker tasks up to
`maxNumConcurrentSubTasks` regardless of the current available task slots. If
this value is 1, the supervisor task processes data ingestion on its own
instead of spawning worker tasks. If you set this value too large, the
supervisor may create too many worker tasks, which can block other ingestion tasks.
See [Capacity planning](#capacity-plann [...]
+|`maxRetry`|Maximum number of retries on task failures.|3|no|
+|`maxNumSegmentsToMerge`|Max limit for the number of segments that a single
task can merge at the same time in the second phase. Used only when
`forceGuaranteedRollup` is true.|100|no|
+|`totalNumMergeTasks`|Total number of tasks that merge segments in the merge
phase when `partitionsSpec` is set to `hashed` or `single_dim`.|10|no|
+|`taskStatusCheckPeriodMs`|Polling period in milliseconds to check running
task statuses.|1000|no|
+|`chatHandlerTimeout`|Timeout for reporting the pushed segments in worker
tasks.|PT10S|no|
+|`chatHandlerNumRetries`|Retries for reporting the pushed segments in worker
tasks.|5|no|
+|`awaitSegmentAvailabilityTimeoutMillis`|Milliseconds to wait for the newly
indexed segments to become available for query after ingestion completes. If
`<= 0`, no wait occurs. If `> 0`, the task waits for the Coordinator to
indicate that the new segments are available for querying. If the timeout
expires, the task exits as successful, but the segments are not confirmed as
available for query.|0|no|
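For illustration only, a `tuningConfig` that relies mostly on the defaults above might set just a few of these properties (the values here are arbitrary examples, not recommendations):

```json
"tuningConfig": {
  "type": "index_parallel",
  "maxNumConcurrentSubTasks": 4,
  "maxRowsInMemory": 1000000,
  "partitionsSpec": { "type": "dynamic" },
  "maxRetry": 3
}
```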
### Split Hint Spec
@@ -256,25 +284,28 @@ The split hint spec is used to help the supervisor task
divide input sources. Ea
The size-based split hint spec affects all splittable input sources except for
the HTTP input source and SQL input source.
-|property|description|default|required?|
+|Property|Description|Default|Required|
|--------|-----------|-------|---------|
-|type|Set the value to `maxSize`.|none|yes|
-|maxSplitSize|Maximum number of bytes of input files to process in a single
subtask. If a single file is larger than the limit, Druid processes the file
alone in a single subtask. Druid does not split files across tasks. One subtask
will not process more files than `maxNumFiles` even when their total size is
smaller than `maxSplitSize`. [Human-readable
format](../configuration/human-readable-byte.md) is supported.|1GiB|no|
-|maxNumFiles|Maximum number of input files to process in a single subtask.
This limit avoids task failures when the ingestion spec is too long. There are
two known limits on the max size of serialized ingestion spec: the max ZNode
size in ZooKeeper (`jute.maxbuffer`) and the max packet size in MySQL
(`max_allowed_packet`). These limits can cause ingestion tasks fail if the
serialized ingestion spec size hits one of them. One subtask will not process
more data than `maxSplitSize` even whe [...]
+|`type`|Set the value to `maxSize`.|none|yes|
+|`maxSplitSize`|Maximum number of bytes of input files to process in a single
subtask. If a single file is larger than the limit, Druid processes the file
alone in a single subtask. Druid does not split files across tasks. One subtask
will not process more files than `maxNumFiles` even when their total size is
smaller than `maxSplitSize`. [Human-readable
format](../configuration/human-readable-byte.md) is supported.|1GiB|no|
+|`maxNumFiles`|Maximum number of input files to process in a single subtask. This limit avoids task failures when the ingestion spec is too long. There are two known limits on the max size of the serialized ingestion spec: the max ZNode size in ZooKeeper (`jute.maxbuffer`) and the max packet size in MySQL (`max_allowed_packet`). These limits can cause ingestion tasks to fail if the serialized ingestion spec size hits one of them. One subtask will not process more data than `maxSplitSize` even w [...]
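As a sketch, a size-based split hint spec in the `tuningConfig` might look like this (the limits shown are illustrative):

```json
"splitHintSpec": {
  "type": "maxSize",
  "maxSplitSize": "512MiB",
  "maxNumFiles": 500
}
```

With these settings, each subtask receives at most 512 MiB of input spread across no more than 500 files, whichever limit is reached first.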
#### Segments Split Hint Spec
The segments split hint spec is used only for
[`DruidInputSource`](./input-sources.md).
-|property|description|default|required?|
+|Property|Description|Default|Required|
|--------|-----------|-------|---------|
-|type|Set the value to `segments`.|none|yes|
-|maxInputSegmentBytesPerTask|Maximum number of bytes of input segments to
process in a single subtask. If a single segment is larger than this number,
Druid processes the segment alone in a single subtask. Druid never splits input
segments across tasks. A single subtask will not process more segments than
`maxNumSegments` even when their total size is smaller than
`maxInputSegmentBytesPerTask`. [Human-readable
format](../configuration/human-readable-byte.md) is supported.|1GiB|no|
-|maxNumSegments|Maximum number of input segments to process in a single
subtask. This limit avoids failures due to the the ingestion spec being too
long. There are two known limits on the max size of serialized ingestion spec:
the max ZNode size in ZooKeeper (`jute.maxbuffer`) and the max packet size in
MySQL (`max_allowed_packet`). These limits can make ingestion tasks fail when
the serialized ingestion spec size hits one of them. A single subtask will not
process more data than `maxInp [...]
+|`type`|Set the value to `segments`.|none|yes|
+|`maxInputSegmentBytesPerTask`|Maximum number of bytes of input segments to
process in a single subtask. If a single segment is larger than this number,
Druid processes the segment alone in a single subtask. Druid never splits input
segments across tasks. A single subtask will not process more segments than
`maxNumSegments` even when their total size is smaller than
`maxInputSegmentBytesPerTask`. [Human-readable
format](../configuration/human-readable-byte.md) is supported.|1GiB|no|
+|`maxNumSegments`|Maximum number of input segments to process in a single subtask. This limit avoids failures due to the ingestion spec being too long. There are two known limits on the max size of serialized ingestion spec: the max ZNode size in ZooKeeper (`jute.maxbuffer`) and the max packet size in MySQL (`max_allowed_packet`). These limits can make ingestion tasks fail when the serialized ingestion spec size hits one of them. A single subtask will not process more data than `maxI [...]
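For example, a segments split hint spec for `DruidInputSource` might look like the following sketch (illustrative values):

```json
"splitHintSpec": {
  "type": "segments",
  "maxInputSegmentBytesPerTask": "1GiB",
  "maxNumSegments": 1000
}
```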
### `partitionsSpec`
-The primary partition for Druid is time. You can define a secondary
partitioning method in the partitions spec. Use the `partitionsSpec` type that
applies for your [rollup](rollup.md) method. For perfect rollup, you can use:
+The primary partition for Druid is time. You can define a secondary
partitioning method in the partitions spec. Use the `partitionsSpec` type that
applies for your [rollup](rollup.md) method.
+
+For perfect rollup, you can use:
+
- `hashed` partitioning based on the hash value of specified dimensions for
each row
- `single_dim` based on ranges of values for a single dimension
- `range` based on ranges of values of multiple dimensions.
@@ -294,14 +325,14 @@ The `partitionsSpec` types have different characteristics.
#### Dynamic partitioning
-|property|description|default|required?|
+|Property|Description|Default|Required|
|--------|-----------|-------|---------|
-|type|Set the value to `dynamic`.|none|yes|
-|maxRowsPerSegment|Used in sharding. Determines how many rows are in each
segment.|5000000|no|
-|maxTotalRows|Total number of rows across all segments waiting for being
pushed. Used in determining when intermediate segment push should
occur.|20000000|no|
+|`type`|Set the value to `dynamic`.|none|yes|
+|`maxRowsPerSegment`|Used in sharding. Determines how many rows are in each
segment.|5000000|no|
+|`maxTotalRows`|Total number of rows across all segments waiting to be pushed. Used to determine when intermediate segment pushes should occur.|20000000|no|
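As a sketch, a dynamic `partitionsSpec` with the default values written out explicitly looks like this:

```json
"partitionsSpec": {
  "type": "dynamic",
  "maxRowsPerSegment": 5000000,
  "maxTotalRows": 20000000
}
```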
+
+With dynamic partitioning, the parallel index task runs in a single phase, spawning multiple worker tasks (type `single_phase_sub_task`), each of which creates segments.
-With the Dynamic partitioning, the parallel index task runs in a single phase:
-it spawns multiple worker tasks (type `single_phase_sub_task`), each of which
creates segments.
How the worker task creates segments:
- Whenever the number of rows in the current segment exceeds
@@ -310,29 +341,32 @@ How the worker task creates segments:
#### Hash-based partitioning
-|property|description|default|required?|
+|Property|Description|Default|Required|
|--------|-----------|-------|---------|
-|type|Set the value to `hashed`.|none|yes|
-|numShards|Directly specify the number of shards to create. If this is
specified and `intervals` is specified in the `granularitySpec`, the index task
can skip the determine intervals/partitions pass through the data. This
property and `targetRowsPerSegment` cannot both be set.|none|no|
-|targetRowsPerSegment|A target row count for each partition. If `numShards` is
left unspecified, the Parallel task will determine a partition count
automatically such that each partition has a row count close to the target,
assuming evenly distributed keys in the input data. A target per-segment row
count of 5 million is used if both `numShards` and `targetRowsPerSegment` are
null. |null (or 5,000,000 if both `numShards` and `targetRowsPerSegment` are
null)|no|
-|partitionDimensions|The dimensions to partition on. Leave blank to select all
dimensions.|null|no|
-|partitionFunction|A function to compute hash of partition dimensions. See
[Hash partition function](#hash-partition-function)|`murmur3_32_abs`|no|
+|`type`|Set the value to `hashed`.|none|yes|
+|`numShards`|Directly specify the number of shards to create. If this is
specified and `intervals` is specified in the `granularitySpec`, the index task
can skip the determine intervals/partitions pass through the data. This
property and `targetRowsPerSegment` cannot both be set.|none|no|
+|`targetRowsPerSegment`|A target row count for each partition. If `numShards`
is left unspecified, the Parallel task will determine a partition count
automatically such that each partition has a row count close to the target,
assuming evenly distributed keys in the input data. A target per-segment row
count of 5 million is used if both `numShards` and `targetRowsPerSegment` are
null. |null (or 5,000,000 if both `numShards` and `targetRowsPerSegment` are
null)|no|
+|`partitionDimensions`|The dimensions to partition on. Leave blank to select
all dimensions.|null|no|
+|`partitionFunction`|A function to compute the hash of the partition dimensions. See [Hash partition function](#hash-partition-function).|`murmur3_32_abs`|no|
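For example, a hash-based `partitionsSpec` might look like the following sketch, where `host` and `country` are hypothetical dimension names:

```json
"partitionsSpec": {
  "type": "hashed",
  "targetRowsPerSegment": 5000000,
  "partitionDimensions": ["host", "country"],
  "partitionFunction": "murmur3_32_abs"
}
```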
The Parallel task with hash-based partitioning is similar to
[MapReduce](https://en.wikipedia.org/wiki/MapReduce).
-The task runs in up to 3 phases: `partial dimension cardinality`, `partial
segment generation` and `partial segment merge`.
-- The `partial dimension cardinality` phase is an optional phase that only
runs if `numShards` is not specified.
+The task runs in up to three phases: `partial dimension cardinality`, `partial segment generation`, and `partial segment merge`.
+
+The `partial dimension cardinality` phase is an optional phase that only runs
if `numShards` is not specified.
The Parallel task splits the input data and assigns the splits to worker tasks based on the split hint spec.
Each worker task (type `partial_dimension_cardinality`) gathers an estimate of the cardinality of the partitioning dimensions for each time chunk. The Parallel task aggregates these estimates from the worker tasks, determines the highest cardinality across all of the time chunks in the input data, and divides this cardinality by `targetRowsPerSegment` to determine `numShards` automatically.
-- In the `partial segment generation` phase, just like the Map phase in
MapReduce,
+
+In the `partial segment generation` phase, just like the Map phase in
MapReduce,
the Parallel task splits the input data based on the split hint spec
and assigns each split to a worker task. Each worker task (type
`partial_index_generate`) reads the assigned split, and partitions rows by the
time chunk from `segmentGranularity` (primary partition key) in the
`granularitySpec`
and then by the hash value of `partitionDimensions` (secondary partition key)
in the `partitionsSpec`.
The partitioned data is stored in local storage of
the [middleManager](../design/middlemanager.md) or the
[indexer](../design/indexer.md).
-- The `partial segment merge` phase is similar to the Reduce phase in
MapReduce.
+
+The `partial segment merge` phase is similar to the Reduce phase in MapReduce.
The Parallel task spawns a new set of worker tasks (type `partial_index_generic_merge`) to merge the partitioned data created in the previous phase. Here, the partitioned data is shuffled based on the time chunk and the hash value of `partitionDimensions`: each worker task reads the data falling in the same time chunk with the same hash value from multiple MiddleManager/Indexer processes and merges it to create the final segments. Finally, the worker tasks push the final segments to deep storage at once.
@@ -360,33 +394,36 @@ When you use this technique to partition your data,
segment sizes may be unequal
Range partitioning is not possible on multi-value dimensions. If the provided
`partitionDimension` is multi-value, your ingestion job will report an error.
-|property|description|default|required?|
+|Property|Description|Default|Required|
|--------|-----------|-------|---------|
-|type|Set the value to `single_dim`.|none|yes|
-|partitionDimension|The dimension to partition on. Only rows with a single
dimension value are allowed.|none|yes|
-|targetRowsPerSegment|Target number of rows to include in a partition, should
be a number that targets segments of 500MB\~1GB.|none|either this or
`maxRowsPerSegment`|
-|maxRowsPerSegment|Soft max for the number of rows to include in a
partition.|none|either this or `targetRowsPerSegment`|
-|assumeGrouped|Assume that input data has already been grouped on time and
dimensions. Ingestion will run faster, but may choose sub-optimal partitions if
this assumption is violated.|false|no|
+|`type`|Set the value to `single_dim`.|none|yes|
+|`partitionDimension`|The dimension to partition on. Only rows with a single
dimension value are allowed.|none|yes|
+|`targetRowsPerSegment`|Target number of rows to include in a partition,
should be a number that targets segments of 500MB\~1GB.|none|either this or
`maxRowsPerSegment`|
+|`maxRowsPerSegment`|Soft max for the number of rows to include in a
partition.|none|either this or `targetRowsPerSegment`|
+|`assumeGrouped`|Assume that input data has already been grouped on time and
dimensions. Ingestion will run faster, but may choose sub-optimal partitions if
this assumption is violated.|false|no|
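For example, a `single_dim` `partitionsSpec` might look like the following sketch, where `host` is a hypothetical dimension name:

```json
"partitionsSpec": {
  "type": "single_dim",
  "partitionDimension": "host",
  "targetRowsPerSegment": 5000000
}
```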
With `single_dim` partitioning, the Parallel task runs in three phases: `partial dimension distribution`, `partial segment generation`, and `partial segment merge`.
The first phase collects statistics to find the best partitioning; the other two phases create partial segments and merge them, respectively, as in hash-based partitioning.
-- In the `partial dimension distribution` phase, the Parallel task splits the
input data and
+
+In the `partial dimension distribution` phase, the Parallel task splits the
input data and
assigns the splits to worker tasks based on the split hint spec. Each worker task (type `partial_dimension_distribution`) reads
the assigned split and builds a histogram for `partitionDimension`.
The Parallel task collects those histograms from the worker tasks and finds
the best range partitioning based on `partitionDimension` to evenly
distribute rows across partitions. Note that either `targetRowsPerSegment`
or `maxRowsPerSegment` is used to find the best partitioning.
-- In the `partial segment generation` phase, the Parallel task spawns new
worker tasks (type `partial_range_index_generate`)
+
+In the `partial segment generation` phase, the Parallel task spawns new worker
tasks (type `partial_range_index_generate`)
to create partitioned data. Each worker task reads a split created as in the
previous phase,
partitions rows by the time chunk from the `segmentGranularity` (primary
partition key) in the `granularitySpec`
and then by the range partitioning found in the previous phase.
The partitioned data is stored in local storage of
the [middleManager](../design/middlemanager.md) or the
[indexer](../design/indexer.md).
-- In the `partial segment merge` phase, the parallel index task spawns a new
set of worker tasks (type `partial_index_generic_merge`) to merge the
partitioned
+
+In the `partial segment merge` phase, the parallel index task spawns a new set
of worker tasks (type `partial_index_generic_merge`) to merge the partitioned
data created in the previous phase. Here, the partitioned data is shuffled
based on
the time chunk and the value of `partitionDimension`; each worker task reads
the segments
falling in the same partition of the same range from multiple
MiddleManager/Indexer processes and merges
@@ -411,13 +448,13 @@ Druid to distribute segment sizes more evenly, and to
prune on more dimensions.
Range partitioning is not possible on multi-value dimensions. If one of the
provided
`partitionDimensions` is multi-value, your ingestion job will report an error.
-|property|description|default|required?|
+|Property|Description|Default|Required|
|--------|-----------|-------|---------|
-|type|Set the value to `range`.|none|yes|
-|partitionDimensions|An array of dimensions to partition on. Order the
dimensions from most frequently queried to least frequently queried. For best
results, limit your number of dimensions to between three and five
dimensions.|none|yes|
-|targetRowsPerSegment|Target number of rows to include in a partition, should
be a number that targets segments of 500MB\~1GB.|none|either this or
`maxRowsPerSegment`|
+|`type`|Set the value to `range`.|none|yes|
+|`partitionDimensions`|An array of dimensions to partition on. Order the
dimensions from most frequently queried to least frequently queried. For best
results, limit your number of dimensions to between three and five
dimensions.|none|yes|
+|`targetRowsPerSegment`|Target number of rows to include in a partition,
should be a number that targets segments of 500MB\~1GB.|none|either this or
`maxRowsPerSegment`|
|`maxRowsPerSegment`|Soft max for the number of rows to include in a partition.|none|either this or `targetRowsPerSegment`|
-|assumeGrouped|Assume that input data has already been grouped on time and
dimensions. Ingestion will run faster, but may choose sub-optimal partitions if
this assumption is violated.|false|no|
+|`assumeGrouped`|Assume that input data has already been grouped on time and
dimensions. Ingestion will run faster, but may choose sub-optimal partitions if
this assumption is violated.|false|no|
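For example, a multi-dimension range `partitionsSpec` might look like the following sketch, where `country` and `city` are hypothetical dimension names ordered from most to least frequently queried:

```json
"partitionsSpec": {
  "type": "range",
  "partitionDimensions": ["country", "city"],
  "targetRowsPerSegment": 5000000
}
```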
#### Benefits of range partitioning
@@ -454,18 +491,15 @@ support pruning.
## HTTP status endpoints
-The supervisor task provides some HTTP endpoints to get running status.
+The Supervisor task provides HTTP endpoints to retrieve its running status.
-* `http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/mode`
-
-Returns 'parallel' if the indexing task is running in parallel. Otherwise, it
returns 'sequential'.
-
-*
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/phase`
+`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/mode`
+Returns `parallel` if the indexing task is running in parallel. Otherwise, it
returns `sequential`.
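For instance, you can query any of these endpoints with a plain HTTP GET; the host, port, and task ID below are placeholders you must substitute for your deployment:

```shell
# Substitute your Peon host and port and your supervisor task ID.
curl "http://PEON_IP:PEON_PORT/druid/worker/v1/chat/SUPERVISOR_TASK_ID/mode"
# Returns `parallel` or `sequential`, per the description above.
```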
+`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/phase`
Returns the name of the current phase if the task is running in parallel mode.
-*
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/progress`
-
+`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/progress`
Returns the estimated progress of the current phase if the supervisor task is
running in parallel mode.
An example of the result:
@@ -481,32 +515,27 @@ An example of the result is
}
```
-*
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtasks/running`
-
+`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtasks/running`
Returns the task IDs of running worker tasks, or an empty list if the
supervisor task is running in sequential mode.
-*
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspecs`
-
+`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspecs`
Returns all worker task specs, or an empty list if the supervisor task is
running in sequential mode.
-*
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspecs/running`
-
+`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspecs/running`
Returns running worker task specs, or an empty list if the supervisor task is
running in sequential mode.
-*
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspecs/complete`
-
+`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspecs/complete`
Returns completed worker task specs, or an empty list if the supervisor task is
running in sequential mode.
-*
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}`
-
+`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}`
Returns the worker task spec of the given ID, or an HTTP 404 Not Found error if
the supervisor task is running in sequential mode.
-*
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}/state`
-
+`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}/state`
Returns the state of the worker task spec of the given ID, or an HTTP 404 Not
Found error if the supervisor task is running in sequential mode.
The returned result contains the worker task spec, the current task status if
it exists, and the task attempt history.
-An example of the result is
+<details>
+<summary>Click to view the response</summary>
```json
{
@@ -677,9 +706,9 @@ An example of the result is
"taskHistory": []
}
```
+</details>
-*
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}/history`
-
+`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}/history`
Returns the task attempt history of the worker task spec of the given ID, or
an HTTP 404 Not Found error if the supervisor task is running in the sequential
mode.
## Segment pushing modes
@@ -691,8 +720,8 @@ the parallel task index supports the following segment
pushing modes based upon
## Capacity planning
-The supervisor task can create up to `maxNumConcurrentSubTasks` worker tasks
no matter how many task slots are currently available.
-As a result, total number of tasks which can be run at the same time is
`(maxNumConcurrentSubTasks + 1)` (including the supervisor task).
+The Supervisor task can create up to `maxNumConcurrentSubTasks` worker tasks
no matter how many task slots are currently available.
+As a result, the total number of tasks that can run at the same time is `(maxNumConcurrentSubTasks + 1)`, including the Supervisor task.
Note that this can be even larger than the total number of task slots (the sum
of the capacity of all workers).
If `maxNumConcurrentSubTasks` is larger than `n` (available task slots), the
Supervisor task creates `maxNumConcurrentSubTasks` tasks, but only `n` tasks
start.
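As a rough sketch of the arithmetic above (illustrative Python, not Druid code):

```python
# Capacity planning rule of thumb for a parallel ingestion run.
# All names here are illustrative, not part of the Druid API.

def total_concurrent_tasks(max_num_concurrent_sub_tasks: int) -> int:
    """Upper bound on simultaneously running tasks, including the supervisor."""
    return max_num_concurrent_sub_tasks + 1

def worker_tasks_started(max_num_concurrent_sub_tasks: int,
                         available_task_slots: int) -> int:
    """Worker tasks that can start right away; the rest wait for free slots."""
    return min(max_num_concurrent_sub_tasks, available_task_slots)
```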
@@ -728,6 +757,6 @@ For information on how to combine input sources, see
[Combining input source](./
### `segmentWriteOutMediumFactory`
-|Field|Type|Description|Required|
+|Property|Type|Description|Required|
|-----|----|-----------|--------|
-|type|String|See [Additional Peon Configuration:
SegmentWriteOutMediumFactory](../configuration/index.md#segmentwriteoutmediumfactory)
for explanation and available options.|yes|
+|`type`|String|See [Additional Peon Configuration:
SegmentWriteOutMediumFactory](../configuration/index.md#segmentwriteoutmediumfactory)
for explanation and available options.|yes|
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]