This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c7229fc787 Limit max batch size for segment allocation, add docs (#13503)
c7229fc787 is described below

commit c7229fc7871d6068bb488ce398ba8c1632f7151f
Author: Kashif Faraz <[email protected]>
AuthorDate: Wed Dec 7 10:07:14 2022 +0530

    Limit max batch size for segment allocation, add docs (#13503)
    
    Changes:
    - Limit max batch size in `SegmentAllocationQueue` to 500
    - Rename `batchAllocationMaxWaitTime` to `batchAllocationWaitTime` since the actual
    wait time may exceed this configured value.
    - Replace usage of `SegmentInsertAction` in `TaskToolbox` with `SegmentTransactionalInsertAction`
---
 docs/configuration/index.md                        |  2 +
 docs/ingestion/tasks.md                            | 20 ++++++++++
 docs/operations/metrics.md                         | 10 ++++-
 .../apache/druid/indexing/common/TaskToolbox.java  |  8 +++-
 .../common/actions/SegmentAllocationQueue.java     | 44 ++++++++++++++++++----
 .../indexing/overlord/config/TaskLockConfig.java   |  6 +--
 .../common/actions/SegmentAllocationQueueTest.java | 19 +++++++++-
 .../indexing/common/actions/TaskActionTestKit.java |  2 +-
 8 files changed, 94 insertions(+), 17 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 9582bbd99b..a14b3beb56 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1112,6 +1112,8 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro
 |`druid.indexer.storage.type`|Choices are "local" or "metadata". Indicates whether incoming tasks should be stored locally (in heap) or in metadata storage. "local" is mainly for internal testing while "metadata" is recommended in production because storing incoming tasks in metadata storage allows for tasks to be resumed if the Overlord should fail.|local|
 |`druid.indexer.storage.recentlyFinishedThreshold`|Duration of time to store task results. Default is 24 hours. If you have hundreds of tasks running in a day, consider increasing this threshold.|PT24H|
 |`druid.indexer.tasklock.forceTimeChunkLock`|_**Setting this to false is still experimental**_<br/> If set, all tasks are enforced to use time chunk lock. If not set, each task automatically chooses a lock type to use. This configuration can be overwritten by setting `forceTimeChunkLock` in the [task context](../ingestion/tasks.md#context). See [Task Locking & Priority](../ingestion/tasks.md#context) for more details about locking in tasks.|true|
+|`druid.indexer.tasklock.batchSegmentAllocation`| If set to true, Druid performs segment allocate actions in batches to improve throughput and reduce the average `task/action/run/time`. See [batching `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions) for details.|false|
+|`druid.indexer.tasklock.batchAllocationWaitTime`|Number of milliseconds after Druid adds the first segment allocate action to a batch, until it executes the batch. Allows the batch to add more requests and improve the average segment allocation run time. This configuration takes effect only if `batchSegmentAllocation` is enabled.|500|
 |`druid.indexer.task.default.context`|Default task context that is applied to all tasks submitted to the Overlord. Any default in this config does not override neither the context values the user provides nor `druid.indexer.tasklock.forceTimeChunkLock`.|empty context|
 |`druid.indexer.queue.maxSize`|Maximum number of active tasks at one time.|Integer.MAX_VALUE|
 |`druid.indexer.queue.startDelay`|Sleep this long before starting Overlord queue management. This can be useful to give a cluster time to re-orient itself after e.g. a widespread network issue.|PT1M|
diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md
index c8a2e915d4..5afbadb3d4 100644
--- a/docs/ingestion/tasks.md
+++ b/docs/ingestion/tasks.md
@@ -343,6 +343,26 @@ You can override the task priority by setting your priority in the task context
   "priority" : 100
 }
 ```
+<a name="actions"></a>
+
+## Task actions
+
+Task actions are overlord actions performed by tasks during their lifecycle. Some typical task actions are:
+- `lockAcquire`: acquires a time-chunk lock on an interval for the task
+- `lockRelease`: releases a lock acquired by the task on an interval
+- `segmentTransactionalInsert`: publishes new segments created by a task and optionally overwrites and/or drops existing segments in a single transaction
+- `segmentAllocate`: allocates pending segments to a task to write rows
+
+### Batching `segmentAllocate` actions
+
+In a cluster with several concurrent tasks, `segmentAllocate` actions on the overlord can take a long time to finish, causing spikes in the `task/action/run/time`. This can result in ingestion lag building up while a task waits for a segment to be allocated.
+The root cause of such spikes is likely to be one or more of the following:
+- several concurrent tasks trying to allocate segments for the same datasource and interval
+- large number of metadata calls made to the segments and pending segments tables
+- concurrency limitations while acquiring a task lock required for allocating a segment
+
+Since the contention typically arises from tasks allocating segments for the same datasource and interval, you can improve the run times by batching the actions together.
+To enable batched segment allocation on the overlord, set  `druid.indexer.tasklock.batchSegmentAllocation` to `true`. See [overlord configuration](../configuration/index.md#overlord-operations) for more details.
 
 <a name="context"></a>
 
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 8822f3fea7..4e3c961dcb 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -230,8 +230,14 @@ Note: If the JVM does not support CPU time measurement for the current thread, `
 |------|-----------|------------------------------------------------------------|------------|
 |`task/run/time`|Milliseconds taken to run a task.| `dataSource`, `taskId`, `taskType`, `taskStatus`|Varies|
 |`task/pending/time`|Milliseconds taken for a task to wait for running.| `dataSource`, `taskId`, `taskType`|Varies|
-|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`|< 1000 (subsecond)|
-|`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`|Varies from subsecond to a few seconds, based on action type.|
+|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`, `taskActionType`|< 1000 (subsecond)|
+|`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies from subsecond to a few seconds, based on action type.|
+|`task/action/success/count`|Number of task actions that were executed successfully during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies|
+|`task/action/failed/count`|Number of task actions that failed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies|
+|`task/action/batch/queueTime`|Milliseconds spent by a batch of task actions in queue. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on the `batchAllocationWaitTime` and number of batches in queue.|
+|`task/action/batch/runTime`|Milliseconds taken to execute a batch of task actions. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies from subsecond to a few seconds, based on action type and batch size.|
+|`task/action/batch/size`|Number of task actions in a batch that was executed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on number of concurrent task actions.|
+|`task/action/batch/attempts`|Number of execution attempts for a single batch of task actions. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|1 if there are no failures or retries.|
 |`segment/added/bytes`|Size in bytes of new segments created.| `dataSource`, `taskId`, `taskType`, `interval`|Varies|
 |`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.| `dataSource`, `taskId`, `taskType`, `interval`|Varies|
 |`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| `dataSource`, `taskId`, `taskType`, `interval`|Varies|
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
index 897db97503..03a4849e60 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
@@ -32,7 +32,7 @@ import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.discovery.DataNodeService;
 import org.apache.druid.discovery.DruidNodeAnnouncer;
 import org.apache.druid.discovery.LookupNodeService;
-import org.apache.druid.indexing.common.actions.SegmentInsertAction;
+import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProvider;
@@ -331,7 +331,11 @@ public class TaskToolbox
         DataSegment::getInterval
     );
     for (final Collection<DataSegment> segmentCollection : segmentMultimap.asMap().values()) {
-      getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(segmentCollection)));
+      getTaskActionClient().submit(
+          SegmentTransactionalInsertAction.appendAction(
+              ImmutableSet.copyOf(segmentCollection), null, null
+          )
+      );
     }
   }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
index 9ed53d99fa..d60149d624 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
@@ -70,6 +70,7 @@ public class SegmentAllocationQueue
   private static final Logger log = new Logger(SegmentAllocationQueue.class);
 
   private static final int MAX_QUEUE_SIZE = 2000;
+  private static final int MAX_BATCH_SIZE = 500;
 
   private final long maxWaitTimeMillis;
 
@@ -94,7 +95,7 @@ public class SegmentAllocationQueue
     this.emitter = emitter;
     this.taskLockbox = taskLockbox;
     this.metadataStorage = metadataStorage;
-    this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationMaxWaitTime();
+    this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationWaitTime();
 
     this.executor = taskLockConfig.isBatchSegmentAllocation()
                     ? executorFactory.create(1, "SegmentAllocQueue-%s") : null;
@@ -173,7 +174,7 @@ public class SegmentAllocationQueue
       throw new ISE("Batched segment allocation is disabled.");
     }
 
-    final AllocateRequestKey requestKey = new AllocateRequestKey(request, maxWaitTimeMillis);
+    final AllocateRequestKey requestKey = getKeyForAvailableBatch(request);
     final AtomicReference<Future<SegmentIdWithShardSpec>> futureReference = new AtomicReference<>();
 
     // Possible race condition:
@@ -198,6 +199,24 @@ public class SegmentAllocationQueue
     return futureReference.get();
   }
 
+  /**
+   * Returns the key for a batch that is not added to the queue yet and/or has
+   * available space. Throws an exception if the queue is already full and no
+   * batch has available capacity.
+   */
+  private AllocateRequestKey getKeyForAvailableBatch(SegmentAllocateRequest request)
+  {
+    for (int batchIncrementalId = 0; batchIncrementalId < MAX_QUEUE_SIZE; ++batchIncrementalId) {
+      AllocateRequestKey nextKey = new AllocateRequestKey(request, maxWaitTimeMillis, batchIncrementalId);
+      AllocateRequestBatch nextBatch = keyToBatch.get(nextKey);
+      if (nextBatch == null || nextBatch.size() < MAX_BATCH_SIZE) {
+        return nextKey;
+      }
+    }
+
+    throw new ISE("Allocation queue is at capacity, all batches are full.");
+  }
+
   /**
    * Tries to add the given batch to the processing queue. Fails all the pending
    * requests in the batch if we are not leader or if the queue is full.
@@ -616,6 +635,11 @@ public class SegmentAllocationQueue
    */
   private static class AllocateRequestKey
   {
+    /**
+     * ID to distinguish between two batches for the same datasource, groupId, etc.
+     */
+    private final int batchIncrementalId;
+
     private long queueTimeMillis;
     private final long maxWaitTimeMillis;
 
@@ -635,11 +659,12 @@ public class SegmentAllocationQueue
      * Creates a new key for the given request. The batch for a unique key will
      * always contain a single request.
      */
-    AllocateRequestKey(SegmentAllocateRequest request, long maxWaitTimeMillis)
+    AllocateRequestKey(SegmentAllocateRequest request, long maxWaitTimeMillis, int batchIncrementalId)
     {
       final SegmentAllocateAction action = request.getAction();
       final Task task = request.getTask();
 
+      this.batchIncrementalId = batchIncrementalId;
       this.dataSource = action.getDataSource();
       this.groupId = task.getGroupId();
       this.skipSegmentLineageCheck = action.isSkipSegmentLineageCheck();
@@ -651,10 +676,11 @@ public class SegmentAllocationQueue
                                                .bucket(action.getTimestamp());
 
       this.hash = Objects.hash(
-          skipSegmentLineageCheck,
-          useNonRootGenPartitionSpace,
           dataSource,
           groupId,
+          batchIncrementalId,
+          skipSegmentLineageCheck,
+          useNonRootGenPartitionSpace,
           preferredAllocationInterval,
           lockGranularity
       );
@@ -687,10 +713,11 @@ public class SegmentAllocationQueue
         return false;
       }
       AllocateRequestKey that = (AllocateRequestKey) o;
-      return skipSegmentLineageCheck == that.skipSegmentLineageCheck
-             && useNonRootGenPartitionSpace == that.useNonRootGenPartitionSpace
-             && dataSource.equals(that.dataSource)
+      return dataSource.equals(that.dataSource)
              && groupId.equals(that.groupId)
+             && batchIncrementalId == that.batchIncrementalId
+             && skipSegmentLineageCheck == that.skipSegmentLineageCheck
+             && useNonRootGenPartitionSpace == that.useNonRootGenPartitionSpace
              && preferredAllocationInterval.equals(that.preferredAllocationInterval)
              && lockGranularity == that.lockGranularity;
     }
@@ -707,6 +734,7 @@ public class SegmentAllocationQueue
       return "{" +
              "ds='" + dataSource + '\'' +
              ", gr='" + groupId + '\'' +
+             ", incId=" + batchIncrementalId +
              ", lock=" + lockGranularity +
              ", invl=" + preferredAllocationInterval +
              ", slc=" + skipSegmentLineageCheck +
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
index acbc318baa..c860e4e1d1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
@@ -34,7 +34,7 @@ public class TaskLockConfig
   private boolean batchSegmentAllocation = false;
 
   @JsonProperty
-  private long batchAllocationMaxWaitTime = 500L;
+  private long batchAllocationWaitTime = 500L;
 
   public boolean isForceTimeChunkLock()
   {
@@ -46,8 +46,8 @@ public class TaskLockConfig
     return batchSegmentAllocation;
   }
 
-  public long getBatchAllocationMaxWaitTime()
+  public long getBatchAllocationWaitTime()
   {
-    return batchAllocationMaxWaitTime;
+    return batchAllocationWaitTime;
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
index 536e9ffac2..974b3096f9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
@@ -71,7 +71,7 @@ public class SegmentAllocationQueueTest
       }
 
       @Override
-      public long getBatchAllocationMaxWaitTime()
+      public long getBatchAllocationWaitTime()
       {
         return 0;
       }
@@ -249,6 +249,23 @@ public class SegmentAllocationQueueTest
     );
   }
 
+  @Test
+  public void testMaxBatchSize()
+  {
+    for (int i = 0; i < 500; ++i) {
+      SegmentAllocateRequest request =
+          allocateRequest().forTask(createTask(DS_WIKI, "group_1")).build();
+      allocationQueue.add(request);
+    }
+
+    // Verify that next request is added to a new batch
+    Assert.assertEquals(1, allocationQueue.size());
+    SegmentAllocateRequest request =
+        allocateRequest().forTask(createTask(DS_WIKI, "group_1")).build();
+    allocationQueue.add(request);
+    Assert.assertEquals(2, allocationQueue.size());
+  }
+
   @Test
   public void testMultipleRequestsForSameSegment()
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
index 189d3fe877..eebf78a7dd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
@@ -112,7 +112,7 @@ public class TaskActionTestKit extends ExternalResource
       }
 
       @Override
-      public long getBatchAllocationMaxWaitTime()
+      public long getBatchAllocationWaitTime()
       {
         return 10L;
       }
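
The batch-selection loop that this commit adds in `getKeyForAvailableBatch` can be tried out in isolation. What follows is a simplified sketch under stated assumptions, not the Druid implementation: `BatchKeySketch`, the string keys, and the size map are hypothetical stand-ins for `SegmentAllocationQueue`, `AllocateRequestKey`, and `keyToBatch`. Only the two constants and the exception message come from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for SegmentAllocationQueue; only the key-selection logic is modelled.
class BatchKeySketch
{
  // Both limits are taken from the diff.
  static final int MAX_QUEUE_SIZE = 2000;
  static final int MAX_BATCH_SIZE = 500;

  // Assumption: a batch is reduced to its request count, keyed by "baseKey#incrementalId".
  private final Map<String, Integer> batchSizes = new HashMap<>();

  // Returns the first key whose batch does not exist yet or still has room.
  String keyForAvailableBatch(String baseKey)
  {
    for (int incId = 0; incId < MAX_QUEUE_SIZE; ++incId) {
      String key = baseKey + "#" + incId;
      Integer size = batchSizes.get(key);
      if (size == null || size < MAX_BATCH_SIZE) {
        return key;
      }
    }
    throw new IllegalStateException("Allocation queue is at capacity, all batches are full.");
  }

  // Adds one request to the chosen batch and returns the key that was used.
  String add(String baseKey)
  {
    String key = keyForAvailableBatch(baseKey);
    batchSizes.merge(key, 1, Integer::sum);
    return key;
  }

  int batchCount()
  {
    return batchSizes.size();
  }

  public static void main(String[] args)
  {
    BatchKeySketch queue = new BatchKeySketch();
    for (int i = 0; i < MAX_BATCH_SIZE; ++i) {
      queue.add("wiki/group_1");
    }
    System.out.println(queue.batchCount());        // prints 1
    System.out.println(queue.add("wiki/group_1")); // prints wiki/group_1#1
    System.out.println(queue.batchCount());        // prints 2
  }
}
```

This mirrors what `testMaxBatchSize` checks: the first 500 requests for the same datasource and group share one batch, and the next request opens a second batch with the next incremental ID.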

