This is an automated email from the ASF dual-hosted git repository.

zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 660e6cfa01 Allow for task limit on kill tasks spawned by auto kill 
coordinator duty (#14769)
660e6cfa01 is described below

commit 660e6cfa01b537949303045d3bd5deecaaa5e1c8
Author: zachjsh <[email protected]>
AuthorDate: Tue Aug 8 08:40:55 2023 -0400

    Allow for task limit on kill tasks spawned by auto kill coordinator duty 
(#14769)
    
    ### Description
    
    Previously, the `KillUnusedSegments` coordinator duty, which periodically 
deletes unused segments, could spawn an unlimited number of kill tasks. This 
change adds two coordinator dynamic configs that limit the number of tasks 
spawned by this duty:
    
    `killTaskSlotRatio`: Ratio of total available task slots, including 
autoscaling if applicable, that kill tasks may use. This limit applies only to 
kill tasks spawned automatically by the coordinator's auto kill duty. The 
default is 1, which allows all available task slots to be used and preserves 
the existing behavior.
    
    `maxKillTaskSlots`: Maximum number of task slots that kill tasks may use. 
This limit applies only to kill tasks spawned automatically by the 
coordinator's auto kill duty. The default is `Integer.MAX_VALUE`, which 
effectively leaves the number of tasks unbounded and preserves the existing 
behavior.
    
    `killTaskSlotRatio` alone would be enough to limit these tasks. However, 
the compaction config has a similar pair of properties, and following it gives 
control over the upper limit regardless of the ratio provided.
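
To illustrate how the two limits interact, here is a minimal Java sketch. It 
assumes the ratio-based share is truncated to a whole number of slots and then 
capped by `maxKillTaskSlots`, and that kill tasks already running count against 
the result. The class and method names are hypothetical, and the exact 
arithmetic in `KillUnusedSegments` may differ.

```java
// Hypothetical sketch of combining the two limits; not the Druid implementation.
final class KillTaskSlotSketch
{
  // Total slots the auto kill duty may occupy: the ratio-based share of the
  // worker capacity (truncated toward zero, an assumption), capped by
  // maxKillTaskSlots.
  static int killTaskCapacity(int totalWorkerCapacity, double killTaskSlotRatio, int maxKillTaskSlots)
  {
    int ratioBased = (int) (totalWorkerCapacity * killTaskSlotRatio);
    return Math.min(ratioBased, maxKillTaskSlots);
  }

  // Slots still free for new kill tasks. Subtracting the active kill tasks is
  // an assumption; the result is never negative.
  static int availableKillTaskSlots(
      int totalWorkerCapacity,
      double killTaskSlotRatio,
      int maxKillTaskSlots,
      int activeKillTasks
  )
  {
    int capacity = killTaskCapacity(totalWorkerCapacity, killTaskSlotRatio, maxKillTaskSlots);
    return Math.max(0, capacity - activeKillTasks);
  }

  public static void main(String[] args)
  {
    // Sample values from the docs change: ratio 0.10 and a cap of 5.
    System.out.println(killTaskCapacity(100, 0.10, 5));
    // Defaults: ratio 1.0 and Integer.MAX_VALUE leave the duty unrestricted.
    System.out.println(killTaskCapacity(100, 1.0, Integer.MAX_VALUE));
    // More active kill tasks than capacity leaves no free slots.
    System.out.println(availableKillTaskSlots(100, 0.10, 5, 7));
  }
}
```

With 100 total slots, a ratio of 0.10 gives a ratio-based share of 10, so the 
cap of 5 is the effective limit. With the defaults, the full capacity of 100 
remains usable.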
    
    #### Release note
    NEW: Added the `killTaskSlotRatio` and `maxKillTaskSlots` coordinator 
dynamic config properties, which limit the task slots used by kill tasks that 
the `KillUnusedSegments` coordinator duty (auto kill) spawns.
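
The accepted ranges and defaults for the two properties can be sketched in 
standalone Java as follows. The checks and messages mirror the ones this commit 
adds to the `CoordinatorDynamicConfig` constructor. The class and method names 
are hypothetical, and `IllegalArgumentException` stands in for Druid's 
`InvalidInput.exception`.

```java
import java.util.Locale;

// Hypothetical standalone sketch of the validation; not the Druid class.
final class KillTaskConfigValidationSketch
{
  static final double DEFAULT_KILL_TASK_SLOT_RATIO = 1.0;
  static final int DEFAULT_MAX_KILL_TASK_SLOTS = Integer.MAX_VALUE;

  // A null ratio falls back to the default; anything outside [0, 1] is rejected.
  static double resolveKillTaskSlotRatio(Double killTaskSlotRatio)
  {
    if (killTaskSlotRatio != null && (killTaskSlotRatio < 0 || killTaskSlotRatio > 1)) {
      throw new IllegalArgumentException(String.format(
          Locale.ROOT,
          "killTaskSlotRatio [%.2f] is invalid. It must be >= 0 and <= 1.",
          killTaskSlotRatio
      ));
    }
    return killTaskSlotRatio != null ? killTaskSlotRatio : DEFAULT_KILL_TASK_SLOT_RATIO;
  }

  // A null cap falls back to the default; negative values are rejected.
  static int resolveMaxKillTaskSlots(Integer maxKillTaskSlots)
  {
    if (maxKillTaskSlots != null && maxKillTaskSlots < 0) {
      throw new IllegalArgumentException(String.format(
          Locale.ROOT,
          "maxKillTaskSlots [%d] is invalid. It must be >= 0.",
          maxKillTaskSlots
      ));
    }
    return maxKillTaskSlots != null ? maxKillTaskSlots : DEFAULT_MAX_KILL_TASK_SLOTS;
  }

  public static void main(String[] args)
  {
    System.out.println(resolveKillTaskSlotRatio(null));
    System.out.println(resolveMaxKillTaskSlots(5));
  }
}
```

Because both parameters are nullable boxed types, an omitted property is 
distinguishable from an explicit 0. An explicit 0 for either property passes 
validation.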
---
 docs/configuration/index.md                        |  42 ++++----
 .../coordinator/CoordinatorDynamicConfig.java      |  67 ++++++++++++
 .../coordinator/duty/KillUnusedSegments.java       | 115 ++++++++++++++++++++-
 .../coordinator/duty/KillUnusedSegmentsTest.java   | 109 +++++++++++++++++++
 .../server/http/CoordinatorDynamicConfigTest.java  |  88 ++++++++++++++++
 5 files changed, 398 insertions(+), 23 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 4690af390b..6a0d65ae1d 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -934,6 +934,8 @@ A sample Coordinator dynamic config JSON object is shown 
below:
   "replicantLifetime": 15,
   "replicationThrottleLimit": 10,
   "killDataSourceWhitelist": ["wikipedia", "testDatasource"],
+  "killTaskSlotRatio": 0.10,
+  "maxKillTaskSlots": 5,
   "decommissioningNodes": ["localhost:8182", "localhost:8282"],
   "decommissioningMaxPercentOfMaxSegmentsToMove": 70,
   "pauseCoordination": false,
@@ -944,25 +946,27 @@ A sample Coordinator dynamic config JSON object is shown 
below:
 
 Issuing a GET request at the same URL will return the spec that is currently 
in place. A description of the config setup spec is shown below.
 
-|Property|Description|Default|
-|--------|-----------|-------|
-|`millisToWaitBeforeDeleting`|How long does the Coordinator need to be a 
leader before it can start marking overshadowed segments as unused in metadata 
storage.|900000 (15 mins)|
-|`mergeBytesLimit`|The maximum total uncompressed size in bytes of segments to 
merge.|524288000L|
-|`mergeSegmentsLimit`|The maximum number of segments that can be in a single 
[append task](../ingestion/tasks.md).|100|
-|`smartSegmentLoading`|Enables ["smart" segment loading 
mode](#smart-segment-loading) which dynamically computes the optimal values of 
several properties that maximize Coordinator performance.|true|
-|`maxSegmentsToMove`|The maximum number of segments that can be moved at any 
given time.|100|
-|`replicantLifetime`|The maximum number of Coordinator runs for which a 
segment can wait in the load queue of a Historical before Druid raises an 
alert.|15|
-|`replicationThrottleLimit`|The maximum number of segment replicas that can be 
assigned to a historical tier in a single Coordinator run. This property 
prevents historicals from becoming overwhelmed when loading extra replicas of 
segments that are already available in the cluster.|500|
-|`balancerComputeThreads`|Thread pool size for computing moving cost of 
segments during segment balancing. Consider increasing this if you have a lot 
of segments and moving segments begins to stall.|1|
-|`killDataSourceWhitelist`|List of specific data sources for which kill tasks 
are sent if property `druid.coordinator.kill.on` is true. This can be a list of 
comma-separated data source names or a JSON array.|none|
-|`killPendingSegmentsSkipList`|List of data sources for which pendingSegments 
are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is 
true. This can be a list of comma-separated data sources or a JSON array.|none|
-|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments allowed in the 
load queue of any given server. Use this parameter to load segments faster if, 
for example, the cluster contains slow-loading nodes or if there are too many 
segments to be replicated to a particular node (when faster loading is 
preferred to better segments distribution). The optimal value depends on the 
loading speed of segments, acceptable replication time and number of nodes. 
|500|
-|`useRoundRobinSegmentAssignment`|Boolean flag for whether segments should be 
assigned to historicals in a round robin fashion. When disabled, segment 
assignment is done using the chosen balancer strategy. When enabled, this can 
speed up segment assignments leaving balancing to move the segments to their 
optimal locations (based on the balancer strategy) lazily. |true|
-|`decommissioningNodes`| List of historical servers to 'decommission'. 
Coordinator will not assign new segments to 'decommissioning' servers,  and 
segments will be moved away from them to be placed on non-decommissioning 
servers at the maximum rate specified by 
`decommissioningMaxPercentOfMaxSegmentsToMove`.|none|
-|`decommissioningMaxPercentOfMaxSegmentsToMove`| Upper limit of segments the 
Coordinator can move from decommissioning servers to active non-decommissioning 
servers during a single run. This value is relative to the total maximum number 
of segments that can be moved at any given time based upon the value of 
`maxSegmentsToMove`.<br /><br />If 
`decommissioningMaxPercentOfMaxSegmentsToMove` is 0, the Coordinator does not 
move segments to decommissioning servers, effectively putting them in  [...]
-|`pauseCoordination`| Boolean flag for whether or not the coordinator should 
execute its various duties of coordinating the cluster. Setting this to true 
essentially pauses all coordination work while allowing the API to remain up. 
Duties that are paused include all classes that implement the `CoordinatorDuty` 
Interface. Such duties include: Segment balancing, Segment compaction, 
Submitting kill tasks for unused segments (if enabled), Logging of used 
segments in the cluster, Marking of n [...]
-|`replicateAfterLoadTimeout`| Boolean flag for whether or not additional 
replication is needed for segments that have failed to load due to the expiry 
of `druid.coordinator.load.timeout`. If this is set to true, the coordinator 
will attempt to replicate the failed segment on a different historical server. 
This helps improve the segment availability if there are a few slow historicals 
in the cluster. However, the slow historical may still load the segment later 
and the coordinator may iss [...]
-|`maxNonPrimaryReplicantsToLoad`|The maximum number of replicas that can be 
assigned across all tiers in a single Coordinator run. This parameter serves 
the same purpose as `replicationThrottleLimit` except this limit applies at the 
cluster-level instead of per tier. The default value does not apply a limit to 
the number of replicas assigned per coordination cycle. If you want to use a 
non-default value for this property, you may want to start with `~20%` of the 
number of segments found  [...]
+|Property| Description                                                         
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+|--------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+|`millisToWaitBeforeDeleting`| How long does the Coordinator need to be a 
leader before it can start marking overshadowed segments as unused in metadata 
storage.                                                                        
                                                                                
                                                                                
                                                                                
                    [...]
+|`mergeBytesLimit`| The maximum total uncompressed size in bytes of segments 
to merge.                                                                       
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+|`mergeSegmentsLimit`| The maximum number of segments that can be in a single 
[append task](../ingestion/tasks.md).                                           
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
+|`smartSegmentLoading`| Enables ["smart" segment loading 
mode](#smart-segment-loading) which dynamically computes the optimal values of 
several properties that maximize Coordinator performance.                       
                                                                                
                                                                                
                                                                                
                                     [...]
+|`maxSegmentsToMove`| The maximum number of segments that can be moved at any 
given time.                                                                     
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
+|`replicantLifetime`| The maximum number of Coordinator runs for which a 
segment can wait in the load queue of a Historical before Druid raises an 
alert.                                                                          
                                                                                
                                                                                
                                                                                
                          [...]
+|`replicationThrottleLimit`| The maximum number of segment replicas that can 
be assigned to a historical tier in a single Coordinator run. This property 
prevents historicals from becoming overwhelmed when loading extra replicas of 
segments that are already available in the cluster.                             
                                                                                
                                                                                
                      [...]
+|`balancerComputeThreads`| Thread pool size for computing moving cost of 
segments during segment balancing. Consider increasing this if you have a lot 
of segments and moving segments begins to stall.                                
                                                                                
                                                                                
                                                                                
                      [...]
+|`killDataSourceWhitelist`| List of specific data sources for which kill tasks 
are sent if property `druid.coordinator.kill.on` is true. This can be a list of 
comma-separated data source names or a JSON array.                              
                                                                                
                                                                                
                                                                                
              [...]
+|`killTaskSlotRatio`| Ratio of total available task slots, including 
autoscaling if applicable that will be allowed for kill tasks. This limit only 
applies for kill tasks that are spawned automatically by the coordinator's auto 
kill duty, which is enabled when `druid.coordinator.kill.on` is true.           
                                                                                
                                                                                
                         [...]
+|`maxKillTaskSlots`| Maximum number of tasks that will be allowed for kill 
tasks. This limit only applies for kill tasks that are spawned automatically by 
the coordinator's auto kill duty, which is enabled when 
`druid.coordinator.kill.on` is true.                                            
                                                                                
                                                                                
                                          [...]
+|`killPendingSegmentsSkipList`| List of data sources for which pendingSegments 
are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is 
true. This can be a list of comma-separated data sources or a JSON array.       
                                                                                
                                                                                
                                                                                
              [...]
+|`maxSegmentsInNodeLoadingQueue`| The maximum number of segments allowed in 
the load queue of any given server. Use this parameter to load segments faster 
if, for example, the cluster contains slow-loading nodes or if there are too 
many segments to be replicated to a particular node (when faster loading is 
preferred to better segments distribution). The optimal value depends on the 
loading speed of segments, acceptable replication time and number of nodes.     
                            [...]
+|`useRoundRobinSegmentAssignment`| Boolean flag for whether segments should be 
assigned to historicals in a round robin fashion. When disabled, segment 
assignment is done using the chosen balancer strategy. When enabled, this can 
speed up segment assignments leaving balancing to move the segments to their 
optimal locations (based on the balancer strategy) lazily.                      
                                                                                
                          [...]
+|`decommissioningNodes`| List of historical servers to 'decommission'. 
Coordinator will not assign new segments to 'decommissioning' servers,  and 
segments will be moved away from them to be placed on non-decommissioning 
servers at the maximum rate specified by 
`decommissioningMaxPercentOfMaxSegmentsToMove`.                                 
                                                                                
                                                                       [...]
+|`decommissioningMaxPercentOfMaxSegmentsToMove`| Upper limit of segments the 
Coordinator can move from decommissioning servers to active non-decommissioning 
servers during a single run. This value is relative to the total maximum number 
of segments that can be moved at any given time based upon the value of 
`maxSegmentsToMove`.<br /><br />If 
`decommissioningMaxPercentOfMaxSegmentsToMove` is 0, the Coordinator does not 
move segments to decommissioning servers, effectively putting them in  [...]
+|`pauseCoordination`| Boolean flag for whether or not the coordinator should 
execute its various duties of coordinating the cluster. Setting this to true 
essentially pauses all coordination work while allowing the API to remain up. 
Duties that are paused include all classes that implement the `CoordinatorDuty` 
Interface. Such duties include: Segment balancing, Segment compaction, 
Submitting kill tasks for unused segments (if enabled), Logging of used 
segments in the cluster, Marking of n [...]
+|`replicateAfterLoadTimeout`| Boolean flag for whether or not additional 
replication is needed for segments that have failed to load due to the expiry 
of `druid.coordinator.load.timeout`. If this is set to true, the coordinator 
will attempt to replicate the failed segment on a different historical server. 
This helps improve the segment availability if there are a few slow historicals 
in the cluster. However, the slow historical may still load the segment later 
and the coordinator may iss [...]
+|`maxNonPrimaryReplicantsToLoad`| The maximum number of replicas that can be 
assigned across all tiers in a single Coordinator run. This parameter serves 
the same purpose as `replicationThrottleLimit` except this limit applies at the 
cluster-level instead of per tier. The default value does not apply a limit to 
the number of replicas assigned per coordination cycle. If you want to use a 
non-default value for this property, you may want to start with `~20%` of the 
number of segments found [...]
 
 ##### Smart segment loading
 
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index 9a38029403..8135974368 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.common.config.JacksonConfigManager;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
 import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
@@ -69,6 +70,9 @@ public class CoordinatorDynamicConfig
    * List of specific data sources for which kill tasks are sent in {@link 
KillUnusedSegments}.
    */
   private final Set<String> specificDataSourcesToKillUnusedSegmentsIn;
+
+  private final double killTaskSlotRatio;
+  private final int maxKillTaskSlots;
   private final Set<String> decommissioningNodes;
 
   private final Map<String, String> debugDimensions;
@@ -130,6 +134,8 @@ public class CoordinatorDynamicConfig
       // Keeping the legacy 'killDataSourceWhitelist' property name for 
backward compatibility. When the project is
       // updated to Jackson 2.9 it could be changed, see 
https://github.com/apache/druid/issues/7152
       @JsonProperty("killDataSourceWhitelist") Object 
specificDataSourcesToKillUnusedSegmentsIn,
+      @JsonProperty("killTaskSlotRatio") @Nullable Double killTaskSlotRatio,
+      @JsonProperty("maxKillTaskSlots") @Nullable Integer maxKillTaskSlots,
       // Type is Object here so that we can support both string and list as 
Coordinator console can not send array of
       // strings in the update request, as well as for 
specificDataSourcesToKillUnusedSegmentsIn.
       // Keeping the legacy 'killPendingSegmentsSkipList' property name for 
backward compatibility. When the project is
@@ -158,6 +164,20 @@ public class CoordinatorDynamicConfig
     this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);
     this.specificDataSourcesToKillUnusedSegmentsIn
         = parseJsonStringOrArray(specificDataSourcesToKillUnusedSegmentsIn);
+    if (null != killTaskSlotRatio && (killTaskSlotRatio < 0 || 
killTaskSlotRatio > 1)) {
+      throw InvalidInput.exception(
+          "killTaskSlotRatio [%.2f] is invalid. It must be >= 0 and <= 1.",
+          killTaskSlotRatio
+      );
+    }
+    this.killTaskSlotRatio = killTaskSlotRatio != null ? killTaskSlotRatio : 
Defaults.KILL_TASK_SLOT_RATIO;
+    if (null != maxKillTaskSlots && maxKillTaskSlots < 0) {
+      throw InvalidInput.exception(
+          "maxKillTaskSlots [%d] is invalid. It must be >= 0.",
+          maxKillTaskSlots
+      );
+    }
+    this.maxKillTaskSlots = maxKillTaskSlots != null ? maxKillTaskSlots : 
Defaults.MAX_KILL_TASK_SLOTS;
     this.dataSourcesToNotKillStalePendingSegmentsIn
         = parseJsonStringOrArray(dataSourcesToNotKillStalePendingSegmentsIn);
     this.maxSegmentsInNodeLoadingQueue = Builder.valueOrDefault(
@@ -297,6 +317,18 @@ public class CoordinatorDynamicConfig
     return specificDataSourcesToKillUnusedSegmentsIn;
   }
 
+  @JsonProperty("killTaskSlotRatio")
+  public double getKillTaskSlotRatio()
+  {
+    return killTaskSlotRatio;
+  }
+
+  @JsonProperty("maxKillTaskSlots")
+  public int getMaxKillTaskSlots()
+  {
+    return maxKillTaskSlots;
+  }
+
   @JsonIgnore
   public boolean isKillUnusedSegmentsInAllDataSources()
   {
@@ -406,6 +438,8 @@ public class CoordinatorDynamicConfig
            ", replicationThrottleLimit=" + replicationThrottleLimit +
            ", balancerComputeThreads=" + balancerComputeThreads +
            ", specificDataSourcesToKillUnusedSegmentsIn=" + 
specificDataSourcesToKillUnusedSegmentsIn +
+           ", killTaskSlotRatio=" + killTaskSlotRatio +
+           ", maxKillTaskSlots=" + maxKillTaskSlots +
            ", dataSourcesToNotKillStalePendingSegmentsIn=" + 
dataSourcesToNotKillStalePendingSegmentsIn +
            ", maxSegmentsInNodeLoadingQueue=" + maxSegmentsInNodeLoadingQueue +
            ", decommissioningNodes=" + decommissioningNodes +
@@ -444,6 +478,8 @@ public class CoordinatorDynamicConfig
            && Objects.equals(
                specificDataSourcesToKillUnusedSegmentsIn,
                that.specificDataSourcesToKillUnusedSegmentsIn)
+           && Objects.equals(killTaskSlotRatio, that.killTaskSlotRatio)
+           && Objects.equals(maxKillTaskSlots, that.maxKillTaskSlots)
            && Objects.equals(
                dataSourcesToNotKillStalePendingSegmentsIn,
                that.dataSourcesToNotKillStalePendingSegmentsIn)
@@ -464,6 +500,8 @@ public class CoordinatorDynamicConfig
         balancerComputeThreads,
         maxSegmentsInNodeLoadingQueue,
         specificDataSourcesToKillUnusedSegmentsIn,
+        killTaskSlotRatio,
+        maxKillTaskSlots,
         dataSourcesToNotKillStalePendingSegmentsIn,
         decommissioningNodes,
         decommissioningMaxPercentOfMaxSegmentsToMove,
@@ -495,6 +533,13 @@ public class CoordinatorDynamicConfig
     static final int MAX_NON_PRIMARY_REPLICANTS_TO_LOAD = Integer.MAX_VALUE;
     static final boolean USE_ROUND_ROBIN_ASSIGNMENT = true;
     static final boolean SMART_SEGMENT_LOADING = true;
+
+    // The following default values for killTaskSlotRatio and maxKillTaskSlots
+    // preserve the behavior before Druid 0.28. A future version may want to
+    // consider better defaults so that kill tasks cannot eat up all the
+    // capacity in the cluster.
+    static final double KILL_TASK_SLOT_RATIO = 1.0;
+    static final int MAX_KILL_TASK_SLOTS = Integer.MAX_VALUE;
   }
 
   public static class Builder
@@ -507,6 +552,8 @@ public class CoordinatorDynamicConfig
     private Integer replicationThrottleLimit;
     private Integer balancerComputeThreads;
     private Object specificDataSourcesToKillUnusedSegmentsIn;
+    private Double killTaskSlotRatio;
+    private Integer maxKillTaskSlots;
     private Object dataSourcesToNotKillStalePendingSegmentsIn;
     private Integer maxSegmentsInNodeLoadingQueue;
     private Object decommissioningNodes;
@@ -532,6 +579,8 @@ public class CoordinatorDynamicConfig
         @JsonProperty("replicationThrottleLimit") @Nullable Integer 
replicationThrottleLimit,
         @JsonProperty("balancerComputeThreads") @Nullable Integer 
balancerComputeThreads,
         @JsonProperty("killDataSourceWhitelist") @Nullable Object 
specificDataSourcesToKillUnusedSegmentsIn,
+        @JsonProperty("killTaskSlotRatio") @Nullable Double killTaskSlotRatio,
+        @JsonProperty("maxKillTaskSlots") @Nullable Integer maxKillTaskSlots,
         @JsonProperty("killPendingSegmentsSkipList") @Nullable Object 
dataSourcesToNotKillStalePendingSegmentsIn,
         @JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer 
maxSegmentsInNodeLoadingQueue,
         @JsonProperty("decommissioningNodes") @Nullable Object 
decommissioningNodes,
@@ -553,6 +602,8 @@ public class CoordinatorDynamicConfig
       this.replicationThrottleLimit = replicationThrottleLimit;
       this.balancerComputeThreads = balancerComputeThreads;
       this.specificDataSourcesToKillUnusedSegmentsIn = 
specificDataSourcesToKillUnusedSegmentsIn;
+      this.killTaskSlotRatio = killTaskSlotRatio;
+      this.maxKillTaskSlots = maxKillTaskSlots;
       this.dataSourcesToNotKillStalePendingSegmentsIn = 
dataSourcesToNotKillStalePendingSegmentsIn;
       this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
       this.decommissioningNodes = decommissioningNodes;
@@ -625,6 +676,18 @@ public class CoordinatorDynamicConfig
       return this;
     }
 
+    public Builder withKillTaskSlotRatio(Double killTaskSlotRatio)
+    {
+      this.killTaskSlotRatio = killTaskSlotRatio;
+      return this;
+    }
+
+    public Builder withMaxKillTaskSlots(Integer maxKillTaskSlots)
+    {
+      this.maxKillTaskSlots = maxKillTaskSlots;
+      return this;
+    }
+
     public Builder withMaxSegmentsInNodeLoadingQueue(int 
maxSegmentsInNodeLoadingQueue)
     {
       this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
@@ -685,6 +748,8 @@ public class CoordinatorDynamicConfig
           valueOrDefault(replicationThrottleLimit, 
Defaults.REPLICATION_THROTTLE_LIMIT),
           valueOrDefault(balancerComputeThreads, 
Defaults.BALANCER_COMPUTE_THREADS),
           specificDataSourcesToKillUnusedSegmentsIn,
+          valueOrDefault(killTaskSlotRatio, Defaults.KILL_TASK_SLOT_RATIO),
+          valueOrDefault(maxKillTaskSlots, Defaults.MAX_KILL_TASK_SLOTS),
           dataSourcesToNotKillStalePendingSegmentsIn,
           valueOrDefault(maxSegmentsInNodeLoadingQueue, 
Defaults.MAX_SEGMENTS_IN_NODE_LOADING_QUEUE),
           decommissioningNodes,
@@ -720,6 +785,8 @@ public class CoordinatorDynamicConfig
           valueOrDefault(replicationThrottleLimit, 
defaults.getReplicationThrottleLimit()),
           valueOrDefault(balancerComputeThreads, 
defaults.getBalancerComputeThreads()),
           valueOrDefault(specificDataSourcesToKillUnusedSegmentsIn, 
defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()),
+          valueOrDefault(killTaskSlotRatio, defaults.killTaskSlotRatio),
+          valueOrDefault(maxKillTaskSlots, defaults.maxKillTaskSlots),
           valueOrDefault(dataSourcesToNotKillStalePendingSegmentsIn, 
defaults.getDataSourcesToNotKillStalePendingSegmentsIn()),
           valueOrDefault(maxSegmentsInNodeLoadingQueue, 
defaults.getMaxSegmentsInNodeLoadingQueue()),
           valueOrDefault(decommissioningNodes, 
defaults.getDecommissioningNodes()),
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index 205947b003..97bd2ab388 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -19,24 +19,34 @@
 
 package org.apache.druid.server.coordinator.duty;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
+import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
 import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.metadata.SegmentsMetadataManager;
+import org.apache.druid.rpc.HttpResponseException;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.utils.CollectionUtils;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 
 /**
  * Completely removes information about unused segments who have an interval 
end that comes before
@@ -49,6 +59,8 @@ import java.util.List;
  */
 public class KillUnusedSegments implements CoordinatorDuty
 {
+  public static final String KILL_TASK_TYPE = "kill";
+  public static final String TASK_ID_PREFIX = "coordinator-issued";
   private static final Logger log = new Logger(KillUnusedSegments.class);
 
   private final long period;
@@ -102,6 +114,13 @@ public class KillUnusedSegments implements CoordinatorDuty
   {
     Collection<String> dataSourcesToKill =
         
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
+    double killTaskSlotRatio = 
params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
+    int maxKillTaskSlots = 
params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
+    int availableKillTaskSlots = getAvailableKillTaskSlots(killTaskSlotRatio, 
maxKillTaskSlots);
+    if (0 == availableKillTaskSlots) {
+      log.debug("Not killing any unused segments because there are no 
available kill task slots at this time.");
+      return params;
+    }
 
     // If no datasource has been specified, all are eligible for killing 
unused segments
     if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
@@ -116,16 +135,22 @@ public class KillUnusedSegments implements CoordinatorDuty
     } else {
       log.debug("Killing unused segments in datasources: %s", 
dataSourcesToKill);
       lastKillTime = currentTimeMillis;
-      killUnusedSegments(dataSourcesToKill);
+      killUnusedSegments(dataSourcesToKill, availableKillTaskSlots);
     }
 
     return params;
   }
 
-  private void killUnusedSegments(Collection<String> dataSourcesToKill)
+  private void killUnusedSegments(Collection<String> dataSourcesToKill, int 
availableKillTaskSlots)
   {
     int submittedTasks = 0;
     for (String dataSource : dataSourcesToKill) {
+      if (submittedTasks >= availableKillTaskSlots) {
+        log.info(StringUtils.format(
+            "Submitted [%d] kill tasks and reached kill task slot limit [%d]. 
Will resume "
+            + "on the next coordinator cycle.", submittedTasks, 
availableKillTaskSlots));
+        break;
+      }
       final Interval intervalToKill = findIntervalForKill(dataSource);
       if (intervalToKill == null) {
         continue;
@@ -133,7 +158,7 @@ public class KillUnusedSegments implements CoordinatorDuty
 
       try {
         FutureUtils.getUnchecked(overlordClient.runKillTask(
-            "coordinator-issued",
+            TASK_ID_PREFIX,
             dataSource,
             intervalToKill,
             maxSegmentsToKill
@@ -149,7 +174,7 @@ public class KillUnusedSegments implements CoordinatorDuty
       }
     }
 
-    log.debug("Submitted kill tasks for [%d] datasources.", submittedTasks);
+    log.debug("Submitted [%d] kill tasks for [%d] datasources.", 
submittedTasks, dataSourcesToKill.size());
   }
 
   /**
@@ -174,4 +199,86 @@ public class KillUnusedSegments implements CoordinatorDuty
     }
   }
 
+  private int getAvailableKillTaskSlots(double killTaskSlotRatio, int 
maxKillTaskSlots)
+  {
+    return Math.max(
+        0,
+        getKillTaskCapacity(getTotalWorkerCapacity(), killTaskSlotRatio, 
maxKillTaskSlots) - getNumActiveKillTaskSlots()
+    );
+  }
+
+  /**
+   * Get the number of active kill task slots in use. The kill tasks counted 
are only those that are submitted
+   * by this coordinator duty (have prefix {@link 
KillUnusedSegments#TASK_ID_PREFIX}). The value returned here
+   * may be an overestimate, as in some cases the taskType can be null if 
middleManagers are running with an older
+   * version, and these tasks are counted as active kill tasks to be safe.
+   * @return the number of active kill tasks, which may be an overestimate
+   */
+  private int getNumActiveKillTaskSlots()
+  {
+    final CloseableIterator<TaskStatusPlus> activeTasks =
+        FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 0), 
true);
+    // Fetch currently running kill tasks
+    int numActiveKillTasks = 0;
+
+    try (final Closer closer = Closer.create()) {
+      closer.register(activeTasks);
+      while (activeTasks.hasNext()) {
+        final TaskStatusPlus status = activeTasks.next();
+
+        // taskType can be null if middleManagers are running with an older 
version. Here, we conservatively regard
+        // tasks of unknown taskType as kill tasks. This is because 
it's important to never run
+        // more kill tasks than the configured limit, since doing so might 
impact ingestion
+        // performance.
+        if (status.getType() == null
+            || (KILL_TASK_TYPE.equals(status.getType()) && 
status.getId().startsWith(TASK_ID_PREFIX))) {
+          numActiveKillTasks++;
+        }
+      }
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return numActiveKillTasks;
+  }
+
+  private int getTotalWorkerCapacity()
+  {
+    int totalWorkerCapacity;
+    try {
+      final IndexingTotalWorkerCapacityInfo workerCapacityInfo =
+          FutureUtils.get(overlordClient.getTotalWorkerCapacity(), true);
+      totalWorkerCapacity = 
workerCapacityInfo.getMaximumCapacityWithAutoScale();
+      if (totalWorkerCapacity < 0) {
+        totalWorkerCapacity = workerCapacityInfo.getCurrentClusterCapacity();
+      }
+    }
+    catch (ExecutionException e) {
+      // Call to getTotalWorkerCapacity may fail during a rolling upgrade: API 
was added in 0.23.0.
+      if (e.getCause() instanceof HttpResponseException
+          && ((HttpResponseException) 
e.getCause()).getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
+        log.noStackTrace().warn(e, "Call to getTotalWorkerCapacity failed. 
Falling back to getWorkers.");
+        totalWorkerCapacity =
+            FutureUtils.getUnchecked(overlordClient.getWorkers(), true)
+                .stream()
+                .mapToInt(worker -> worker.getWorker().getCapacity())
+                .sum();
+      } else {
+        throw new RuntimeException(e.getCause());
+      }
+    }
+    catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+
+    return totalWorkerCapacity;
+  }
+
+  @VisibleForTesting
+  static int getKillTaskCapacity(int totalWorkerCapacity, double 
killTaskSlotRatio, int maxKillTaskSlots)
+  {
+    return Math.min((int) (totalWorkerCapacity * Math.min(killTaskSlotRatio, 
1.0)), maxKillTaskSlots);
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index 039174eac7..e67063fb7b 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -20,6 +20,13 @@
 package org.apache.druid.server.coordinator.duty;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.metadata.SegmentsMetadataManager;
 import org.apache.druid.rpc.indexing.OverlordClient;
@@ -32,6 +39,7 @@ import org.joda.time.DateTime;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
 import org.joda.time.Period;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -143,6 +151,7 @@ public class KillUnusedSegmentsTest
         ArgumentMatchers.anyInt()
     );
 
+    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
     target.run(params);
     Mockito.verify(overlordClient, Mockito.never())
            .runKillTask(anyString(), anyString(), any(Interval.class));
@@ -156,6 +165,7 @@ public class KillUnusedSegmentsTest
     target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, 
config);
 
     // No unused segment is older than the retention period
+    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
     target.run(params);
     Mockito.verify(overlordClient, Mockito.never())
            .runKillTask(anyString(), anyString(), any(Interval.class));
@@ -169,6 +179,7 @@ public class KillUnusedSegmentsTest
         yearOldSegment.getInterval().getStart(),
         dayOldSegment.getInterval().getEnd()
     );
+    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
     runAndVerifyKillInterval(expectedKillInterval);
   }
 
@@ -185,6 +196,7 @@ public class KillUnusedSegmentsTest
         yearOldSegment.getInterval().getStart(),
         nextDaySegment.getInterval().getEnd()
     );
+    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
     runAndVerifyKillInterval(expectedKillInterval);
   }
 
@@ -200,6 +212,7 @@ public class KillUnusedSegmentsTest
         yearOldSegment.getInterval().getStart(),
         nextMonthSegment.getInterval().getEnd()
     );
+    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
     runAndVerifyKillInterval(expectedKillInterval);
   }
 
@@ -210,10 +223,59 @@ public class KillUnusedSegmentsTest
            .when(config).getCoordinatorKillMaxSegments();
     target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, 
config);
 
+    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
     // Only 1 unused segment is killed
     runAndVerifyKillInterval(yearOldSegment.getInterval());
   }
 
+  @Test
+  public void testKillTaskSlotRatioNoAvailableTaskCapacityForKill()
+  {
+    mockTaskSlotUsage(0.10, 10, 1, 5);
+    runAndVerifyNoKill();
+  }
+
+  @Test
+  public void testMaxKillTaskSlotsNoAvailableTaskCapacityForKill()
+  {
+    mockTaskSlotUsage(1.0, 3, 3, 10);
+    runAndVerifyNoKill();
+  }
+
+  @Test
+  public void testGetKillTaskCapacity()
+  {
+    Assert.assertEquals(
+        10,
+        KillUnusedSegments.getKillTaskCapacity(10, 1.0, Integer.MAX_VALUE)
+    );
+
+    Assert.assertEquals(
+        0,
+        KillUnusedSegments.getKillTaskCapacity(10, 0.0, Integer.MAX_VALUE)
+    );
+
+    Assert.assertEquals(
+        10,
+        KillUnusedSegments.getKillTaskCapacity(10, Double.POSITIVE_INFINITY, 
Integer.MAX_VALUE)
+    );
+
+    Assert.assertEquals(
+        0,
+        KillUnusedSegments.getKillTaskCapacity(10, 1.0, 0)
+    );
+
+    Assert.assertEquals(
+        1,
+        KillUnusedSegments.getKillTaskCapacity(10, 0.1, 3)
+    );
+
+    Assert.assertEquals(
+        2,
+        KillUnusedSegments.getKillTaskCapacity(10, 0.3, 2)
+    );
+  }
+
   private void runAndVerifyKillInterval(Interval expectedKillInterval)
   {
     int limit = config.getCoordinatorKillMaxSegments();
@@ -226,6 +288,53 @@ public class KillUnusedSegmentsTest
     );
   }
 
+  private void runAndVerifyNoKill()
+  {
+    target.run(params);
+    Mockito.verify(overlordClient, Mockito.never()).runKillTask(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.any(Interval.class),
+        ArgumentMatchers.anyInt()
+    );
+  }
+
+  private void mockTaskSlotUsage(
+      double killTaskSlotRatio,
+      int maxKillTaskSlots,
+      int numPendingCoordKillTasks,
+      int maxWorkerCapacity
+  )
+  {
+    Mockito.doReturn(killTaskSlotRatio)
+        .when(coordinatorDynamicConfig).getKillTaskSlotRatio();
+    Mockito.doReturn(maxKillTaskSlots)
+        .when(coordinatorDynamicConfig).getMaxKillTaskSlots();
+    Mockito.doReturn(Futures.immediateFuture(new 
IndexingTotalWorkerCapacityInfo(1, maxWorkerCapacity)))
+        .when(overlordClient)
+        .getTotalWorkerCapacity();
+    List<TaskStatusPlus> runningCoordinatorIssuedKillTasks = new ArrayList<>();
+    for (int i = 0; i < numPendingCoordKillTasks; i++) {
+      runningCoordinatorIssuedKillTasks.add(new TaskStatusPlus(
+          KillUnusedSegments.TASK_ID_PREFIX + "_taskId_" + i,
+          "groupId_" + i,
+          KillUnusedSegments.KILL_TASK_TYPE,
+          DateTimes.EPOCH,
+          DateTimes.EPOCH,
+          TaskState.RUNNING,
+          RunnerTaskState.RUNNING,
+          -1L,
+          TaskLocation.unknown(),
+          "datasource",
+          null
+      ));
+    }
+    Mockito.doReturn(Futures.immediateFuture(
+            
CloseableIterators.withEmptyBaggage(runningCoordinatorIssuedKillTasks.iterator())))
+        .when(overlordClient)
+        .taskStatuses(null, null, 0);
+  }
+
   private DataSegment createSegmentWithEnd(DateTime endTime)
   {
     return new DataSegment(
diff --git 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
index a7744256d5..d7bac78e10 100644
--- 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
@@ -27,6 +27,8 @@ import 
org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.util.Set;
 
 /**
@@ -50,6 +52,8 @@ public class CoordinatorDynamicConfigTest
                      + "  \"replicationThrottleLimit\": 1,\n"
                      + "  \"balancerComputeThreads\": 2, \n"
                      + "  \"killDataSourceWhitelist\": 
[\"test1\",\"test2\"],\n"
+                     + "  \"killTaskSlotRatio\": 0.15,\n"
+                     + "  \"maxKillTaskSlots\": 2,\n"
                      + "  \"maxSegmentsInNodeLoadingQueue\": 1,\n"
                      + "  \"decommissioningNodes\": [\"host1\", \"host2\"],\n"
                      + "  \"decommissioningMaxPercentOfMaxSegmentsToMove\": 
9,\n"
@@ -79,6 +83,8 @@ public class CoordinatorDynamicConfigTest
         1,
         2,
         whitelist,
+        0.15,
+        2,
         false,
         1,
         decommissioning,
@@ -99,6 +105,8 @@ public class CoordinatorDynamicConfigTest
         1,
         2,
         whitelist,
+        0.15,
+        2,
         false,
         1,
         ImmutableSet.of("host1"),
@@ -119,6 +127,8 @@ public class CoordinatorDynamicConfigTest
         1,
         2,
         whitelist,
+        0.15,
+        2,
         false,
         1,
         ImmutableSet.of("host1"),
@@ -139,6 +149,8 @@ public class CoordinatorDynamicConfigTest
         1,
         2,
         whitelist,
+        0.15,
+        2,
         false,
         1,
         ImmutableSet.of("host1"),
@@ -159,6 +171,8 @@ public class CoordinatorDynamicConfigTest
         1,
         2,
         whitelist,
+        0.15,
+        2,
         false,
         1,
         ImmutableSet.of("host1"),
@@ -179,6 +193,52 @@ public class CoordinatorDynamicConfigTest
         1,
         2,
         whitelist,
+        0.15,
+        2,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        true,
+        true,
+        10
+    );
+
+    actual = 
CoordinatorDynamicConfig.builder().withKillTaskSlotRatio(1.0).build(actual);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        whitelist,
+        1.0,
+        2,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        true,
+        true,
+        10
+    );
+
+    actual = 
CoordinatorDynamicConfig.builder().withMaxKillTaskSlots(5).build(actual);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        whitelist,
+        1.0,
+        5,
         false,
         1,
         ImmutableSet.of("host1"),
@@ -216,6 +276,8 @@ public class CoordinatorDynamicConfigTest
         null,
         null,
         null,
+        null,
+        null,
         ImmutableSet.of("host1"),
         5,
         true,
@@ -243,6 +305,8 @@ public class CoordinatorDynamicConfigTest
         ImmutableSet.of("test1"),
         null,
         null,
+        null,
+        null,
         ImmutableSet.of("host1"),
         5,
         true,
@@ -292,6 +356,8 @@ public class CoordinatorDynamicConfigTest
         1,
         2,
         whitelist,
+        1.0,
+        Integer.MAX_VALUE,
         false,
         1,
         decommissioning,
@@ -312,6 +378,8 @@ public class CoordinatorDynamicConfigTest
         1,
         2,
         whitelist,
+        1.0,
+        Integer.MAX_VALUE,
         false,
         1,
         ImmutableSet.of("host1"),
@@ -332,6 +400,8 @@ public class CoordinatorDynamicConfigTest
         1,
         2,
         whitelist,
+        1.0,
+        Integer.MAX_VALUE,
         false,
         1,
         ImmutableSet.of("host1"),
@@ -376,6 +446,8 @@ public class CoordinatorDynamicConfigTest
         1,
         2,
         ImmutableSet.of("test1", "test2"),
+        1.0,
+        Integer.MAX_VALUE,
         false,
         1,
         ImmutableSet.of(),
@@ -435,6 +507,8 @@ public class CoordinatorDynamicConfigTest
         1,
         2,
         whitelist,
+        1.0,
+        Integer.MAX_VALUE,
         false,
         1,
         decommissioning,
@@ -477,6 +551,8 @@ public class CoordinatorDynamicConfigTest
         1,
         2,
         ImmutableSet.of(),
+        1.0,
+        Integer.MAX_VALUE,
         true,
         1,
         ImmutableSet.of(),
@@ -530,6 +606,8 @@ public class CoordinatorDynamicConfigTest
         1,
         2,
         ImmutableSet.of(),
+        1.0,
+        Integer.MAX_VALUE,
         true,
         EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
         ImmutableSet.of(),
@@ -555,6 +633,8 @@ public class CoordinatorDynamicConfigTest
         500,
         1,
         emptyList,
+        1.0,
+        Integer.MAX_VALUE,
         true,
         EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
         emptyList,
@@ -583,6 +663,8 @@ public class CoordinatorDynamicConfigTest
         500,
         1,
         ImmutableSet.of("DATASOURCE"),
+        1.0,
+        Integer.MAX_VALUE,
         false,
         EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
         ImmutableSet.of(),
@@ -621,6 +703,8 @@ public class CoordinatorDynamicConfigTest
             null,
             null,
             null,
+            null,
+            null,
             null
         ).build(current)
     );
@@ -670,6 +754,8 @@ public class CoordinatorDynamicConfigTest
       int expectedReplicationThrottleLimit,
       int expectedBalancerComputeThreads,
       Set<String> expectedSpecificDataSourcesToKillUnusedSegmentsIn,
+      Double expectedKillTaskSlotRatio,
+      @Nullable Integer expectedMaxKillTaskSlots,
       boolean expectedKillUnusedSegmentsInAllDataSources,
       int expectedMaxSegmentsInNodeLoadingQueue,
       Set<String> decommissioningNodes,
@@ -694,6 +780,8 @@ public class CoordinatorDynamicConfigTest
         config.getSpecificDataSourcesToKillUnusedSegmentsIn()
     );
     Assert.assertEquals(expectedKillUnusedSegmentsInAllDataSources, 
config.isKillUnusedSegmentsInAllDataSources());
+    Assert.assertEquals(expectedKillTaskSlotRatio, 
config.getKillTaskSlotRatio(), 0.001);
+    Assert.assertEquals((int) expectedMaxKillTaskSlots, 
config.getMaxKillTaskSlots());
     Assert.assertEquals(expectedMaxSegmentsInNodeLoadingQueue, 
config.getMaxSegmentsInNodeLoadingQueue());
     Assert.assertEquals(decommissioningNodes, 
config.getDecommissioningNodes());
     Assert.assertEquals(
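
The slot arithmetic in the diff above can be tried outside a cluster. The following is a minimal sketch, not the committed code: the class name `KillTaskSlotSketch` and both method names are hypothetical, and the overlord lookups (`getTotalWorkerCapacity()`, `getNumActiveKillTaskSlots()`) are replaced by plain arguments. The formulas themselves are copied from `getKillTaskCapacity` and `getAvailableKillTaskSlots`.

```java
public final class KillTaskSlotSketch
{
  // Same formula as getKillTaskCapacity in the diff: the ratio is capped at 1.0,
  // the product is truncated toward zero by the int cast, and the result is
  // capped by maxKillTaskSlots.
  static int killTaskCapacity(int totalWorkerCapacity, double killTaskSlotRatio, int maxKillTaskSlots)
  {
    return Math.min((int) (totalWorkerCapacity * Math.min(killTaskSlotRatio, 1.0)), maxKillTaskSlots);
  }

  // Same formula as getAvailableKillTaskSlots in the diff. Assumption: the worker
  // capacity and active kill task count are passed in instead of being fetched
  // from the overlord.
  static int availableKillTaskSlots(
      int totalWorkerCapacity,
      double killTaskSlotRatio,
      int maxKillTaskSlots,
      int activeKillTasks
  )
  {
    return Math.max(0, killTaskCapacity(totalWorkerCapacity, killTaskSlotRatio, maxKillTaskSlots) - activeKillTasks);
  }

  public static void main(String[] args)
  {
    // Ratio-limited: 10 * 0.1 = 1 slot, below the cap of 3.
    System.out.println(killTaskCapacity(10, 0.1, 3));                         // 1
    // Inputs of testKillTaskSlotRatioNoAvailableTaskCapacityForKill:
    // (int) (5 * 0.10) = 0 slots, 1 task already active.
    System.out.println(availableKillTaskSlots(5, 0.10, 10, 1));               // 0
    // Inputs of testMaxKillTaskSlotsNoAvailableTaskCapacityForKill:
    // capped at 3 slots, all 3 in use.
    System.out.println(availableKillTaskSlots(10, 1.0, 3, 3));                // 0
    // Defaults (ratio 1.0, Integer.MAX_VALUE) with 1 active task out of 10.
    System.out.println(availableKillTaskSlots(10, 1.0, Integer.MAX_VALUE, 1)); // 9
  }
}
```

Because the cast truncates, a small ratio on a small cluster yields zero slots (for example 0.10 of 5 workers), in which case the duty returns early without submitting any kill task.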

