This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c7229fc787 Limit max batch size for segment allocation, add docs (#13503)
c7229fc787 is described below
commit c7229fc7871d6068bb488ce398ba8c1632f7151f
Author: Kashif Faraz <[email protected]>
AuthorDate: Wed Dec 7 10:07:14 2022 +0530
Limit max batch size for segment allocation, add docs (#13503)
Changes:
- Limit max batch size in `SegmentAllocationQueue` to 500
- Rename `batchAllocationMaxWaitTime` to `batchAllocationWaitTime` since the actual wait time may exceed this configured value.
- Replace usage of `SegmentInsertAction` in `TaskToolbox` with `SegmentTransactionalInsertAction`
---
docs/configuration/index.md | 2 +
docs/ingestion/tasks.md | 20 ++++++++++
docs/operations/metrics.md | 10 ++++-
.../apache/druid/indexing/common/TaskToolbox.java | 8 +++-
.../common/actions/SegmentAllocationQueue.java | 44 ++++++++++++++++++----
.../indexing/overlord/config/TaskLockConfig.java | 6 +--
.../common/actions/SegmentAllocationQueueTest.java | 19 +++++++++-
.../indexing/common/actions/TaskActionTestKit.java | 2 +-
8 files changed, 94 insertions(+), 17 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 9582bbd99b..a14b3beb56 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1112,6 +1112,8 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro
|`druid.indexer.storage.type`|Choices are "local" or "metadata". Indicates whether incoming tasks should be stored locally (in heap) or in metadata storage. "local" is mainly for internal testing while "metadata" is recommended in production because storing incoming tasks in metadata storage allows for tasks to be resumed if the Overlord should fail.|local|
|`druid.indexer.storage.recentlyFinishedThreshold`|Duration of time to store task results. Default is 24 hours. If you have hundreds of tasks running in a day, consider increasing this threshold.|PT24H|
|`druid.indexer.tasklock.forceTimeChunkLock`|_**Setting this to false is still experimental**_<br/> If set, all tasks are forced to use time chunk locks. If not set, each task automatically chooses a lock type to use. This configuration can be overwritten by setting `forceTimeChunkLock` in the [task context](../ingestion/tasks.md#context). See [Task Locking & Priority](../ingestion/tasks.md#context) for more details about locking in tasks.|true|
+|`druid.indexer.tasklock.batchSegmentAllocation`|If set to true, Druid performs segment allocate actions in batches to improve throughput and reduce the average `task/action/run/time`. See [batching `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions) for details.|false|
+|`druid.indexer.tasklock.batchAllocationWaitTime`|Number of milliseconds that Druid waits after adding the first segment allocate action to a batch before executing the batch. This allows more requests to join the batch, improving the average segment allocation run time. This configuration takes effect only if `batchSegmentAllocation` is enabled.|500|
|`druid.indexer.task.default.context`|Default task context that is applied to all tasks submitted to the Overlord. Any default in this config does not override the context values the user provides or `druid.indexer.tasklock.forceTimeChunkLock`.|empty context|
|`druid.indexer.queue.maxSize`|Maximum number of active tasks at one time.|Integer.MAX_VALUE|
|`druid.indexer.queue.startDelay`|Sleep this long before starting Overlord queue management. This can be useful to give a cluster time to re-orient itself after e.g. a widespread network issue.|PT1M|
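For reference, the two properties introduced above can be set together in the Overlord runtime properties. A minimal sketch (the values shown are the defaults from this change, not tuning recommendations):

```properties
# Enable batching of segment allocate actions on the Overlord (off by default)
druid.indexer.tasklock.batchSegmentAllocation=true
# Wait this many milliseconds after the first allocate action joins a batch
# before executing the batch
druid.indexer.tasklock.batchAllocationWaitTime=500
```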
diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md
index c8a2e915d4..5afbadb3d4 100644
--- a/docs/ingestion/tasks.md
+++ b/docs/ingestion/tasks.md
@@ -343,6 +343,26 @@ You can override the task priority by setting your priority in the task context
"priority" : 100
}
```
+<a name="actions"></a>
+
+## Task actions
+
+Task actions are overlord actions performed by tasks during their lifecycle. Some typical task actions are:
+- `lockAcquire`: acquires a time-chunk lock on an interval for the task
+- `lockRelease`: releases a lock acquired by the task on an interval
+- `segmentTransactionalInsert`: publishes new segments created by a task and optionally overwrites and/or drops existing segments in a single transaction
+- `segmentAllocate`: allocates pending segments to a task to write rows
+
+### Batching `segmentAllocate` actions
+
+In a cluster with several concurrent tasks, `segmentAllocate` actions on the overlord can take a long time to finish, causing spikes in `task/action/run/time`. This can result in ingestion lag building up while a task waits for a segment to be allocated.
+The root cause of such spikes is likely one or more of the following:
+- several concurrent tasks trying to allocate segments for the same datasource and interval
+- a large number of metadata calls made to the segments and pending segments tables
+- concurrency limitations while acquiring a task lock required for allocating a segment
+
+Since the contention typically arises from tasks allocating segments for the same datasource and interval, you can improve run times by batching the actions together.
+To enable batched segment allocation on the overlord, set `druid.indexer.tasklock.batchSegmentAllocation` to `true`. See [overlord configuration](../configuration/index.md#overlord-operations) for more details.
<a name="context"></a>
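The batching behavior described in the new docs can be pictured with a small, self-contained sketch. This is not Druid code: the class, method, and key shape are hypothetical simplifications, and only the batching-by-key idea and the 500-request batch limit come from this commit.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch: allocate requests for the same (dataSource, interval)
// share a batch; when a batch fills up, an incremental id opens a new batch
// for the same datasource and interval.
class AllocateBatchingSketch
{
  static final int MAX_BATCH_SIZE = 500;  // limit introduced by this commit

  // Records get equals() and hashCode() over all components for free.
  record BatchKey(String dataSource, String interval, int incrementalId) {}

  final Map<BatchKey, Queue<String>> keyToBatch = new LinkedHashMap<>();

  /** Adds a request, spilling into a new batch when the current one is full. */
  BatchKey add(String dataSource, String interval, String taskId)
  {
    for (int id = 0; ; ++id) {
      BatchKey key = new BatchKey(dataSource, interval, id);
      Queue<String> batch = keyToBatch.computeIfAbsent(key, k -> new ArrayDeque<>());
      if (batch.size() < MAX_BATCH_SIZE) {
        batch.add(taskId);
        return key;
      }
    }
  }

  public static void main(String[] args)
  {
    AllocateBatchingSketch queue = new AllocateBatchingSketch();
    for (int i = 0; i < 500; ++i) {
      queue.add("wiki", "2022-12-07/2022-12-08", "task_" + i);
    }
    // The 501st request for the same datasource and interval opens a second batch.
    BatchKey key = queue.add("wiki", "2022-12-07/2022-12-08", "task_500");
    System.out.println(queue.keyToBatch.size());
    System.out.println(key.incrementalId());
  }
}
```

In the real `SegmentAllocationQueue`, batches are also bounded by the overall queue size and are flushed after `batchAllocationWaitTime`; this sketch only models how a full batch spills into the next one.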
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 8822f3fea7..4e3c961dcb 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -230,8 +230,14 @@ Note: If the JVM does not support CPU time measurement for the current thread, `
|------|-----------|------------------------------------------------------------|------------|
|`task/run/time`|Milliseconds taken to run a task.| `dataSource`, `taskId`, `taskType`, `taskStatus`|Varies|
|`task/pending/time`|Milliseconds taken for a task to wait for running.| `dataSource`, `taskId`, `taskType`|Varies|
-|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`|< 1000 (subsecond)|
-|`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`|Varies from subsecond to a few seconds, based on action type.|
+|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`, `taskActionType`|< 1000 (subsecond)|
+|`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies from subsecond to a few seconds, based on action type.|
+|`task/action/success/count`|Number of task actions that were executed successfully during the emission period. Currently emitted only for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies|
+|`task/action/failed/count`|Number of task actions that failed during the emission period. Currently emitted only for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies|
+|`task/action/batch/queueTime`|Milliseconds spent by a batch of task actions in queue. Currently emitted only for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on the `batchAllocationWaitTime` and the number of batches in queue.|
+|`task/action/batch/runTime`|Milliseconds taken to execute a batch of task actions. Currently emitted only for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies from subsecond to a few seconds, based on action type and batch size.|
+|`task/action/batch/size`|Number of task actions in a batch that was executed during the emission period. Currently emitted only for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on the number of concurrent task actions.|
+|`task/action/batch/attempts`|Number of execution attempts for a single batch of task actions. Currently emitted only for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|1 if there are no failures or retries.|
|`segment/added/bytes`|Size in bytes of new segments created.| `dataSource`, `taskId`, `taskType`, `interval`|Varies|
|`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.| `dataSource`, `taskId`, `taskType`, `interval`|Varies|
|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| `dataSource`, `taskId`, `taskType`, `interval`|Varies|
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
index 897db97503..03a4849e60 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
@@ -32,7 +32,7 @@ import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
-import org.apache.druid.indexing.common.actions.SegmentInsertAction;
+import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProvider;
@@ -331,7 +331,11 @@ public class TaskToolbox
DataSegment::getInterval
);
for (final Collection<DataSegment> segmentCollection : segmentMultimap.asMap().values()) {
-  getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(segmentCollection)));
+  getTaskActionClient().submit(
+      SegmentTransactionalInsertAction.appendAction(
+          ImmutableSet.copyOf(segmentCollection), null, null
+      )
+  );
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
index 9ed53d99fa..d60149d624 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
@@ -70,6 +70,7 @@ public class SegmentAllocationQueue
private static final Logger log = new Logger(SegmentAllocationQueue.class);
private static final int MAX_QUEUE_SIZE = 2000;
+ private static final int MAX_BATCH_SIZE = 500;
private final long maxWaitTimeMillis;
@@ -94,7 +95,7 @@ public class SegmentAllocationQueue
this.emitter = emitter;
this.taskLockbox = taskLockbox;
this.metadataStorage = metadataStorage;
- this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationMaxWaitTime();
+ this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationWaitTime();
this.executor = taskLockConfig.isBatchSegmentAllocation()
? executorFactory.create(1, "SegmentAllocQueue-%s") : null;
@@ -173,7 +174,7 @@ public class SegmentAllocationQueue
throw new ISE("Batched segment allocation is disabled.");
}
- final AllocateRequestKey requestKey = new AllocateRequestKey(request, maxWaitTimeMillis);
+ final AllocateRequestKey requestKey = getKeyForAvailableBatch(request);
final AtomicReference<Future<SegmentIdWithShardSpec>> futureReference = new AtomicReference<>();
// Possible race condition:
@@ -198,6 +199,24 @@ public class SegmentAllocationQueue
return futureReference.get();
}
+ /**
+ * Returns the key for a batch that is not added to the queue yet and/or has
+ * available space. Throws an exception if the queue is already full and no
+ * batch has available capacity.
+ */
+  private AllocateRequestKey getKeyForAvailableBatch(SegmentAllocateRequest request)
+  {
+    for (int batchIncrementalId = 0; batchIncrementalId < MAX_QUEUE_SIZE; ++batchIncrementalId) {
+      AllocateRequestKey nextKey = new AllocateRequestKey(request, maxWaitTimeMillis, batchIncrementalId);
+      AllocateRequestBatch nextBatch = keyToBatch.get(nextKey);
+      if (nextBatch == null || nextBatch.size() < MAX_BATCH_SIZE) {
+        return nextKey;
+      }
+    }
+
+ throw new ISE("Allocation queue is at capacity, all batches are full.");
+ }
+
/**
* Tries to add the given batch to the processing queue. Fails all the pending
* requests in the batch if we are not leader or if the queue is full.
@@ -616,6 +635,11 @@ public class SegmentAllocationQueue
*/
private static class AllocateRequestKey
{
+ /**
+ * ID to distinguish between two batches for the same datasource, groupId, etc.
+ */
+ private final int batchIncrementalId;
+
private long queueTimeMillis;
private final long maxWaitTimeMillis;
@@ -635,11 +659,12 @@ public class SegmentAllocationQueue
* Creates a new key for the given request. The batch for a unique key will
* always contain a single request.
*/
-  AllocateRequestKey(SegmentAllocateRequest request, long maxWaitTimeMillis)
+  AllocateRequestKey(SegmentAllocateRequest request, long maxWaitTimeMillis, int batchIncrementalId)
{
final SegmentAllocateAction action = request.getAction();
final Task task = request.getTask();
+ this.batchIncrementalId = batchIncrementalId;
this.dataSource = action.getDataSource();
this.groupId = task.getGroupId();
this.skipSegmentLineageCheck = action.isSkipSegmentLineageCheck();
@@ -651,10 +676,11 @@ public class SegmentAllocationQueue
.bucket(action.getTimestamp());
this.hash = Objects.hash(
- skipSegmentLineageCheck,
- useNonRootGenPartitionSpace,
dataSource,
groupId,
+ batchIncrementalId,
+ skipSegmentLineageCheck,
+ useNonRootGenPartitionSpace,
preferredAllocationInterval,
lockGranularity
);
@@ -687,10 +713,11 @@ public class SegmentAllocationQueue
return false;
}
AllocateRequestKey that = (AllocateRequestKey) o;
- return skipSegmentLineageCheck == that.skipSegmentLineageCheck
- && useNonRootGenPartitionSpace == that.useNonRootGenPartitionSpace
- && dataSource.equals(that.dataSource)
+ return dataSource.equals(that.dataSource)
&& groupId.equals(that.groupId)
+ && batchIncrementalId == that.batchIncrementalId
+ && skipSegmentLineageCheck == that.skipSegmentLineageCheck
+ && useNonRootGenPartitionSpace == that.useNonRootGenPartitionSpace
&& preferredAllocationInterval.equals(that.preferredAllocationInterval)
&& lockGranularity == that.lockGranularity;
}
@@ -707,6 +734,7 @@ public class SegmentAllocationQueue
return "{" +
"ds='" + dataSource + '\'' +
", gr='" + groupId + '\'' +
+ ", incId=" + batchIncrementalId +
", lock=" + lockGranularity +
", invl=" + preferredAllocationInterval +
", slc=" + skipSegmentLineageCheck +
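The reordered `equals()` and `hashCode()` above both gain `batchIncrementalId`. Keeping the two methods consistent is what lets two otherwise-identical keys occupy distinct entries in the batch map. A stripped-down illustration (the `MiniKey` class is hypothetical; only the field names are borrowed from the diff):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Minimal key with just enough fields to show the point: because
// batchIncrementalId participates in both equals() and hashCode(),
// two keys for the same datasource map to separate HashMap entries.
class MiniKey
{
  final String dataSource;
  final int batchIncrementalId;

  MiniKey(String dataSource, int batchIncrementalId)
  {
    this.dataSource = dataSource;
    this.batchIncrementalId = batchIncrementalId;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (!(o instanceof MiniKey)) {
      return false;
    }
    MiniKey that = (MiniKey) o;
    return batchIncrementalId == that.batchIncrementalId
           && dataSource.equals(that.dataSource);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(dataSource, batchIncrementalId);
  }

  public static void main(String[] args)
  {
    Map<MiniKey, String> batches = new HashMap<>();
    batches.put(new MiniKey("wiki", 0), "first (full) batch");
    batches.put(new MiniKey("wiki", 1), "overflow batch");
    System.out.println(batches.size());
  }
}
```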
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
index acbc318baa..c860e4e1d1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
@@ -34,7 +34,7 @@ public class TaskLockConfig
private boolean batchSegmentAllocation = false;
@JsonProperty
- private long batchAllocationMaxWaitTime = 500L;
+ private long batchAllocationWaitTime = 500L;
public boolean isForceTimeChunkLock()
{
@@ -46,8 +46,8 @@ public class TaskLockConfig
return batchSegmentAllocation;
}
- public long getBatchAllocationMaxWaitTime()
+ public long getBatchAllocationWaitTime()
{
- return batchAllocationMaxWaitTime;
+ return batchAllocationWaitTime;
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
index 536e9ffac2..974b3096f9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
@@ -71,7 +71,7 @@ public class SegmentAllocationQueueTest
}
@Override
- public long getBatchAllocationMaxWaitTime()
+ public long getBatchAllocationWaitTime()
{
return 0;
}
@@ -249,6 +249,23 @@ public class SegmentAllocationQueueTest
);
}
+ @Test
+ public void testMaxBatchSize()
+ {
+ for (int i = 0; i < 500; ++i) {
+    SegmentAllocateRequest request = allocateRequest().forTask(createTask(DS_WIKI, "group_1")).build();
+ allocationQueue.add(request);
+ }
+
+ // Verify that next request is added to a new batch
+ Assert.assertEquals(1, allocationQueue.size());
+  SegmentAllocateRequest request = allocateRequest().forTask(createTask(DS_WIKI, "group_1")).build();
+ allocationQueue.add(request);
+ Assert.assertEquals(2, allocationQueue.size());
+ }
+
@Test
public void testMultipleRequestsForSameSegment()
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
index 189d3fe877..eebf78a7dd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
@@ -112,7 +112,7 @@ public class TaskActionTestKit extends ExternalResource
}
@Override
- public long getBatchAllocationMaxWaitTime()
+ public long getBatchAllocationWaitTime()
{
return 10L;
}